diff --git a/frontend/src/components/EnhancedTrainingProgress.tsx b/frontend/src/components/EnhancedTrainingProgress.tsx index 7e6356d5..cdd74ad5 100644 --- a/frontend/src/components/EnhancedTrainingProgress.tsx +++ b/frontend/src/components/EnhancedTrainingProgress.tsx @@ -47,7 +47,7 @@ const STEP_INFO_MAP = { icon: Target, color: 'green' }, - // Fallback for unknown steps + // Handle any unmapped steps 'default': { title: 'Procesando...', description: 'Procesando tus datos para crear el modelo de predicción', diff --git a/frontend/src/pages/onboarding/OnboardingPage.tsx b/frontend/src/pages/onboarding/OnboardingPage.tsx index 90f799dc..2b325abb 100644 --- a/frontend/src/pages/onboarding/OnboardingPage.tsx +++ b/frontend/src/pages/onboarding/OnboardingPage.tsx @@ -105,7 +105,10 @@ const OnboardingPage: React.FC = ({ user, onComplete }) => currentStep: data.current_step || data.currentStep || 'Procesando...', productsCompleted: data.products_completed || data.productsCompleted || prev.productsCompleted, productsTotal: data.products_total || data.productsTotal || prev.productsTotal, - estimatedTimeRemaining: data.estimated_time_remaining || data.estimatedTimeRemaining || prev.estimatedTimeRemaining, + estimatedTimeRemaining: data.estimated_time_remaining_minutes || + data.estimated_time_remaining || + data.estimatedTimeRemaining || + prev.estimatedTimeRemaining, status: 'running' })); diff --git a/services/training/app/ml/trainer.py b/services/training/app/ml/trainer.py index 75f4bd79..12b78fab 100644 --- a/services/training/app/ml/trainer.py +++ b/services/training/app/ml/trainer.py @@ -20,12 +20,7 @@ from app.core.config import settings from sqlalchemy.ext.asyncio import AsyncSession -from app.services.messaging import ( - publish_job_progress, - publish_data_validation_started, - publish_data_validation_completed, - publish_job_step_completed -) +from app.services.messaging import TrainingStatusPublisher logger = logging.getLogger(__name__) @@ -59,6 +54,8 @@ class BakeryMLTrainer: logger.info(f"Starting ML training pipeline {job_id} for tenant {tenant_id}") + self.status_publisher = TrainingStatusPublisher(job_id, tenant_id) + try: # Convert sales data to DataFrame sales_df = pd.DataFrame(training_dataset.sales_data) @@ -72,12 +69,18 @@ class BakeryMLTrainer: products = sales_df['product_name'].unique().tolist() logger.info(f"Training models for {len(products)} products: {products}") + self.status_publisher.products_total = len(products) + # Process data for each product logger.info("Processing data for all products...") processed_data = await self._process_all_products( sales_df, weather_df, traffic_df, products ) - await publish_job_progress(job_id, tenant_id, 20, "feature_engineering", estimated_time_remaining_minutes=7) + await self.status_publisher.progress_update( + progress=20, + step="feature_engineering", + step_details="Processing features for all products" + ) # Train models for each processed product logger.info("Training models for all products...") @@ -87,7 +90,11 @@ class BakeryMLTrainer: # Calculate overall training summary summary = self._calculate_training_summary(training_results) - await publish_job_progress(job_id, tenant_id, 90, "model_validation", estimated_time_remaining_minutes=1) + await self.status_publisher.progress_update( + progress=90, + step="model_validation", + step_details="Validating model performance" + ) result = { "job_id": job_id, @@ -399,16 +406,11 @@ class BakeryMLTrainer: job_id: str) -> Dict[str, Any]: """Train models for all processed products using Prophet manager""" training_results = {} - + i = 0 total_products = len(processed_data) base_progress = 45 - max_progress = 85 # or whatever your target end progress is - products_total = 0 - i = 0 + max_progress = 85 - start_time = time.time() - processing_times = [] # Store individual processing times - for product_name, product_data in processed_data.items(): product_start_time = time.time() try: @@ -424,7 +426,6 @@ class BakeryMLTrainer: 'message': f'Need at least {settings.MIN_TRAINING_DATA_DAYS} data points, got {len(product_data)}' } logger.warning(f"Skipping {product_name}: insufficient data ({len(product_data)} < {settings.MIN_TRAINING_DATA_DAYS})") - processing_times.append(time.time() - product_start_time) continue # Train the model using Prophet manager @@ -444,6 +445,20 @@ class BakeryMLTrainer: logger.info(f"Successfully trained model for {product_name}") + completed_products = i + 1 + progress = base_progress + int((completed_products / total_products) * (max_progress - base_progress)) + + if self.status_publisher: + # Update products completed for accurate tracking + self.status_publisher.products_completed = completed_products + + await self.status_publisher.product_completed( + progress=progress, + step="model_training", + current_product=product_name, + step_details=f"Completed training for {product_name}" + ) + except Exception as e: logger.error(f"Failed to train model for {product_name}: {str(e)}") training_results[product_name] = { @@ -452,29 +467,18 @@ class BakeryMLTrainer: 'data_points': len(product_data) if product_data is not None else 0, 'failed_at': datetime.now().isoformat() } + + completed_products = i + 1 + + if self.status_publisher: + self.status_publisher.products_completed = completed_products + await self.status_publisher.progress_update( + progress=progress, + step="model_training", + current_product=product_name, + step_details=f"Failed training for {product_name}: {str(e)}" + ) - # Record processing time for this product - product_processing_time = time.time() - product_start_time - processing_times.append(product_processing_time) - - i += 1 - current_progress = base_progress + int((i / total_products) * (max_progress - base_progress)) - - # Calculate estimated time remaining - estimated_time_remaining_minutes = self.calculate_estimated_time_remaining( - processing_times, i, total_products - ) - - await publish_job_progress( - job_id, - tenant_id, - current_progress, - "model_training", - product_name, - products_total, - total_products, - estimated_time_remaining_minutes=estimated_time_remaining_minutes - ) return training_results diff --git a/services/training/app/services/messaging.py b/services/training/app/services/messaging.py index 6b834470..5615a4e4 100644 --- a/services/training/app/services/messaging.py +++ b/services/training/app/services/messaging.py @@ -518,27 +518,59 @@ class TrainingStatusPublisher: current_product: Optional[str] = None, step_details: Optional[str] = None ): - """Publish progress update with calculated time estimates""" + """Publish progress update with improved time estimates""" elapsed_minutes = (datetime.now() - self.start_time).total_seconds() / 60 - if progress > 0: - estimated_total_minutes = (elapsed_minutes / progress) * 100 - estimated_remaining = max(0, estimated_total_minutes - elapsed_minutes) - else: - estimated_remaining = None + # Improved estimation based on training phases + estimated_remaining = self._calculate_smart_time_remaining(progress, elapsed_minutes, step) await publish_job_progress( job_id=self.job_id, tenant_id=self.tenant_id, - progress=int(progress), # Ensure int + progress=int(progress), step=step, current_product=current_product, - products_completed=int(self.products_completed), # Ensure int - products_total=int(self.products_total), # Ensure int + products_completed=int(self.products_completed), + products_total=int(self.products_total), estimated_time_remaining_minutes=int(estimated_remaining) if estimated_remaining else None, step_details=step_details ) + + def _calculate_smart_time_remaining(self, progress: int, elapsed_minutes: float, step: str) -> Optional[int]: + """Calculate estimated time remaining using phase-based estimation""" + # Define expected time distribution for each phase + phase_durations = { + "data_validation": 1.0, # 1 minute + "feature_engineering": 2.0, # 2 minutes + "model_training": 8.0, # 8 minutes (bulk of time) + "model_validation": 1.0 # 1 minute + } + + total_expected_minutes = sum(phase_durations.values()) # 12 minutes + + # Calculate progress through phases + if progress <= 10: # data_validation phase + remaining_in_phase = phase_durations["data_validation"] * (1 - (progress / 10)) + remaining_after_phase = sum(list(phase_durations.values())[1:]) + return int(remaining_in_phase + remaining_after_phase) + + elif progress <= 20: # feature_engineering phase + remaining_in_phase = phase_durations["feature_engineering"] * (1 - ((progress - 10) / 10)) + remaining_after_phase = sum(list(phase_durations.values())[2:]) + return int(remaining_in_phase + remaining_after_phase) + + elif progress <= 90: # model_training phase (biggest chunk) + remaining_in_phase = phase_durations["model_training"] * (1 - ((progress - 20) / 70)) + remaining_after_phase = phase_durations["model_validation"] + return int(remaining_in_phase + remaining_after_phase) + + elif progress <= 100: # model_validation phase + remaining_in_phase = phase_durations["model_validation"] * (1 - ((progress - 90) / 10)) + return int(remaining_in_phase) + + return 0 + async def product_completed(self, product_name: str, model_id: str, metrics: Optional[Dict] = None): """Mark a product as completed and update progress""" self.products_completed += 1 diff --git a/services/training/app/services/training_service.py b/services/training/app/services/training_service.py index aed25136..bd6c6829 100644 --- a/services/training/app/services/training_service.py +++ b/services/training/app/services/training_service.py @@ -28,6 +28,8 @@ from app.services.messaging import ( publish_job_failed ) +from app.services.messaging import TrainingStatusPublisher + logger = logging.getLogger(__name__) class TrainingService: @@ -73,6 +75,7 @@ class TrainingService: logger.info(f"Starting training job {job_id} for tenant {tenant_id}") + self.status_publisher = TrainingStatusPublisher(job_id, tenant_id) try: @@ -85,7 +88,12 @@ class TrainingService: requested_end=requested_end, job_id=job_id ) - await publish_job_progress(job_id, tenant_id, 10, "data_validation", estimated_time_remaining_minutes=8) + + await self.status_publisher.progress_update( + progress=10, + step="data_validation", + step_details="Data validation and alignment completed" + ) # Step 2: Execute ML training pipeline logger.info("Step 2: Starting ML training pipeline")