Fix remaining nested session issues in training pipeline

Issues Fixed:

4️⃣ data_processor.py (Lines 230-232):
- The second update_log_progress call after data preparation ran without a commit
- Added commit() after the completion update to prevent a deadlock (see the sketch below)
- Added debug logging for visibility
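
A minimal sketch of the pattern, under stated assumptions: update_log_progress and database_manager.get_session come from the files touched by this series, while log_id, the argument names, and the surrounding flow are placeholders, not the literal data_processor.py code.

    # Sketch only - the real call site lives around data_processor.py:230-232
    async with self.database_manager.get_session() as db_session:
        # ... data preparation ...
        await update_log_progress(db_session, log_id, progress=100)  # second progress update
        # ✅ Commit right away so the row lock on the training log is released
        # before any downstream code opens another session on the same row.
        await db_session.commit()
        logger.debug(f"Committed completion progress for training log {log_id}")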

5️⃣ prophet_manager.py _store_model (Line 750):
- _store_model opened a TRIPLE-nested session (training_service → trainer → lock → _store_model)
- Refactored _store_model to accept an optional session parameter (pattern condensed below)
- Uses the parent session from the lock context instead of creating a new one
- Updated the call site to pass the db_session parameter
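
Condensed from the diff below (the real code keeps the serialization helper and the TrainedModel insert inside _store_in_db; this only shows the control flow):

    async def _store_model(self, ..., training_metrics: Dict[str, Any] = None,
                           session = None) -> str:
        async def _store_in_db(db_session):
            ...  # deactivate old models, build TrainedModel, add + commit

        if session is not None:
            await _store_in_db(session)          # reuse the caller's (lock) session
        else:
            async with self.database_manager.get_session() as new_session:
                await _store_in_db(new_session)  # standalone calls still work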

Complete Session Hierarchy After All Fixes:
training_service.py (session)
  └─ commit() ← FIX #2 (e585e9f)
  └─ trainer.py (new session)  OK
      └─ data_processor.py (new session)
          └─ commit() after first update ← FIX #3 (b2de56e)
          └─ commit() after second update ← FIX #4 (THIS)
      └─ prophet_manager.train_bakery_model (uses parent or new session) ← FIX #1 (caff497)
          └─ lock.acquire(session)
              └─ _store_model(session=parent) ← FIX #5 (THIS)
                  └─ NO NESTED SESSION 

All nested session deadlocks in the training path are now resolved.
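
For reference, the shape of the deadlock these fixes remove. This is a schematic, not code from this repo; the statement names are placeholders. With async ORM sessions (SQLAlchemy-style add/commit, as in the diff) and a small connection pool, an inner get_session() can wait forever on a row lock still held by the uncommitted outer session.

    async with database_manager.get_session() as outer:
        await outer.execute(update_training_log)              # row lock held, not yet committed
        async with database_manager.get_session() as inner:   # ❌ nested session
            await inner.execute(update_same_training_log)     # blocks on outer's lock
        await outer.commit()                                   # never reached -> deadlock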

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
Urtzi Alfaro
2025-11-05 16:41:53 +01:00
parent b2de56ead3
commit fd0a96e254
2 changed files with 80 additions and 62 deletions

prophet_manager.py

@@ -211,8 +211,9 @@ class BakeryProphetManager:
         # Store model and metrics - Generate proper UUID for model_id
         model_id = str(uuid.uuid4())

+        # ✅ FIX: Pass session to _store_model to avoid nested session
         model_path = await self._store_model(
-            tenant_id, inventory_product_id, model, model_id, prophet_data, regressor_columns, best_params, training_metrics
+            tenant_id, inventory_product_id, model, model_id, prophet_data, regressor_columns, best_params, training_metrics, db_session
         )

         # Return same format as before, but with optimization info
@@ -693,15 +694,16 @@ class BakeryProphetManager:
"optimized": False, "improvement_estimated": 0.0
}
async def _store_model(self,
tenant_id: str,
inventory_product_id: str,
model: Prophet,
async def _store_model(self,
tenant_id: str,
inventory_product_id: str,
model: Prophet,
model_id: str,
training_data: pd.DataFrame,
regressor_columns: List[str],
optimized_params: Dict[str, Any] = None,
training_metrics: Dict[str, Any] = None) -> str:
training_metrics: Dict[str, Any] = None,
session = None) -> str:
"""Store model with database integration"""
# Create model directory
@@ -745,62 +747,74 @@ class BakeryProphetManager:
         self.models[model_key] = model
         self.model_metadata[model_key] = metadata
-        # 🆕 NEW: Store in database using new session
-        try:
-            async with self.database_manager.get_session() as db_session:
-                # Deactivate previous models for this product
-                await self._deactivate_previous_models_with_session(db_session, tenant_id, inventory_product_id)
-                # Helper to ensure hyperparameters are JSON serializable
-                def _serialize_hyperparameters(params):
-                    if not params:
-                        return {}
-                    safe_params = {}
-                    for k, v in params.items():
-                        try:
-                            if isinstance(v, (int, float, str, bool, type(None))):
-                                safe_params[k] = v
-                            elif hasattr(v, 'item'): # numpy scalars
-                                safe_params[k] = v.item()
-                            elif isinstance(v, (list, tuple)):
-                                safe_params[k] = [x.item() if hasattr(x, 'item') else x for x in v]
-                            else:
-                                safe_params[k] = float(v) if isinstance(v, (np.integer, np.floating)) else str(v)
-                        except:
-                            safe_params[k] = str(v) # fallback to string conversion
-                    return safe_params
+        # 🆕 NEW: Store in database using session (parent or new)
+        use_parent_session = session is not None
+        async def _store_in_db(db_session):
+            """Inner function to store model in database"""
+            # Deactivate previous models for this product
+            await self._deactivate_previous_models_with_session(db_session, tenant_id, inventory_product_id)
+            # Helper to ensure hyperparameters are JSON serializable
+            def _serialize_hyperparameters(params):
+                if not params:
+                    return {}
+                safe_params = {}
+                for k, v in params.items():
+                    try:
+                        if isinstance(v, (int, float, str, bool, type(None))):
+                            safe_params[k] = v
+                        elif hasattr(v, 'item'): # numpy scalars
+                            safe_params[k] = v.item()
+                        elif isinstance(v, (list, tuple)):
+                            safe_params[k] = [x.item() if hasattr(x, 'item') else x for x in v]
+                        else:
+                            safe_params[k] = float(v) if isinstance(v, (np.integer, np.floating)) else str(v)
+                    except:
+                        safe_params[k] = str(v) # fallback to string conversion
+                return safe_params
+            # Create new database record
+            db_model = TrainedModel(
+                id=model_id,
+                tenant_id=tenant_id,
+                inventory_product_id=inventory_product_id,
+                model_type="prophet_optimized",
+                job_id=model_id.split('_')[0], # Extract job_id from model_id
+                model_path=str(model_path),
+                metadata_path=str(metadata_path),
+                hyperparameters=_serialize_hyperparameters(optimized_params or {}),
+                features_used=[str(f) for f in regressor_columns] if regressor_columns else [],
+                is_active=True,
+                is_production=True, # New models are production-ready
+                training_start_date=pd.Timestamp(training_data['ds'].min()).to_pydatetime().replace(tzinfo=None),
+                training_end_date=pd.Timestamp(training_data['ds'].max()).to_pydatetime().replace(tzinfo=None),
+                training_samples=len(training_data)
+            )
+            # Add training metrics if available
+            if training_metrics:
+                db_model.mape = float(training_metrics.get('mape')) if training_metrics.get('mape') is not None else None
+                db_model.mae = float(training_metrics.get('mae')) if training_metrics.get('mae') is not None else None
+                db_model.rmse = float(training_metrics.get('rmse')) if training_metrics.get('rmse') is not None else None
+                db_model.r2_score = float(training_metrics.get('r2')) if training_metrics.get('r2') is not None else None
+                db_model.data_quality_score = float(training_metrics.get('data_quality_score')) if training_metrics.get('data_quality_score') is not None else None
+            db_session.add(db_model)
+            await db_session.commit()
+            logger.info(f"Model {model_id} stored in database successfully")
+        try:
+            # ✅ FIX: Use parent session if provided, otherwise create new one
+            if use_parent_session:
+                logger.debug(f"Using parent session for storing model {model_id}")
+                await _store_in_db(session)
+            else:
+                logger.debug(f"Creating new session for storing model {model_id}")
+                async with self.database_manager.get_session() as new_session:
+                    await _store_in_db(new_session)
-                # Create new database record
-                db_model = TrainedModel(
-                    id=model_id,
-                    tenant_id=tenant_id,
-                    inventory_product_id=inventory_product_id,
-                    model_type="prophet_optimized",
-                    job_id=model_id.split('_')[0], # Extract job_id from model_id
-                    model_path=str(model_path),
-                    metadata_path=str(metadata_path),
-                    hyperparameters=_serialize_hyperparameters(optimized_params or {}),
-                    features_used=[str(f) for f in regressor_columns] if regressor_columns else [],
-                    is_active=True,
-                    is_production=True, # New models are production-ready
-                    training_start_date=pd.Timestamp(training_data['ds'].min()).to_pydatetime().replace(tzinfo=None),
-                    training_end_date=pd.Timestamp(training_data['ds'].max()).to_pydatetime().replace(tzinfo=None),
-                    training_samples=len(training_data)
-                )
-                # Add training metrics if available
-                if training_metrics:
-                    db_model.mape = float(training_metrics.get('mape')) if training_metrics.get('mape') is not None else None
-                    db_model.mae = float(training_metrics.get('mae')) if training_metrics.get('mae') is not None else None
-                    db_model.rmse = float(training_metrics.get('rmse')) if training_metrics.get('rmse') is not None else None
-                    db_model.r2_score = float(training_metrics.get('r2')) if training_metrics.get('r2') is not None else None
-                    db_model.data_quality_score = float(training_metrics.get('data_quality_score')) if training_metrics.get('data_quality_score') is not None else None
-                db_session.add(db_model)
-                await db_session.commit()
-                logger.info(f"Model {model_id} stored in database successfully")
         except Exception as e:
             logger.error(f"Failed to store model in database: {str(e)}")
             # Continue execution - file storage succeeded