""" Enhanced ML Trainer with Repository Pattern Main ML pipeline coordinator using repository pattern for data access and dependency injection """ from typing import Dict, List, Any, Optional import pandas as pd import numpy as np from datetime import datetime import structlog import uuid import time import asyncio from app.ml.data_processor import EnhancedBakeryDataProcessor from app.ml.prophet_manager import BakeryProphetManager from app.services.training_orchestrator import TrainingDataSet from app.core.config import settings from shared.database.base import create_database_manager from shared.database.transactions import transactional from shared.database.unit_of_work import UnitOfWork from shared.database.exceptions import DatabaseError from app.repositories import ( ModelRepository, TrainingLogRepository, PerformanceRepository, ArtifactRepository ) from app.services.progress_tracker import ParallelProductProgressTracker from app.services.training_events import ( publish_training_started, publish_data_analysis, publish_training_completed, publish_training_failed ) logger = structlog.get_logger() class EnhancedBakeryMLTrainer: """ Enhanced ML trainer using repository pattern for data access and comprehensive tracking. Orchestrates the complete ML training pipeline with proper database abstraction. """ def __init__(self, database_manager=None): self.database_manager = database_manager or create_database_manager(settings.DATABASE_URL, "training-service") self.enhanced_data_processor = EnhancedBakeryDataProcessor(self.database_manager) self.prophet_manager = BakeryProphetManager(database_manager=self.database_manager) async def _get_repositories(self, session): """Initialize repositories with session""" return { 'model': ModelRepository(session), 'training_log': TrainingLogRepository(session), 'performance': PerformanceRepository(session), 'artifact': ArtifactRepository(session) } async def train_tenant_models(self, tenant_id: str, training_dataset: TrainingDataSet, job_id: Optional[str] = None, session=None) -> Dict[str, Any]: """ Train models for all products using repository pattern with enhanced tracking. 
        Args:
            tenant_id: Tenant identifier
            training_dataset: Prepared training dataset with aligned dates
            job_id: Training job identifier

        Returns:
            Dictionary with training results for each product
        """
        if not job_id:
            job_id = f"enhanced_ml_{tenant_id}_{uuid.uuid4().hex[:8]}"

        logger.info("Starting enhanced ML training pipeline",
                    job_id=job_id, tenant_id=tenant_id)

        try:
            # Get database session and repositories
            async with self.database_manager.get_session() as db_session:
                repos = await self._get_repositories(db_session)

                # Convert sales, weather and traffic data to DataFrames
                sales_df = pd.DataFrame(training_dataset.sales_data)
                weather_df = pd.DataFrame(training_dataset.weather_data)
                traffic_df = pd.DataFrame(training_dataset.traffic_data)

                # Validate input data
                await self._validate_input_data(sales_df, tenant_id)

                # Get unique products from the sales data
                products = sales_df['inventory_product_id'].unique().tolist()

                # Debug: Log sales data details to understand why only one product is found
                total_sales_records = len(sales_df)
                sales_by_product = sales_df.groupby('inventory_product_id').size().to_dict()
                logger.info("Enhanced training pipeline - Sales data analysis",
                            total_sales_records=total_sales_records,
                            products_count=len(products),
                            products=products,
                            sales_by_product=sales_by_product)

                if len(products) == 1:
                    logger.warning("Only ONE product found in sales data - this may indicate a data fetching issue",
                                   tenant_id=tenant_id,
                                   single_product_id=products[0],
                                   total_sales_records=total_sales_records)
                elif len(products) == 0:
                    raise ValueError("No products found in sales data")
                else:
                    logger.info("Multiple products detected for training",
                                products_count=len(products))

                # Event 1: Training Started (0%) - update with actual product count
                # Note: Initial event was already published by API endpoint, this updates with real count
                await publish_training_started(job_id, tenant_id, len(products))

                # Record initial training progress in the log
                await repos['training_log'].update_log_progress(
                    job_id, 5, "data_processing", "running"
                )

                # Process data for each product using enhanced processor
                logger.info("Processing data using enhanced processor")
                processed_data = await self._process_all_products_enhanced(
                    sales_df, weather_df, traffic_df, products, tenant_id, job_id
                )

                # Event 2: Data Analysis (20%)
                await publish_data_analysis(
                    job_id, tenant_id,
                    f"Data analysis completed for {len(processed_data)} products"
                )

                # Train models for each processed product with progress aggregation
                logger.info("Training models with repository integration and progress aggregation")

                # Create progress tracker for parallel product training (20-80%)
                progress_tracker = ParallelProductProgressTracker(
                    job_id=job_id,
                    tenant_id=tenant_id,
                    total_products=len(processed_data)
                )

                training_results = await self._train_all_models_enhanced(
                    tenant_id, processed_data, job_id, repos, progress_tracker
                )

                # Calculate overall training summary with enhanced metrics
                summary = await self._calculate_enhanced_training_summary(
                    training_results, repos, tenant_id
                )

                # Calculate successful and failed trainings
                successful_trainings = len([r for r in training_results.values() if r.get('status') == 'success'])
                failed_trainings = len([r for r in training_results.values() if r.get('status') == 'error'])
                total_duration = sum([r.get('training_time_seconds', 0) for r in training_results.values()])

                # Event 4: Training Completed (100%)
                await publish_training_completed(
                    job_id, tenant_id, successful_trainings, failed_trainings, total_duration
                )

                # Create comprehensive result with repository data
                result = {
                    "job_id": job_id,
"tenant_id": tenant_id, "status": "completed", "products_trained": len([r for r in training_results.values() if r.get('status') == 'success']), "products_failed": len([r for r in training_results.values() if r.get('status') == 'error']), "products_skipped": len([r for r in training_results.values() if r.get('status') == 'skipped']), "total_products": len(products), "training_results": training_results, "enhanced_summary": summary, "models_trained": summary.get('models_created', {}), "data_info": { "date_range": { "start": training_dataset.date_range.start.isoformat(), "end": training_dataset.date_range.end.isoformat(), "duration_days": (training_dataset.date_range.end - training_dataset.date_range.start).days }, "data_sources": [source.value for source in training_dataset.date_range.available_sources], "constraints_applied": training_dataset.date_range.constraints }, "repository_metadata": { "total_records_created": summary.get('total_db_records', 0), "performance_metrics_stored": summary.get('performance_metrics_created', 0), "artifacts_created": summary.get('artifacts_created', 0) }, "completed_at": datetime.now().isoformat() } logger.info("Enhanced ML training pipeline completed successfully", job_id=job_id, models_created=len([r for r in training_results.values() if r.get('status') == 'success'])) return result except Exception as e: logger.error("Enhanced ML training pipeline failed", job_id=job_id, error=str(e)) # Publish training failed event await publish_training_failed(job_id, tenant_id, str(e)) raise async def _process_all_products_enhanced(self, sales_df: pd.DataFrame, weather_df: pd.DataFrame, traffic_df: pd.DataFrame, products: List[str], tenant_id: str, job_id: str) -> Dict[str, pd.DataFrame]: """Process data for all products using enhanced processor with repository tracking""" processed_data = {} for inventory_product_id in products: try: logger.info("Processing data for product using enhanced processor", inventory_product_id=inventory_product_id) # Filter sales data for this product product_sales = sales_df[sales_df['inventory_product_id'] == inventory_product_id].copy() if product_sales.empty: logger.warning("No sales data found for product", inventory_product_id=inventory_product_id) continue # Use enhanced data processor with repository tracking processed_product_data = await self.enhanced_data_processor.prepare_training_data( sales_data=product_sales, weather_data=weather_df, traffic_data=traffic_df, inventory_product_id=inventory_product_id, tenant_id=tenant_id, job_id=job_id ) processed_data[inventory_product_id] = processed_product_data logger.info("Enhanced processing completed", inventory_product_id=inventory_product_id, data_points=len(processed_product_data)) except Exception as e: logger.error("Failed to process data using enhanced processor", inventory_product_id=inventory_product_id, error=str(e)) continue return processed_data async def _train_single_product(self, tenant_id: str, inventory_product_id: str, product_data: pd.DataFrame, job_id: str, repos: Dict, progress_tracker: ParallelProductProgressTracker) -> tuple[str, Dict[str, Any]]: """Train a single product model - used for parallel execution with progress aggregation""" product_start_time = time.time() try: logger.info("Training model", inventory_product_id=inventory_product_id) # Check if we have enough data if len(product_data) < settings.MIN_TRAINING_DATA_DAYS: result = { 'status': 'skipped', 'reason': 'insufficient_data', 'data_points': len(product_data), 'min_required': 
                    'min_required': settings.MIN_TRAINING_DATA_DAYS,
                    'message': f'Need at least {settings.MIN_TRAINING_DATA_DAYS} data points, got {len(product_data)}'
                }
                logger.warning("Skipping product due to insufficient data",
                               inventory_product_id=inventory_product_id,
                               data_points=len(product_data),
                               min_required=settings.MIN_TRAINING_DATA_DAYS)
                return inventory_product_id, result

            # Train the model using Prophet manager
            model_info = await self.prophet_manager.train_bakery_model(
                tenant_id=tenant_id,
                inventory_product_id=inventory_product_id,
                df=product_data,
                job_id=job_id
            )

            # Store model record using repository
            model_record = await self._create_model_record(
                repos, tenant_id, inventory_product_id, model_info, job_id, product_data
            )

            # Create performance metrics record
            if model_info.get('training_metrics'):
                await self._create_performance_metrics(
                    repos, model_record.id if model_record else None,
                    tenant_id, inventory_product_id, model_info['training_metrics']
                )

            result = {
                'status': 'success',
                'model_info': model_info,
                'model_record_id': model_record.id if model_record else None,
                'data_points': len(product_data),
                'training_time_seconds': time.time() - product_start_time,
                'trained_at': datetime.now().isoformat()
            }

            logger.info("Successfully trained model",
                        inventory_product_id=inventory_product_id,
                        model_record_id=model_record.id if model_record else None)

            # Report completion to progress tracker (emits Event 3: product_completed)
            await progress_tracker.mark_product_completed(inventory_product_id)

            return inventory_product_id, result

        except Exception as e:
            logger.error("Failed to train model",
                         inventory_product_id=inventory_product_id,
                         error=str(e))
            result = {
                'status': 'error',
                'error_message': str(e),
                'data_points': len(product_data) if product_data is not None else 0,
                'training_time_seconds': time.time() - product_start_time,
                'failed_at': datetime.now().isoformat()
            }

            # Report failure to progress tracker (still emits Event 3: product_completed)
            await progress_tracker.mark_product_completed(inventory_product_id)

            return inventory_product_id, result

    async def _train_all_models_enhanced(self,
                                         tenant_id: str,
                                         processed_data: Dict[str, pd.DataFrame],
                                         job_id: str,
                                         repos: Dict,
                                         progress_tracker: ParallelProductProgressTracker) -> Dict[str, Any]:
        """Train models with throttled parallel execution and progress tracking"""
        total_products = len(processed_data)
        logger.info(f"Starting throttled parallel training for {total_products} products")

        # Create training tasks for all products
        training_tasks = [
            self._train_single_product(
                tenant_id=tenant_id,
                inventory_product_id=inventory_product_id,
                product_data=product_data,
                job_id=job_id,
                repos=repos,
                progress_tracker=progress_tracker
            )
            for inventory_product_id, product_data in processed_data.items()
        ]

        # Execute training tasks with throttling to prevent heartbeat blocking.
        # Limit concurrent operations to prevent CPU/memory exhaustion.
        from app.core.config import settings
        max_concurrent = getattr(settings, 'MAX_CONCURRENT_TRAININGS', 3)
        logger.info(f"Executing training with max {max_concurrent} concurrent operations",
                    total_products=total_products)

        # Process tasks in batches to prevent blocking the event loop
        results_list = []
        for i in range(0, len(training_tasks), max_concurrent):
            batch = training_tasks[i:i + max_concurrent]
            batch_results = await asyncio.gather(*batch, return_exceptions=True)
            results_list.extend(batch_results)

            # Yield control to event loop to allow heartbeat processing.
            # Increased from 0.01s to 0.1s (100ms) to ensure WebSocket pings, RabbitMQ heartbeats,
            # and progress events can be processed during long training operations.
            await asyncio.sleep(0.1)

            # Log progress to verify event loop is responsive
            logger.debug(
                "Training batch completed, yielding to event loop",
                batch_num=(i // max_concurrent) + 1,
                total_batches=(len(training_tasks) + max_concurrent - 1) // max_concurrent,
                products_completed=len(results_list),
                total_products=len(training_tasks)
            )

        # Log final summary
        summary = progress_tracker.get_progress()
        logger.info("Throttled parallel training completed",
                    total=summary['total_products'],
                    completed=summary['products_completed'])

        # Convert results to dictionary
        training_results = {}
        for result in results_list:
            if isinstance(result, Exception):
                logger.error(f"Training task failed with exception: {result}")
                continue
            product_id, product_result = result
            training_results[product_id] = product_result

        logger.info(f"Throttled parallel training completed: {len(training_results)} products processed")
        return training_results

    async def _create_model_record(self,
                                   repos: Dict,
                                   tenant_id: str,
                                   inventory_product_id: str,
                                   model_info: Dict,
                                   job_id: str,
                                   processed_data: pd.DataFrame):
        """Create model record using repository"""
        try:
            model_data = {
                "tenant_id": tenant_id,
                "inventory_product_id": inventory_product_id,
                "job_id": job_id,
                "model_type": "enhanced_prophet",
                "model_path": model_info.get("model_path"),
                "metadata_path": model_info.get("metadata_path"),
                "mape": model_info.get("training_metrics", {}).get("mape"),
                "mae": model_info.get("training_metrics", {}).get("mae"),
                "rmse": model_info.get("training_metrics", {}).get("rmse"),
                "r2_score": model_info.get("training_metrics", {}).get("r2"),
                "training_samples": len(processed_data),
                "hyperparameters": model_info.get("hyperparameters"),
                "features_used": list(processed_data.columns),
                "normalization_params": self.enhanced_data_processor.get_scalers(),  # Include scalers for prediction consistency
                "is_active": True,
                "is_production": True,
                "data_quality_score": model_info.get("data_quality_score", 100.0)
            }

            model_record = await repos['model'].create_model(model_data)

            logger.info("Created enhanced model record",
                        inventory_product_id=inventory_product_id,
                        model_id=model_record.id)

            # Create artifacts for model files
            if model_info.get("model_path"):
                await repos['artifact'].create_artifact({
                    "model_id": str(model_record.id),
                    "tenant_id": tenant_id,
                    "artifact_type": "enhanced_model_file",
                    "file_path": model_info["model_path"],
                    "storage_location": "local"
                })

            return model_record

        except Exception as e:
            logger.error("Failed to create enhanced model record",
                         inventory_product_id=inventory_product_id,
                         error=str(e))
            return None

    async def _create_performance_metrics(self,
                                          repos: Dict,
                                          model_id: str,
                                          tenant_id: str,
                                          inventory_product_id: str,
                                          metrics: Dict):
        """Create performance metrics record using repository"""
        try:
            metric_data = {
                "model_id": str(model_id),
                "tenant_id": tenant_id,
                "inventory_product_id": inventory_product_id,
                "mae": metrics.get("mae"),
                "mse": metrics.get("mse"),
                "rmse": metrics.get("rmse"),
                "mape": metrics.get("mape"),
                "r2_score": metrics.get("r2"),
                "accuracy_percentage": 100 - metrics.get("mape", 0) if metrics.get("mape") else None,
                "evaluation_samples": metrics.get("data_points", 0)
            }

            await repos['performance'].create_performance_metric(metric_data)

            logger.info("Created enhanced performance metrics",
                        inventory_product_id=inventory_product_id,
                        model_id=model_id)

        except Exception as e:
            logger.error("Failed to create enhanced performance metrics",
                         inventory_product_id=inventory_product_id,
                         error=str(e))
    async def _calculate_enhanced_training_summary(self,
                                                   training_results: Dict[str, Any],
                                                   repos: Dict,
                                                   tenant_id: str) -> Dict[str, Any]:
        """Calculate enhanced summary statistics with repository data"""
        total_products = len(training_results)
        successful_products = len([r for r in training_results.values() if r.get('status') == 'success'])
        failed_products = len([r for r in training_results.values() if r.get('status') == 'error'])
        skipped_products = len([r for r in training_results.values() if r.get('status') == 'skipped'])

        # Calculate average training metrics for successful models
        successful_results = [r for r in training_results.values() if r.get('status') == 'success']
        avg_metrics = {}
        if successful_results:
            metrics_list = [r['model_info'].get('training_metrics', {}) for r in successful_results]
            if metrics_list and all(metrics_list):
                avg_metrics = {
                    'avg_mae': round(np.mean([m.get('mae', 0) for m in metrics_list]), 2),
                    'avg_rmse': round(np.mean([m.get('rmse', 0) for m in metrics_list]), 2),
                    'avg_mape': round(np.mean([m.get('mape', 0) for m in metrics_list]), 2),
                    'avg_r2': round(np.mean([m.get('r2', 0) for m in metrics_list]), 3),
                    'avg_training_time': round(np.mean([r.get('training_time_seconds', 0) for r in successful_results]), 2)
                }

        # Calculate data quality insights
        data_points_list = [r.get('data_points', 0) for r in training_results.values()]

        # Get database statistics
        try:
            # Get tenant model count from repository
            tenant_models = await repos['model'].get_models_by_tenant(tenant_id)
            models_created = [r.get('model_record_id') for r in successful_results if r.get('model_record_id')]

            db_stats = {
                'total_tenant_models': len(tenant_models),
                'models_created_this_job': len(models_created),
                'total_db_records': len(models_created),
                'performance_metrics_created': len(models_created),  # One per model
                'artifacts_created': len([r for r in successful_results if r.get('model_info', {}).get('model_path')])
            }
        except Exception as e:
            logger.warning("Failed to get database statistics", error=str(e))
            db_stats = {
                'total_tenant_models': 0,
                'models_created_this_job': 0,
                'total_db_records': 0,
                'performance_metrics_created': 0,
                'artifacts_created': 0
            }

        # Build models_created with proper model result structure
        models_created = {}
        for product, result in training_results.items():
            if result.get('status') == 'success' and result.get('model_info'):
                model_info = result['model_info']
                models_created[product] = {
                    'status': 'completed',
                    'model_path': model_info.get('model_path'),
                    'metadata_path': model_info.get('metadata_path'),
                    'metrics': model_info.get('training_metrics', {}),
                    'hyperparameters': model_info.get('hyperparameters', {}),
                    'features_used': model_info.get('features_used', []),
                    'data_points': result.get('data_points', 0),
                    'data_quality_score': model_info.get('data_quality_score', 100.0),
                    'model_record_id': result.get('model_record_id')
                }

        enhanced_summary = {
            'total_products': total_products,
            'successful_products': successful_products,
            'failed_products': failed_products,
            'skipped_products': skipped_products,
            'success_rate': round(successful_products / total_products * 100, 2) if total_products > 0 else 0,
            'enhanced_average_metrics': avg_metrics,
            'enhanced_data_summary': {
                'total_data_points': sum(data_points_list),
                'avg_data_points_per_product': round(np.mean(data_points_list), 1) if data_points_list else 0,
                'min_data_points': min(data_points_list) if data_points_list else 0,
                'max_data_points': max(data_points_list) if data_points_list else 0
            },
            'database_statistics': db_stats,
            'models_created': models_created
        }
        # Add database statistics to the summary
        enhanced_summary.update(db_stats)

        return enhanced_summary

    async def _validate_input_data(self, sales_df: pd.DataFrame, tenant_id: str):
        """Validate input sales data with enhanced error reporting"""
        if sales_df.empty:
            raise ValueError(f"No sales data provided for tenant {tenant_id}")

        # Handle quantity column mapping
        if 'quantity_sold' in sales_df.columns and 'quantity' not in sales_df.columns:
            sales_df['quantity'] = sales_df['quantity_sold']
            logger.info("Mapped quantity column", from_column='quantity_sold', to_column='quantity')

        required_columns = ['date', 'inventory_product_id', 'quantity']
        missing_columns = [col for col in required_columns if col not in sales_df.columns]
        if missing_columns:
            raise ValueError(f"Missing required columns: {missing_columns}")

        # Check for valid dates
        try:
            sales_df['date'] = pd.to_datetime(sales_df['date'])
        except Exception:
            raise ValueError("Invalid date format in sales data")

        # Check for valid quantities: coerce to numeric and reject values that cannot be converted
        if sales_df['quantity'].dtype not in ['int64', 'float64']:
            sales_df['quantity'] = pd.to_numeric(sales_df['quantity'], errors='coerce')
            if sales_df['quantity'].isna().any():
                raise ValueError("Quantity column must be numeric")

    async def evaluate_model_performance_enhanced(self,
                                                  tenant_id: str,
                                                  inventory_product_id: str,
                                                  model_path: str,
                                                  test_dataset: TrainingDataSet) -> Dict[str, Any]:
        """
        Enhanced model evaluation with repository integration.
        """
        try:
            logger.info("Enhanced model evaluation starting",
                        tenant_id=tenant_id,
                        inventory_product_id=inventory_product_id)

            # Get database session and repositories
            async with self.database_manager.get_session() as db_session:
                repos = await self._get_repositories(db_session)

                # Convert test data to DataFrames
                test_sales_df = pd.DataFrame(test_dataset.sales_data)
                test_weather_df = pd.DataFrame(test_dataset.weather_data)
                test_traffic_df = pd.DataFrame(test_dataset.traffic_data)

                # Filter for specific product
                product_test_sales = test_sales_df[test_sales_df['inventory_product_id'] == inventory_product_id].copy()

                if product_test_sales.empty:
                    raise ValueError(f"No test data found for product: {inventory_product_id}")

                # Process test data using enhanced processor
                processed_test_data = await self.enhanced_data_processor.prepare_training_data(
                    sales_data=product_test_sales,
                    weather_data=test_weather_df,
                    traffic_data=test_traffic_df,
                    inventory_product_id=inventory_product_id,
                    tenant_id=tenant_id
                )

                # Create future dataframe for prediction
                future_dates = processed_test_data[['ds']].copy()

                # Add regressor columns
                regressor_columns = [col for col in processed_test_data.columns if col not in ['ds', 'y']]
                for col in regressor_columns:
                    future_dates[col] = processed_test_data[col]

                # Generate predictions
                forecast = await self.prophet_manager.generate_forecast(
                    model_path=model_path,
                    future_dates=future_dates,
                    regressor_columns=regressor_columns
                )

                # Calculate performance metrics
                from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score

                y_true = processed_test_data['y'].values
                y_pred = forecast['yhat'].values

                # Ensure arrays are the same length
                min_len = min(len(y_true), len(y_pred))
                y_true = y_true[:min_len]
                y_pred = y_pred[:min_len]

                metrics = {
                    "mae": float(mean_absolute_error(y_true, y_pred)),
                    "rmse": float(np.sqrt(mean_squared_error(y_true, y_pred))),
                    "r2_score": float(r2_score(y_true, y_pred))
                }

                # Calculate MAPE safely, ignoring near-zero actuals
                non_zero_mask = y_true > 0.1
                if np.sum(non_zero_mask) > 0:
                    mape = np.mean(np.abs((y_true[non_zero_mask] - y_pred[non_zero_mask]) / y_true[non_zero_mask])) * 100
                    metrics["mape"] = float(min(mape, 200))  # Cap at 200%
                else:
                    metrics["mape"] = 100.0

                # Store evaluation metrics in repository
                model_records = await repos['model'].get_models_by_product(tenant_id, inventory_product_id)
                if model_records:
                    latest_model = max(model_records, key=lambda x: x.created_at)
                    await self._create_performance_metrics(
                        repos, latest_model.id, tenant_id, inventory_product_id, metrics
                    )

                result = {
                    "tenant_id": tenant_id,
                    "inventory_product_id": inventory_product_id,
                    "enhanced_evaluation_metrics": metrics,
                    "test_samples": len(processed_test_data),
                    "prediction_samples": len(forecast),
                    "test_period": {
                        "start": test_dataset.date_range.start.isoformat(),
                        "end": test_dataset.date_range.end.isoformat()
                    },
                    "evaluated_at": datetime.now().isoformat(),
                    "repository_integration": {
                        "metrics_stored": True,
                        "model_record_found": len(model_records) > 0 if model_records else False
                    }
                }

                return result

        except Exception as e:
            logger.error("Enhanced model evaluation failed", error=str(e))
            raise
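
# ---------------------------------------------------------------------------
# Illustrative usage sketch (a minimal example, not part of the service
# runtime): shows how this trainer might be driven once a TrainingDataSet has
# been prepared by the training orchestrator. The helper name and the example
# tenant id below are assumptions for illustration only.
# ---------------------------------------------------------------------------
async def _example_train_tenant(dataset: TrainingDataSet) -> Dict[str, Any]:
    """Hypothetical helper demonstrating a single end-to-end training run."""
    trainer = EnhancedBakeryMLTrainer()  # defaults to settings.DATABASE_URL
    return await trainer.train_tenant_models(
        tenant_id="tenant-123",  # assumed example tenant identifier
        training_dataset=dataset,
    )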