diff --git a/services/training/app/ml/data_processor.py b/services/training/app/ml/data_processor.py
index 94968418..3fc57abc 100644
--- a/services/training/app/ml/data_processor.py
+++ b/services/training/app/ml/data_processor.py
@@ -230,7 +230,11 @@ class EnhancedBakeryDataProcessor:
             await repos['training_log'].update_log_progress(
                 job_id, 25, f"data_prepared_{inventory_product_id}", "running"
             )
-
+            # ✅ FIX: Commit after final progress update to prevent deadlock
+            await db_session.commit()
+            logger.debug("Committed session after data preparation completion",
+                         inventory_product_id=inventory_product_id)
+
         except Exception as e:
             logger.warning("Failed to store processing metadata", error=str(e))

diff --git a/services/training/app/ml/prophet_manager.py b/services/training/app/ml/prophet_manager.py
index 9291a568..923e6da1 100644
--- a/services/training/app/ml/prophet_manager.py
+++ b/services/training/app/ml/prophet_manager.py
@@ -211,8 +211,9 @@ class BakeryProphetManager:
 
             # Store model and metrics - Generate proper UUID for model_id
             model_id = str(uuid.uuid4())
+            # ✅ FIX: Pass session to _store_model to avoid nested session
             model_path = await self._store_model(
-                tenant_id, inventory_product_id, model, model_id, prophet_data, regressor_columns, best_params, training_metrics
+                tenant_id, inventory_product_id, model, model_id, prophet_data, regressor_columns, best_params, training_metrics, db_session
             )
 
             # Return same format as before, but with optimization info
@@ -693,15 +694,16 @@ class BakeryProphetManager:
                 "optimized": False,
                 "improvement_estimated": 0.0
             }
 
-    async def _store_model(self, 
-                          tenant_id: str, 
-                          inventory_product_id: str, 
-                          model: Prophet, 
+    async def _store_model(self,
+                          tenant_id: str,
+                          inventory_product_id: str,
+                          model: Prophet,
                           model_id: str,
                           training_data: pd.DataFrame,
                           regressor_columns: List[str],
                           optimized_params: Dict[str, Any] = None,
-                          training_metrics: Dict[str, Any] = None) -> str:
+                          training_metrics: Dict[str, Any] = None,
+                          session = None) -> str:
         """Store model with database integration"""
 
         # Create model directory
@@ -745,62 +747,74 @@ class BakeryProphetManager:
         self.models[model_key] = model
         self.model_metadata[model_key] = metadata
 
-        # 🆕 NEW: Store in database using new session
-        try:
-            async with self.database_manager.get_session() as db_session:
-                # Deactivate previous models for this product
-                await self._deactivate_previous_models_with_session(db_session, tenant_id, inventory_product_id)
-
-                # Helper to ensure hyperparameters are JSON serializable
-                def _serialize_hyperparameters(params):
-                    if not params:
-                        return {}
-                    safe_params = {}
-                    for k, v in params.items():
-                        try:
-                            if isinstance(v, (int, float, str, bool, type(None))):
-                                safe_params[k] = v
-                            elif hasattr(v, 'item'):  # numpy scalars
-                                safe_params[k] = v.item()
-                            elif isinstance(v, (list, tuple)):
-                                safe_params[k] = [x.item() if hasattr(x, 'item') else x for x in v]
-                            else:
-                                safe_params[k] = float(v) if isinstance(v, (np.integer, np.floating)) else str(v)
-                        except:
-                            safe_params[k] = str(v)  # fallback to string conversion
-                    return safe_params
+        # 🆕 NEW: Store in database using session (parent or new)
+        use_parent_session = session is not None
+
+        async def _store_in_db(db_session):
+            """Inner function to store model in database"""
+            # Deactivate previous models for this product
+            await self._deactivate_previous_models_with_session(db_session, tenant_id, inventory_product_id)
+
+            # Helper to ensure hyperparameters are JSON serializable
+            def _serialize_hyperparameters(params):
+                if not params:
+                    return {}
+                safe_params = {}
+                for k, v in params.items():
+                    try:
+                        if isinstance(v, (int, float, str, bool, type(None))):
+                            safe_params[k] = v
+                        elif hasattr(v, 'item'):  # numpy scalars
+                            safe_params[k] = v.item()
+                        elif isinstance(v, (list, tuple)):
+                            safe_params[k] = [x.item() if hasattr(x, 'item') else x for x in v]
+                        else:
+                            safe_params[k] = float(v) if isinstance(v, (np.integer, np.floating)) else str(v)
+                    except:
+                        safe_params[k] = str(v)  # fallback to string conversion
+                return safe_params
+
+            # Create new database record
+            db_model = TrainedModel(
+                id=model_id,
+                tenant_id=tenant_id,
+                inventory_product_id=inventory_product_id,
+                model_type="prophet_optimized",
+                job_id=model_id.split('_')[0],  # Extract job_id from model_id
+                model_path=str(model_path),
+                metadata_path=str(metadata_path),
+                hyperparameters=_serialize_hyperparameters(optimized_params or {}),
+                features_used=[str(f) for f in regressor_columns] if regressor_columns else [],
+                is_active=True,
+                is_production=True,  # New models are production-ready
+                training_start_date=pd.Timestamp(training_data['ds'].min()).to_pydatetime().replace(tzinfo=None),
+                training_end_date=pd.Timestamp(training_data['ds'].max()).to_pydatetime().replace(tzinfo=None),
+                training_samples=len(training_data)
+            )
+
+            # Add training metrics if available
+            if training_metrics:
+                db_model.mape = float(training_metrics.get('mape')) if training_metrics.get('mape') is not None else None
+                db_model.mae = float(training_metrics.get('mae')) if training_metrics.get('mae') is not None else None
+                db_model.rmse = float(training_metrics.get('rmse')) if training_metrics.get('rmse') is not None else None
+                db_model.r2_score = float(training_metrics.get('r2')) if training_metrics.get('r2') is not None else None
+                db_model.data_quality_score = float(training_metrics.get('data_quality_score')) if training_metrics.get('data_quality_score') is not None else None
+
+            db_session.add(db_model)
+            await db_session.commit()
+
+            logger.info(f"Model {model_id} stored in database successfully")
+
+        try:
+            # ✅ FIX: Use parent session if provided, otherwise create new one
+            if use_parent_session:
+                logger.debug(f"Using parent session for storing model {model_id}")
+                await _store_in_db(session)
+            else:
+                logger.debug(f"Creating new session for storing model {model_id}")
+                async with self.database_manager.get_session() as new_session:
+                    await _store_in_db(new_session)
-
-                # Create new database record
-                db_model = TrainedModel(
-                    id=model_id,
-                    tenant_id=tenant_id,
-                    inventory_product_id=inventory_product_id,
-                    model_type="prophet_optimized",
-                    job_id=model_id.split('_')[0],  # Extract job_id from model_id
-                    model_path=str(model_path),
-                    metadata_path=str(metadata_path),
-                    hyperparameters=_serialize_hyperparameters(optimized_params or {}),
-                    features_used=[str(f) for f in regressor_columns] if regressor_columns else [],
-                    is_active=True,
-                    is_production=True,  # New models are production-ready
-                    training_start_date=pd.Timestamp(training_data['ds'].min()).to_pydatetime().replace(tzinfo=None),
-                    training_end_date=pd.Timestamp(training_data['ds'].max()).to_pydatetime().replace(tzinfo=None),
-                    training_samples=len(training_data)
-                )
-
-                # Add training metrics if available
-                if training_metrics:
-                    db_model.mape = float(training_metrics.get('mape')) if training_metrics.get('mape') is not None else None
-                    db_model.mae = float(training_metrics.get('mae')) if training_metrics.get('mae') is not None else None
-                    db_model.rmse = float(training_metrics.get('rmse')) if training_metrics.get('rmse') is not None else None
-                    db_model.r2_score = float(training_metrics.get('r2')) if training_metrics.get('r2') is not None else None
-                    db_model.data_quality_score = float(training_metrics.get('data_quality_score')) if training_metrics.get('data_quality_score') is not None else None
-
-                db_session.add(db_model)
-                await db_session.commit()
-
-                logger.info(f"Model {model_id} stored in database successfully")
-
         except Exception as e:
             logger.error(f"Failed to store model in database: {str(e)}")
             # Continue execution - file storage succeeded
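
Note on the pattern: the heart of the _store_model change is "reuse the caller's session when one is passed, otherwise open our own." Below is a minimal, self-contained sketch of that pattern; it is illustrative only, and the names save_record, manager, and the SQLAlchemy AsyncSession type are assumptions for the sketch, not part of this patch.

    # Hypothetical sketch of the optional-parent-session pattern (not part of the patch).
    from typing import Optional
    from sqlalchemy.ext.asyncio import AsyncSession

    async def save_record(manager, record, session: Optional[AsyncSession] = None) -> None:
        async def _write(db: AsyncSession) -> None:
            db.add(record)       # stage the ORM object in whichever session we got
            await db.commit()    # commit in that same session, never a nested one

        if session is not None:
            # The caller already holds an open session/transaction: reuse it so a
            # second session never competes with the first for the same row locks.
            await _write(session)
        else:
            # No parent session: fall back to a short-lived session of our own.
            async with manager.get_session() as new_session:
                await _write(new_session)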