- {trainingProgress.error || 'Ha ocurrido un error durante el entrenamiento'}
- </p>
- </div>
- )}
-
- {/* Educational Content */}
- <div>
- <h4>¿Qué está pasando?</h4>
- <p>
- Nuestro sistema está analizando patrones estacionales, tendencias de demanda y factores externos para crear un modelo personalizado para tu panadería.
- </p>
- <h4>Beneficios esperados</h4>
- <p>
- Predicciones de demanda precisas, reducción de desperdicio, optimización de stock y mejor planificación de producción.
- </p>
- </div>
- </div>
- );
+ return (
+ <TrainingTimeout /* hypothetical component/prop names */
+ onContinue={() => {
+ // Handle timeout - either navigate to dashboard or show limited access
+ console.log('Training timeout - user wants to continue to dashboard');
+ // You can add your custom timeout logic here
+ }}
+ />
+ );
case 5:
return (
diff --git a/services/training/app/api/training.py b/services/training/app/api/training.py
index ed578a29..113d3a8d 100644
--- a/services/training/app/api/training.py
+++ b/services/training/app/api/training.py
@@ -80,28 +80,6 @@ async def start_training_job(
requested_start=request.start_date,
requested_end=request.end_date
)
-
- training_config = {
- "job_id": job_id,
- "tenant_id": tenant_id,
- "bakery_location": {
- "latitude": 40.4168,
- "longitude": -3.7038
- },
- "requested_start": request.start_date.isoformat() if request.start_date else None,
- "requested_end": request.end_date.isoformat() if request.end_date else None,
- "estimated_duration_minutes": 15,
- "estimated_products": 10,
- "background_execution": True,
- "api_version": "v1"
- }
-
- # Publish immediate event (training started)
- await publish_job_started(
- job_id=job_id,
- tenant_id=tenant_id,
- config=training_config
- )
# Return immediate success response
response_data = {
@@ -174,11 +152,30 @@ async def execute_training_job_background(
status_manager = TrainingStatusManager(db_session=db_session)
- # Publish progress event
- await publish_job_progress(job_id, tenant_id, 5, "Initializing training pipeline")
-
try:
+ training_config = {
+ "job_id": job_id,
+ "tenant_id": tenant_id,
+ "bakery_location": {
+ "latitude": 40.4168,
+ "longitude": -3.7038
+ },
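+ # NOTE: coordinates above are hardcoded to central Madrid for now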
+ "requested_start": requested_start if requested_start else None,
+ "requested_end": requested_end if requested_end else None,
+ "estimated_duration_minutes": 15,
+ "estimated_products": None,
+ "background_execution": True,
+ "api_version": "v1"
+ }
+
+ # Publish immediate event (training started)
+ await publish_job_started(
+ job_id=job_id,
+ tenant_id=tenant_id,
+ config=training_config
+ )
+
await status_manager.update_job_status(
job_id=job_id,
status="running",
diff --git a/services/training/app/ml/trainer.py b/services/training/app/ml/trainer.py
index 530b6126..75f4bd79 100644
--- a/services/training/app/ml/trainer.py
+++ b/services/training/app/ml/trainer.py
@@ -10,6 +10,8 @@ import numpy as np
from datetime import datetime
import logging
import uuid
+import math
+import time
from app.ml.data_processor import BakeryDataProcessor
from app.ml.prophet_manager import BakeryProphetManager
@@ -75,6 +77,7 @@ class BakeryMLTrainer:
processed_data = await self._process_all_products(
sales_df, weather_df, traffic_df, products
)
+ await publish_job_progress(job_id, tenant_id, 20, "feature_engineering", estimated_time_remaining_minutes=7)
# Train models for each processed product
logger.info("Training models for all products...")
@@ -84,6 +87,7 @@ class BakeryMLTrainer:
# Calculate overall training summary
summary = self._calculate_training_summary(training_results)
+ await publish_job_progress(job_id, tenant_id, 90, "model_validation", estimated_time_remaining_minutes=1)
result = {
"job_id": job_id,
@@ -354,6 +358,41 @@ class BakeryMLTrainer:
return processed_data
+ def calculate_estimated_time_remaining(self, processing_times: List[float], completed: int, total: int) -> int:
+ """
+ Calculate estimated time remaining based on actual processing times
+
+ Args:
+ processing_times: List of processing times for completed items (in seconds)
+ completed: Number of items completed so far
+ total: Total number of items to process
+
+ Returns:
+ Estimated time remaining in minutes
+ """
+ if not processing_times or completed >= total:
+ return 0
+
+ # Calculate average processing time
+ avg_time_per_item = sum(processing_times) / len(processing_times)
+
+ # Use weighted average giving more weight to recent processing times
+ if len(processing_times) > 3:
+ # Use last 3 items for more accurate recent performance
+ recent_times = processing_times[-3:]
+ recent_avg = sum(recent_times) / len(recent_times)
+ # Weighted average: 70% recent, 30% overall
+ avg_time_per_item = (recent_avg * 0.7) + (avg_time_per_item * 0.3)
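+
+ # Worked example: times [30, 30, 60, 60] s -> overall avg 45 s, recent avg 50 s,
+ # blended estimate 0.7*50 + 0.3*45 = 48.5 s per item; with 6 of 10 items left,
+ # 6 * 48.5 = 291 s rounds up to 5 minutes.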
+
+ # Calculate remaining items and estimated time
+ remaining_items = total - completed
+ estimated_seconds = remaining_items * avg_time_per_item
+
+ # Convert to minutes, rounding up, with a minimum of one minute
+ estimated_minutes = max(1, math.ceil(estimated_seconds / 60))
+
+ return estimated_minutes
+
async def _train_all_models(self,
tenant_id: str,
processed_data: Dict[str, pd.DataFrame],
@@ -361,7 +400,17 @@ class BakeryMLTrainer:
"""Train models for all processed products using Prophet manager"""
training_results = {}
+ total_products = len(processed_data)
+ base_progress = 45
+ max_progress = 85 # upper bound of the progress range for the model-training phase
+ completed = 0
+
+ processing_times = [] # per-product processing times in seconds
+
for product_name, product_data in processed_data.items():
+ product_start_time = time.time()
try:
logger.info(f"Training model for product: {product_name}")
@@ -375,6 +424,7 @@ class BakeryMLTrainer:
'message': f'Need at least {settings.MIN_TRAINING_DATA_DAYS} data points, got {len(product_data)}'
}
logger.warning(f"Skipping {product_name}: insufficient data ({len(product_data)} < {settings.MIN_TRAINING_DATA_DAYS})")
+ processing_times.append(time.time() - product_start_time)
+ completed += 1
continue
# Train the model using Prophet manager
@@ -402,6 +452,29 @@ class BakeryMLTrainer:
'data_points': len(product_data) if product_data is not None else 0,
'failed_at': datetime.now().isoformat()
}
+
+ # Record processing time for this product
+ product_processing_time = time.time() - product_start_time
+ processing_times.append(product_processing_time)
+
+ completed += 1
+ current_progress = base_progress + int((completed / total_products) * (max_progress - base_progress))
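+ # e.g. 3 of 10 products done -> 45 + int(0.3 * 40) = 57% overall progress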
+
+ # Calculate estimated time remaining
+ estimated_time_remaining_minutes = self.calculate_estimated_time_remaining(
+ processing_times, completed, total_products
+ )
+
+ await publish_job_progress(
+ job_id,
+ tenant_id,
+ current_progress,
+ "model_training",
+ product_name,
+ completed,
+ total_products,
+ estimated_time_remaining_minutes=estimated_time_remaining_minutes
+ )
return training_results
diff --git a/services/training/app/services/training_orchestrator.py b/services/training/app/services/training_orchestrator.py
index c2ea27a7..5babf99d 100644
--- a/services/training/app/services/training_orchestrator.py
+++ b/services/training/app/services/training_orchestrator.py
@@ -75,19 +75,13 @@ class TrainingDataOrchestrator:
try:
- await publish_job_progress(job_id, tenant_id, 5, "Extrayendo datos de ventas",
- step_details="Conectando con servicio de datos")
sales_data = await self.data_client.fetch_sales_data(tenant_id)
# Step 1: Extract and validate sales data date range
- await publish_job_progress(job_id, tenant_id, 10, "Validando fechas de datos de venta",
- step_details="Aplicando restricciones de fuentes de datos")
sales_date_range = self._extract_sales_date_range(sales_data)
logger.info(f"Sales data range detected: {sales_date_range.start} to {sales_date_range.end}")
# Step 2: Apply date alignment across all data sources
- await publish_job_progress(job_id, tenant_id, 15, "Alinear el rango de fechas",
- step_details="Aplicar la alineación de fechas en todas las fuentes de datos")
aligned_range = self.date_alignment_service.validate_and_align_dates(
user_sales_range=sales_date_range,
requested_start=requested_start,
@@ -99,21 +93,15 @@ class TrainingDataOrchestrator:
logger.info(f"Applied constraints: {aligned_range.constraints}")
# Step 3: Filter sales data to aligned date range
- await publish_job_progress(job_id, tenant_id, 20, "Alinear el rango de las ventas",
- step_details="Aplicar la alineación de fechas de las ventas")
filtered_sales = self._filter_sales_data(sales_data, aligned_range)
# Step 4: Collect external data sources concurrently
logger.info("Collecting external data sources...")
- await publish_job_progress(job_id, tenant_id, 25, "Recopilación de fuentes de datos externas",
- step_details="Recopilación de fuentes de datos externas")
weather_data, traffic_data = await self._collect_external_data(
aligned_range, bakery_location, tenant_id
)
# Step 5: Validate data quality
- await publish_job_progress(job_id, tenant_id, 30, "Validando la calidad de los datos",
- step_details="Validando la calidad de los datos")
data_quality_results = self._validate_data_sources(
filtered_sales, weather_data, traffic_data, aligned_range
)
@@ -140,8 +128,6 @@ class TrainingDataOrchestrator:
)
# Step 7: Final validation
- await publish_job_progress(job_id, tenant_id, 35, "Validancion final de los datos",
- step_details="Validancion final de los datos")
final_validation = self.validate_training_data_quality(training_dataset)
training_dataset.metadata["final_validation"] = final_validation
diff --git a/services/training/app/services/training_service.py b/services/training/app/services/training_service.py
index f6a85b6c..aed25136 100644
--- a/services/training/app/services/training_service.py
+++ b/services/training/app/services/training_service.py
@@ -78,7 +78,6 @@ class TrainingService:
# Step 1: Prepare training dataset with date alignment and orchestration
logger.info("Step 1: Preparing and aligning training data")
- await publish_job_progress(job_id, tenant_id, 0, "Extrayendo datos de ventas")
training_dataset = await self.orchestrator.prepare_training_data(
tenant_id=tenant_id,
bakery_location=bakery_location,
@@ -86,10 +85,10 @@ class TrainingService:
requested_end=requested_end,
job_id=job_id
)
+ await publish_job_progress(job_id, tenant_id, 10, "data_validation", estimated_time_remaining_minutes=8)
# Step 2: Execute ML training pipeline
logger.info("Step 2: Starting ML training pipeline")
- await publish_job_progress(job_id, tenant_id, 35, "Starting ML training pipeline")
training_results = await self.trainer.train_tenant_models(
tenant_id=tenant_id,
training_dataset=training_dataset,
@@ -117,7 +116,7 @@ class TrainingService:
}
logger.info(f"Training job {job_id} completed successfully")
- await publish_job_completed(job_id, tenant_id, final_result);
+ await publish_job_completed(job_id, tenant_id, final_result)
return TrainingService.create_detailed_training_response(final_result)
except Exception as e:
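
Reviewer note: every progress call in this diff assumes a publish_job_progress helper shaped roughly as below. This is a sketch inferred from the call sites only, not the actual implementation in the shared messaging module:

async def publish_job_progress(
    job_id: str,
    tenant_id: str,
    progress: int,                        # 0-100 percentage
    step: str,                            # e.g. "data_validation", "model_training"
    current_product: str | None = None,   # product currently being trained
    products_completed: int | None = None,
    products_total: int | None = None,
    estimated_time_remaining_minutes: int | None = None,
    step_details: str | None = None,
) -> None:
    """Publish a job-progress event to the message bus (sketch only)."""
    ...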