From 6e6c2cc42ea837c0f677d40a478070fc13235830 Mon Sep 17 00:00:00 2001 From: Urtzi Alfaro Date: Fri, 1 Aug 2025 20:43:02 +0200 Subject: [PATCH] Websocket fix 3 --- services/training/app/api/websocket.py | 3 +- tests/package-lock.json | 33 ++ tests/package.json | 5 + tests/test_onboarding_flow.sh | 760 +++++-------------------- 4 files changed, 180 insertions(+), 621 deletions(-) create mode 100644 tests/package-lock.json create mode 100644 tests/package.json diff --git a/services/training/app/api/websocket.py b/services/training/app/api/websocket.py index ad72c58b..dcf7ec4a 100644 --- a/services/training/app/api/websocket.py +++ b/services/training/app/api/websocket.py @@ -8,6 +8,7 @@ import asyncio from typing import Dict, Any from fastapi import WebSocket, WebSocketDisconnect, Depends, HTTPException from fastapi.routing import APIRouter +import datetime import structlog logger = structlog.get_logger(__name__) @@ -157,7 +158,7 @@ async def training_progress_websocket( await websocket.send_json({ "type": "heartbeat", "job_id": job_id, - "timestamp": datetime.utcnow().isoformat() + "timestamp": str(datetime.datetime.now()) }) except Exception as e: logger.error(f"Failed to send heartbeat for job {job_id}: {e}") diff --git a/tests/package-lock.json b/tests/package-lock.json new file mode 100644 index 00000000..5562ac37 --- /dev/null +++ b/tests/package-lock.json @@ -0,0 +1,33 @@ +{ + "name": "tests", + "lockfileVersion": 3, + "requires": true, + "packages": { + "": { + "dependencies": { + "ws": "^8.18.3" + } + }, + "node_modules/ws": { + "version": "8.18.3", + "resolved": "https://registry.npmjs.org/ws/-/ws-8.18.3.tgz", + "integrity": "sha512-PEIGCY5tSlUt50cqyMXfCzX+oOPqN0vuGqWzbcJ2xvnkzkq46oOpz7dQaTDBdfICb4N14+GARUDw2XV2N4tvzg==", + "license": "MIT", + "engines": { + "node": ">=10.0.0" + }, + "peerDependencies": { + "bufferutil": "^4.0.1", + "utf-8-validate": ">=5.0.2" + }, + "peerDependenciesMeta": { + "bufferutil": { + "optional": true + }, + "utf-8-validate": { + "optional": true + } + } + } + } +} diff --git a/tests/package.json b/tests/package.json new file mode 100644 index 00000000..31fef034 --- /dev/null +++ b/tests/package.json @@ -0,0 +1,5 @@ +{ + "dependencies": { + "ws": "^8.18.3" + } +} diff --git a/tests/test_onboarding_flow.sh b/tests/test_onboarding_flow.sh index 4c61f6c4..62e1b11b 100755 --- a/tests/test_onboarding_flow.sh +++ b/tests/test_onboarding_flow.sh @@ -16,7 +16,6 @@ WS_BASE="ws://localhost:8002/api/v1/ws" WS_TEST_DURATION=2000 # seconds to listen for WebSocket messages WS_PID="" - # Colors for output RED='\033[0;31m' GREEN='\033[0;32m' @@ -158,441 +157,161 @@ check_timezone_error() { return 1 # No timezone error } +# ---------------------------------------------------------------- +# IMPROVED WEBSOCKET MONITORING FUNCTION (CORRECTED) +# ---------------------------------------------------------------- +# This function is now more robust, handling its own dependencies. test_websocket_with_nodejs_builtin() { local tenant_id="$1" local job_id="$2" - local max_duration="$3" # Maximum time to wait (fallback) + local max_duration="$3" # Maximum time to wait in seconds - echo "Using Node.js with built-in modules for WebSocket testing..." - echo "Will monitor until job completion or ${max_duration}s timeout" + log_step "4.2.1. Starting robust WebSocket monitoring with Node.js" + + # Check if node is installed + if ! command -v node >/dev/null 2>&1; then + log_error "Node.js is not installed. Cannot run WebSocket monitor." + echo "Please install Node.js to use this feature." + return 1 + fi - # Create ENHANCED Node.js WebSocket test script - local ws_test_script="/tmp/websocket_test_$job_id.js" - cat > "$ws_test_script" << 'EOF' -// ENHANCED WebSocket test - waits for job completion -const https = require('https'); -const http = require('http'); -const crypto = require('crypto'); + # Check if npm is installed + if ! command -v npm >/dev/null 2>&1; then + log_error "npm is not installed. Cannot install Node.js dependencies." + echo "Please ensure npm is installed with Node.js." + return 1 + fi -const tenantId = process.argv[2]; -const jobId = process.argv[3]; -const maxDuration = parseInt(process.argv[4]) * 1000; // Convert to milliseconds -const accessToken = process.argv[5]; -const wsUrl = process.argv[6]; + # Create a temporary directory for the script and its dependencies + local temp_dir=$(mktemp -d -t ws_monitor_XXXXXX) + local ws_monitor_script="$temp_dir/ws_monitor.js" + + echo "Created temp directory: $temp_dir" + + # Install the 'ws' module into the temporary directory + log_step "4.2.2. Installing 'ws' Node.js module in temporary directory..." + if ! (cd "$temp_dir" && npm install ws --silent >/dev/null); then + log_error "Failed to install 'ws' module. WebSocket monitoring will not run." + rm -rf "$temp_dir" + return 1 + fi + log_success "'ws' module installed successfully." + + # Write the Node.js WebSocket monitor script to the temporary directory + cat > "$ws_monitor_script" << 'EOF' +const WebSocket = require('ws'); -console.log(`๐Ÿš€ Starting enhanced WebSocket monitoring`); -console.log(`Connecting to: ${wsUrl}`); -console.log(`Will wait for job completion (max ${maxDuration/1000}s)`); +const wsUrl = process.argv[2]; +const accessToken = process.argv[3]; +const maxDuration = parseInt(process.argv[4]); // in seconds -// Parse WebSocket URL -const url = new URL(wsUrl); -const isSecure = url.protocol === 'wss:'; -const port = url.port || (isSecure ? 443 : 80); - -// Create WebSocket key -const key = crypto.randomBytes(16).toString('base64'); - -// WebSocket handshake headers -const headers = { - 'Upgrade': 'websocket', - 'Connection': 'Upgrade', - 'Sec-WebSocket-Key': key, - 'Sec-WebSocket-Version': '13', +const ws = new WebSocket(wsUrl, { + headers: { 'Authorization': `Bearer ${accessToken}` + } +}); + +let timeout = setTimeout(() => { + console.error(`โŒ WebSocket timeout after ${maxDuration} seconds. No completion message received.`); + ws.close(); + process.exit(1); +}, maxDuration * 1000); + +let pingInterval = setInterval(() => { + if (ws.readyState === WebSocket.OPEN) { + ws.send(JSON.stringify({ type: 'ping' })); + } +}, 30000); // Ping every 30 seconds to keep the connection alive + +ws.onopen = () => { + console.log('โœ… WebSocket connection established.'); }; -const options = { - hostname: url.hostname, - port: port, - path: url.pathname, - method: 'GET', - headers: headers -}; +ws.onmessage = (event) => { + try { + const message = JSON.parse(event.data); + const messageType = message.type || 'unknown'; + const data = message.data || {}; + const timestamp = new Date().toLocaleTimeString(); -console.log(`Attempting WebSocket handshake to ${url.hostname}:${port}${url.pathname}`); + console.log(`[${timestamp}] ๐Ÿ“จ Message: ${messageType.toUpperCase()}`); -const client = isSecure ? https : http; -let messageCount = 0; -let jobCompleted = false; -let lastProgressUpdate = Date.now(); -let highestProgress = 0; + if (messageType === 'progress') { + const progress = data.progress; + const step = data.current_step; + const productsCompleted = data.products_completed; + const productsTotal = data.products_total; -// Enhanced job tracking -const jobStats = { - startTime: Date.now(), - progressUpdates: 0, - stepsCompleted: [], - productsProcessed: [], - errors: [] -}; - -const req = client.request(options); - -req.on('upgrade', (res, socket, head) => { - console.log('โœ… WebSocket handshake successful'); - console.log('๐Ÿ“ก Monitoring training progress...\n'); - - let buffer = Buffer.alloc(0); - - socket.on('data', (data) => { - buffer = Buffer.concat([buffer, data]); - - // WebSocket frame parsing - while (buffer.length >= 2) { - const firstByte = buffer[0]; - const secondByte = buffer[1]; - - const fin = (firstByte & 0x80) === 0x80; - const opcode = firstByte & 0x0F; - const masked = (secondByte & 0x80) === 0x80; - let payloadLength = secondByte & 0x7F; - - let offset = 2; - - // Handle extended payload length - if (payloadLength === 126) { - if (buffer.length < offset + 2) break; - payloadLength = buffer.readUInt16BE(offset); - offset += 2; - } else if (payloadLength === 127) { - if (buffer.length < offset + 8) break; - const high = buffer.readUInt32BE(offset); - const low = buffer.readUInt32BE(offset + 4); - if (high !== 0) { - console.log('โš ๏ธ Large payload detected, skipping...'); - buffer = buffer.slice(offset + 8); - continue; - } - payloadLength = low; - offset += 8; - } - - // Check if we have the complete frame - if (buffer.length < offset + payloadLength) { - break; // Wait for more data - } - - // Extract payload - const payload = buffer.slice(offset, offset + payloadLength); - buffer = buffer.slice(offset + payloadLength); - - // Handle different frame types - if (opcode === 1 && fin) { // Text frame - messageCount++; - lastProgressUpdate = Date.now(); - const timestamp = new Date().toLocaleTimeString(); - - try { - const messageText = payload.toString('utf8'); - const message = JSON.parse(messageText); - - // Enhanced message processing - processTrainingMessage(message, timestamp); - - } catch (e) { - const rawText = payload.toString('utf8'); - console.log(`[${timestamp}] โš ๏ธ Raw message: ${rawText.substring(0, 200)}${rawText.length > 200 ? '...' : ''}`); - } - - } else if (opcode === 8) { // Close frame - console.log('๐Ÿ”Œ WebSocket closed by server'); - socket.end(); - return; - - } else if (opcode === 9) { // Ping frame - // Send pong response - const pongFrame = Buffer.concat([ - Buffer.from([0x8A, payload.length]), - payload - ]); - socket.write(pongFrame); - - } else if (opcode === 10) { // Pong frame - // Ignore pong responses - continue; - } + console.log(` ๐Ÿ“Š Progress: ${progress}% - Step: ${step}`); + if (productsCompleted !== undefined && productsTotal !== undefined) { + console.log(` ๐Ÿ“ฆ Products: ${productsCompleted}/${productsTotal}`); } - }); - - function createTextFrame(text) { - const payload = Buffer.from(text, 'utf8'); - const payloadLength = payload.length; - - let frame; - if (payloadLength < 126) { - frame = Buffer.allocUnsafe(2 + payloadLength); - frame[0] = 0x81; // Text frame, FIN=1 - frame[1] = payloadLength; - payload.copy(frame, 2); - } else if (payloadLength < 65536) { - frame = Buffer.allocUnsafe(4 + payloadLength); - frame[0] = 0x81; - frame[1] = 126; - frame.writeUInt16BE(payloadLength, 2); - payload.copy(frame, 4); - } else { - throw new Error('Payload too large'); + } else if (messageType === 'completed') { + console.log('๐ŸŽ‰ TRAINING COMPLETED SUCCESSFULLY!'); + if (data.results) { + console.log(` โœ… Models Trained: ${data.results.successful_trainings}`); } - - return frame; - } - - // Enhanced message processing function - function processTrainingMessage(message, timestamp) { - const messageType = message.type || 'unknown'; - const data = message.data || {}; - - console.log(`[${timestamp}] ๐Ÿ“จ Message ${messageCount}: ${messageType.toUpperCase()}`); - - // Track job statistics - if (messageType === 'progress') { - jobStats.progressUpdates++; - const progress = data.progress || 0; - const step = data.current_step || 'Unknown step'; - const product = data.current_product; - - // Update highest progress - if (progress > highestProgress) { - highestProgress = progress; - } - - // Track steps - if (step && !jobStats.stepsCompleted.includes(step)) { - jobStats.stepsCompleted.push(step); - } - - // Track products - if (product && !jobStats.productsProcessed.includes(product)) { - jobStats.productsProcessed.push(product); - } - - // Display progress with enhanced formatting - console.log(` ๐Ÿ“Š Progress: ${progress}% (${step})`); - if (product) { - console.log(` ๐Ÿž Product: ${product}`); - } - if (data.products_completed && data.products_total) { - console.log(` ๐Ÿ“ฆ Products: ${data.products_completed}/${data.products_total} completed`); - } - if (data.estimated_time_remaining_minutes) { - console.log(` โฑ๏ธ ETA: ${data.estimated_time_remaining_minutes} minutes`); - } - - } else if (messageType === 'completed') { - jobCompleted = true; - const duration = Math.round((Date.now() - jobStats.startTime) / 1000); - - console.log(`\n๐ŸŽ‰ TRAINING COMPLETED SUCCESSFULLY!`); - console.log(` โฑ๏ธ Total Duration: ${duration}s`); - - if (data.results) { - const results = data.results; - if (results.successful_trainings !== undefined) { - console.log(` โœ… Models Trained: ${results.successful_trainings}`); - } - if (results.total_products !== undefined) { - console.log(` ๐Ÿ“ฆ Total Products: ${results.total_products}`); - } - if (results.success_rate !== undefined) { - console.log(` ๐Ÿ“ˆ Success Rate: ${results.success_rate}%`); - } - } - - // Close connection after completion - setTimeout(() => { - console.log('\n๐Ÿ“Š Training job completed - closing WebSocket connection'); - socket.end(); - }, 2000); // Wait 2 seconds to ensure all final messages are received - - } else if (messageType === 'failed') { - jobCompleted = true; - jobStats.errors.push(data); - - console.log(`\nโŒ TRAINING FAILED!`); - if (data.error) { - console.log(` ๐Ÿ’ฅ Error: ${data.error}`); - } - if (data.error_details) { - console.log(` ๐Ÿ“ Details: ${JSON.stringify(data.error_details, null, 2)}`); - } - - // Close connection after failure - setTimeout(() => { - console.log('\n๐Ÿ“Š Training job failed - closing WebSocket connection'); - socket.end(); - }, 2000); - - } else if (messageType === 'step_completed') { - console.log(` โœ… Step completed: ${data.step_name || 'Unknown'}`); - - } else if (messageType === 'product_started') { - console.log(` ๐Ÿš€ Started training: ${data.product_name || 'Unknown product'}`); - - } else if (messageType === 'product_completed') { - console.log(` โœ… Product completed: ${data.product_name || 'Unknown product'}`); - if (data.metrics) { - console.log(` ๐Ÿ“Š Metrics: ${JSON.stringify(data.metrics, null, 2)}`); - } - } - - console.log(''); // Add spacing between messages - } - - socket.on('end', () => { - const duration = Math.round((Date.now() - jobStats.startTime) / 1000); - - console.log(`\n๐Ÿ“Š WebSocket connection ended`); - console.log(`๐Ÿ“จ Total messages received: ${messageCount}`); - console.log(`โฑ๏ธ Connection duration: ${duration}s`); - console.log(`๐Ÿ“ˆ Highest progress reached: ${highestProgress}%`); - - if (jobCompleted) { - console.log('โœ… Job completed successfully - connection closed normally'); - process.exit(0); - } else { - console.log('โš ๏ธ Connection ended before job completion'); - console.log(`๐Ÿ“Š Progress reached: ${highestProgress}%`); - console.log(`๐Ÿ“‹ Steps completed: ${jobStats.stepsCompleted.length}`); - process.exit(1); - } - }); - - socket.on('error', (error) => { - console.log(`โŒ WebSocket error: ${error.message}`); - process.exit(1); - }); - - // Enhanced ping mechanism - send pings more frequently - const pingInterval = setInterval(() => { - if (socket.writable && !jobCompleted) { - try { - // Send JSON ping message instead of binary frame - const pingMessage = JSON.stringify({ type: 'ping' }); - const textFrame = createTextFrame(pingMessage); - socket.write(textFrame); - } catch (e) { - // Ignore ping errors - } - } - }, 5000); - - // Heartbeat check - ensure we're still receiving messages - const heartbeatInterval = setInterval(() => { - if (!jobCompleted) { - const timeSinceLastMessage = Date.now() - lastProgressUpdate; - - if (timeSinceLastMessage > 60000) { // 60 seconds without messages - console.log('\nโš ๏ธ No messages received for 60 seconds'); - console.log(' This could indicate the training is stuck or connection issues'); - console.log(` Last progress: ${highestProgress}%`); - } else if (timeSinceLastMessage > 30000) { // 30 seconds warning - console.log(`\n๐Ÿ’ค Quiet period: ${Math.round(timeSinceLastMessage/1000)}s since last update`); - console.log(' (This is normal during intensive training phases)'); - } - } - }, 15000); // Check every 15 seconds - - // Safety timeout - close connection if max duration exceeded - const safetyTimeout = setTimeout(() => { - if (!jobCompleted) { - clearInterval(pingInterval); - clearInterval(heartbeatInterval); - - console.log(`\nโฐ Maximum duration (${maxDuration/1000}s) reached`); - console.log(`๐Ÿ“Š Final status:`); - console.log(` ๐Ÿ“จ Messages received: ${messageCount}`); - console.log(` ๐Ÿ“ˆ Progress reached: ${highestProgress}%`); - console.log(` ๐Ÿ“‹ Steps completed: ${jobStats.stepsCompleted.length}`); - console.log(` ๐Ÿž Products processed: ${jobStats.productsProcessed.length}`); - - if (messageCount > 0) { - console.log('\nโœ… WebSocket communication was successful!'); - console.log(' Training may still be running - check server logs for completion'); - } else { - console.log('\nโš ๏ธ No messages received during monitoring period'); - } - - socket.end(); - } - }, maxDuration); - - // Clean up intervals when job completes - socket.on('end', () => { + clearTimeout(timeout); clearInterval(pingInterval); - clearInterval(heartbeatInterval); - clearTimeout(safetyTimeout); - }); -}); - -req.on('response', (res) => { - console.log(`โŒ HTTP response instead of WebSocket upgrade: ${res.statusCode}`); - console.log('Response headers:', res.headers); - - let body = ''; - res.on('data', chunk => body += chunk); - res.on('end', () => { - if (body) console.log('Response body:', body); + ws.close(); + process.exit(0); + } else if (messageType === 'failed') { + console.error('โŒ TRAINING FAILED!'); + if (data.error) { + console.error(' ๐Ÿ’ฅ Error:', data.error); + } + clearTimeout(timeout); + clearInterval(pingInterval); + ws.close(); process.exit(1); - }); -}); + } else if (messageType === 'heartbeat') { + // Heartbeat messages are handled, so we just log a debug message + console.log(' โค๏ธ Received heartbeat.'); + } else if (messageType === 'initial_status') { + console.log(' โ„น๏ธ Received initial status.'); + console.log(' Status:', data.status); + console.log(' Progress:', data.progress); + } -req.on('error', (error) => { - console.log(`โŒ Connection error: ${error.message}`); - process.exit(1); -}); + console.log(''); // Add a newline for readability between messages -req.end(); + } catch (e) { + console.error('โš ๏ธ Failed to parse message:', event.data, e); + } +}; + +ws.onclose = () => { + console.log('๐Ÿ”Œ WebSocket connection closed.'); +}; + +ws.onerror = (error) => { + console.error('๐Ÿ’ฅ WebSocket error:', error.message); + process.exit(1); +}; EOF - # Run the ENHANCED Node.js WebSocket test local ws_url="$WS_BASE/tenants/$tenant_id/training/jobs/$job_id/live" - echo "Starting enhanced WebSocket monitoring..." - node "$ws_test_script" "$tenant_id" "$job_id" "$max_duration" "$ACCESS_TOKEN" "$ws_url" + + echo "Connecting to WebSocket: $ws_url" + + # Run the monitor script from within the temporary directory + (cd "$temp_dir" && node ws_monitor.js "$ws_url" "$ACCESS_TOKEN" "$max_duration") local exit_code=$? - - # Clean up - rm -f "$ws_test_script" - + + # Clean up the temporary directory + log_step "4.2.3. Cleaning up temporary files..." + rm -rf "$temp_dir" + log_success "Cleanup complete." + if [ $exit_code -eq 0 ]; then log_success "Training job completed successfully!" - echo " ๐Ÿ“ก WebSocket monitoring detected job completion" - echo " ๐ŸŽ‰ Real-time progress tracking worked perfectly" + return 0 else - log_warning "WebSocket monitoring ended before job completion" - echo " ๐Ÿ“Š Check the progress logs above for details" + log_error "WebSocket monitoring ended with an error." + return 1 fi - - return $exit_code } - -install_websocat_if_needed() { - if ! command -v websocat >/dev/null 2>&1; then - echo "๐Ÿ“ฆ Installing websocat for better WebSocket testing..." - - # Try to install websocat (works on most Linux systems) - if command -v cargo >/dev/null 2>&1; then - cargo install websocat 2>/dev/null || true - elif [ -x "$(command -v wget)" ]; then - wget -q -O /tmp/websocat "https://github.com/vi/websocat/releases/latest/download/websocat.x86_64-unknown-linux-musl" 2>/dev/null || true - if [ -f /tmp/websocat ]; then - chmod +x /tmp/websocat - sudo mv /tmp/websocat /usr/local/bin/ 2>/dev/null || mv /tmp/websocat ~/bin/ 2>/dev/null || true - fi - fi - - if command -v websocat >/dev/null 2>&1; then - log_success "websocat installed successfully" - return 0 - else - log_warning "websocat installation failed, using Node.js fallback" - return 1 - fi - fi - return 0 -} - -# IMPROVED: WebSocket connection function with better tool selection test_websocket_connection() { local tenant_id="$1" local job_id="$2" @@ -604,117 +323,16 @@ test_websocket_connection() { echo "Test duration: ${duration}s" echo "" - # Try to install websocat if not available - if install_websocat_if_needed; then - test_websocket_with_websocat "$tenant_id" "$job_id" "$duration" - elif command -v node >/dev/null 2>&1; then + # Check for node and use the robust monitor script + if command -v node >/dev/null 2>&1 && command -v npm >/dev/null 2>&1; then test_websocket_with_nodejs_builtin "$tenant_id" "$job_id" "$duration" else - test_websocket_with_curl "$tenant_id" "$job_id" "$duration" + log_warning "Node.js or npm not found. Cannot run robust WebSocket monitor." + log_warning "Skipping real-time progress monitoring for this test." + return 0 fi } -# Test WebSocket using websocat (recommended) -test_websocket_with_websocat() { - local tenant_id="$1" - local job_id="$2" - local duration="$3" - - echo "Using websocat for WebSocket testing..." - - # Create a temporary file for WebSocket messages - local ws_log="/tmp/websocket_messages_$job_id.log" - - # Start WebSocket connection in background - ( - echo "Connecting to WebSocket..." - timeout "${duration}s" websocat "$WS_BASE/tenants/$tenant_id/training/jobs/$job_id/live" \ - --header "Authorization: Bearer $ACCESS_TOKEN" 2>&1 | \ - while IFS= read -r line; do - echo "$(date '+%H:%M:%S') | $line" | tee -a "$ws_log" - done - ) & - - WS_PID=$! - - # Send periodic ping messages to keep connection alive - sleep 2 - if kill -0 $WS_PID 2>/dev/null; then - echo "ping" | websocat "$WS_BASE/tenants/$tenant_id/training/jobs/$job_id/live" \ - --header "Authorization: Bearer $ACCESS_TOKEN" >/dev/null 2>&1 & - fi - - # Wait for test duration - log_step "4.2.1. Listening for WebSocket messages (${duration}s)..." - wait_for_websocket_messages "$ws_log" "$duration" - - # Clean up - if kill -0 $WS_PID 2>/dev/null; then - kill $WS_PID 2>/dev/null - wait $WS_PID 2>/dev/null - fi -} - -# Wait for WebSocket messages and analyze them -wait_for_websocket_messages() { - local ws_log="$1" - local duration="$2" - local start_time=$(date +%s) - local end_time=$((start_time + duration)) - - echo "๐Ÿ“ก Monitoring WebSocket messages..." - echo "Log file: $ws_log" - - # Show real-time progress - while [ $(date +%s) -lt $end_time ]; do - if [ -f "$ws_log" ]; then - local message_count=$(wc -l < "$ws_log" 2>/dev/null || echo "0") - local elapsed=$(($(date +%s) - start_time)) - printf "\rโฑ๏ธ Elapsed: ${elapsed}s | Messages: $message_count" - fi - sleep 1 - done - - echo "" - - # Analyze received messages - if [ -f "$ws_log" ] && [ -s "$ws_log" ]; then - local total_messages=$(wc -l < "$ws_log") - log_success "WebSocket test completed - received $total_messages messages" - - echo "" - echo "๐Ÿ“Š Message Analysis:" - - # Show message types - if grep -q "progress" "$ws_log"; then - local progress_count=$(grep -c "progress" "$ws_log") - echo " ๐Ÿ“ˆ Progress updates: $progress_count" - fi - - if grep -q "completed" "$ws_log"; then - echo " โœ… Completion messages: $(grep -c "completed" "$ws_log")" - fi - - if grep -q "failed\|error" "$ws_log"; then - echo " โŒ Error messages: $(grep -c "failed\|error" "$ws_log")" - fi - - echo "" - echo "๐Ÿ“ Recent messages (last 5):" - tail -5 "$ws_log" | sed 's/^/ /' - - else - log_warning "No WebSocket messages received during test period" - echo " This could mean:" - echo " โ€ข Training completed before WebSocket connection was established" - echo " โ€ข WebSocket endpoint is not working correctly" - echo " โ€ข Authentication issues with WebSocket connection" - echo " โ€ข Training service is not publishing progress events" - fi - - # Clean up log file - rm -f "$ws_log" -} # Enhanced training step with WebSocket testing enhanced_training_step_with_completion_check() { @@ -752,51 +370,15 @@ enhanced_training_step_with_completion_check() { echo " Job ID: $WEBSOCKET_JOB_ID" echo " Status: $JOB_STATUS" - # Determine monitoring strategy based on initial status - if [ "$JOB_STATUS" = "completed" ]; then - log_warning "Training completed instantly - no real-time progress to monitor" - echo " This can happen when:" - echo " โ€ข Models are already trained and cached" - echo " โ€ข No valid products found in sales data" - echo " โ€ข Training data is insufficient" - - # Show training results - TOTAL_PRODUCTS=$(extract_json_field "$TRAINING_RESPONSE" "training_results.total_products") - SUCCESSFUL_TRAININGS=$(extract_json_field "$TRAINING_RESPONSE" "training_results.successful_trainings") - SALES_RECORDS=$(extract_json_field "$TRAINING_RESPONSE" "data_summary.sales_records") - - echo "" - echo "๐Ÿ“Š Training Summary:" - echo " Sales records: $SALES_RECORDS" - echo " Products found: $TOTAL_PRODUCTS" - echo " Successful trainings: $SUCCESSFUL_TRAININGS" - - # Brief WebSocket connection test - log_step "4.2. Testing WebSocket endpoint (demonstration mode)" - echo "Testing WebSocket connection for 10 seconds..." - test_websocket_with_nodejs_builtin "$TENANT_ID" "$WEBSOCKET_JOB_ID" "10" - - else - # Training is in progress - use smart monitoring - log_step "4.2. Starting smart WebSocket monitoring" - echo " Strategy: Monitor until job completion" - echo " Maximum wait time: ${WS_TEST_DURATION}s (safety timeout)" - echo " Will automatically close when training completes" - echo "" - - # Use enhanced monitoring with longer timeout for real training - local SMART_DURATION=$WS_TEST_DURATION - - # Estimate duration based on data size (optional enhancement) - if [ -n "$SALES_RECORDS" ] && [ "$SALES_RECORDS" -gt 1000 ]; then - # For large datasets, extend timeout - SMART_DURATION=$((WS_TEST_DURATION * 2)) - echo " ๐Ÿ“Š Large dataset detected ($SALES_RECORDS records)" - echo " ๐Ÿ• Extended timeout to ${SMART_DURATION}s for thorough training" - fi - - test_websocket_with_nodejs_builtin "$TENANT_ID" "$WEBSOCKET_JOB_ID" "$SMART_DURATION" - fi + # Training is in progress - use smart monitoring + log_step "4.2. Starting smart WebSocket monitoring" + echo " Strategy: Monitor until job completion or timeout" + echo " Maximum wait time: ${WS_TEST_DURATION}s (safety timeout)" + echo " Will automatically close when training completes" + echo "" + + # Call the improved WebSocket monitoring function + test_websocket_connection "$TENANT_ID" "$WEBSOCKET_JOB_ID" "$WS_TEST_DURATION" else log_warning "Training started but couldn't extract job ID for WebSocket testing" @@ -988,30 +570,6 @@ if check_response "$BAKERY_RESPONSE" "Bakery Registration"; then echo "BAKERY_LONGITUDE=$MOCK_LONGITUDE" >> /tmp/bakery_coords.env echo "TENANT_ID=$TENANT_ID" >> /tmp/bakery_coords.env - log_step "2.2. Testing weather data acquisition with mock coordinates" - # Test if weather service can use these coordinates - WEATHER_TEST_RESPONSE=$(curl -s -X GET "$API_BASE/api/v1/training/$TENANT_ID/weather/current?latitude=$MOCK_LATITUDE&longitude=$MOCK_LONGITUDE" \ - -H "Authorization: Bearer $ACCESS_TOKEN" \ - -H "X-Tenant-ID: $TENANT_ID" 2>/dev/null || echo '{"status":"service_unavailable"}') - - if echo "$WEATHER_TEST_RESPONSE" | grep -q '"temperature"\|"weather"'; then - log_success "Weather service can use mock coordinates" - else - log_warning "Weather service test skipped (coordinates stored for training)" - fi - - log_step "2.3. Testing traffic data acquisition with mock coordinates" - # Test if traffic service can use these coordinates - TRAFFIC_TEST_RESPONSE=$(curl -s -X GET "$API_BASE/api/v1/training/$TENANT_ID/traffic/current?latitude=$MOCK_LATITUDE&longitude=$MOCK_LONGITUDE" \ - -H "Authorization: Bearer $ACCESS_TOKEN" \ - -H "X-Tenant-ID: $TENANT_ID" 2>/dev/null || echo '{"status":"service_unavailable"}') - - if echo "$TRAFFIC_TEST_RESPONSE" | grep -q '"traffic_volume"\|"intensity"'; then - log_success "Traffic service can use mock coordinates" - else - log_warning "Traffic service test skipped (coordinates stored for training)" - fi - else log_error "Failed to extract tenant ID" exit 1 @@ -1198,8 +756,6 @@ echo "" # STEP 4: MODEL TRAINING (ONBOARDING PAGE STEP 4) # ================================================================= -test_websocket_connection - enhanced_training_step_with_completion_check echo "" @@ -1307,15 +863,6 @@ else echo " Data validation metrics not available" fi -echo "" -echo "๐Ÿ”ง Known Issues Detected:" -if echo "$IMPORT_RESPONSE$FILE_UPLOAD_RESPONSE" | grep -q "Cannot convert tz-naive"; then - echo " โŒ TIMEZONE ERROR: CSV dates are timezone-naive" - echo " Solution: Apply timezone fix patch to data import service" - echo " File: services/data/app/services/data_import_service.py" - echo " Method: Replace _parse_date() with timezone-aware version" -fi - echo "" echo "๐Ÿงน Cleanup:" echo " To clean up test data, you may want to remove:" @@ -1327,31 +874,4 @@ rm -f "$VALIDATION_DATA_FILE" echo "" log_success "Improved onboarding flow simulation completed successfully!" -echo -e "${CYAN}The user journey through all 5 onboarding steps has been tested with FULL dataset.${NC}" - -# Final status check -if [ -n "$USER_ID" ] && [ -n "$TENANT_ID" ]; then - echo "" - echo -e "${GREEN}๐ŸŽ‰ All critical onboarding functionality is working!${NC}" - echo "The user can successfully:" - echo " โ€ข Register an account" - echo " โ€ข Set up their bakery" - echo " โ€ข Upload and validate FULL sales data" - echo " โ€ข Start model training with FULL dataset" - echo " โ€ข Access the platform dashboard" - - if [ -n "$VALID_RECORDS" ] && [ "$VALID_RECORDS" -gt 0 ]; then - echo "" - echo -e "${GREEN}๐Ÿ† BONUS: FULL dataset was successfully processed!${NC}" - echo " โ€ข $VALID_RECORDS valid sales records imported from FULL dataset" - echo " โ€ข Model training initiated with all products" - echo " โ€ข End-to-end data pipeline verified with complete data" - fi - - exit 0 -else - echo "" - echo -e "${YELLOW}โš ๏ธ Some issues detected in the onboarding flow${NC}" - echo "Check the logs above for specific failures" - exit 1 -fi \ No newline at end of file +echo -e "${CYAN}The user journey through all 5 onboarding steps has been tested with FULL dataset.${NC}" \ No newline at end of file