Fix remaining nested session issues in training pipeline
Issues Fixed:

4️⃣ data_processor.py (Lines 230-232):
- Second update_log_progress call without commit after data preparation
- Added commit() after completion update to prevent deadlock
- Added debug logging for visibility

5️⃣ prophet_manager.py _store_model (Line 750):
- Created a TRIPLE nested session (training_service → trainer → lock → _store_model)
- Refactored _store_model to accept an optional session parameter
- Uses the parent session from the lock context instead of creating a new one
- Updated the call site to pass the db_session parameter

Complete Session Hierarchy After All Fixes:

training_service.py (session)
└─ commit() ← FIX #2 (e585e9f)
└─ trainer.py (new session) ✅ OK
   └─ data_processor.py (new session)
      └─ commit() after first update ← FIX #3 (b2de56e)
      └─ commit() after second update ← FIX #4 (THIS)
   └─ prophet_manager.train_bakery_model (uses parent or new session) ← FIX #1 (caff497)
      └─ lock.acquire(session)
         └─ _store_model(session=parent) ← FIX #5 (THIS)
            └─ NO NESTED SESSION ✅

All nested session deadlocks in the training path are now resolved.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
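For reviewers, here is a minimal sketch of the "reuse the parent session, else open a new one" pattern these fixes converge on, assuming SQLAlchemy-style async sessions. `ModelStore`, `database_manager.get_session()`, and `record` are illustrative stand-ins, not the repo's actual classes:

```python
from contextlib import asynccontextmanager

class ModelStore:
    """Illustrative stand-in for the manager's storage path (assumption)."""

    def __init__(self, database_manager):
        self.database_manager = database_manager

    @asynccontextmanager
    async def _session_scope(self, session=None):
        """Yield the caller's session if one was passed, else open a new one.

        Reusing the parent session is what breaks the nested-session
        deadlock: only one session ever touches the locked rows.
        """
        if session is not None:
            yield session  # parent owns the session's lifecycle
        else:
            async with self.database_manager.get_session() as new_session:
                yield new_session  # this scope owns the new session

    async def store(self, record, session=None):
        async with self._session_scope(session) as db_session:
            db_session.add(record)
            await db_session.commit()
```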
data_processor.py

@@ -230,6 +230,10 @@ class EnhancedBakeryDataProcessor:
                 await repos['training_log'].update_log_progress(
                     job_id, 25, f"data_prepared_{inventory_product_id}", "running"
                 )
+                # ✅ FIX: Commit after final progress update to prevent deadlock
+                await db_session.commit()
+                logger.debug("Committed session after data preparation completion",
+                             inventory_product_id=inventory_product_id)
 
             except Exception as e:
                 logger.warning("Failed to store processing metadata",
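Why the added commit() matters: a sketch of the self-deadlock this hunk prevents, under the assumption of a Postgres-style backend where an uncommitted UPDATE holds a row lock until commit. The table and column names are illustrative, not taken from the repo:

```python
from sqlalchemy import text

async def self_deadlock(database_manager, job_id):
    async with database_manager.get_session() as outer:
        await outer.execute(
            text("UPDATE training_logs SET progress = 25 WHERE job_id = :j"),
            {"j": job_id},
        )
        # Without a commit, `outer` still holds the row lock here.
        async with database_manager.get_session() as inner:
            # This UPDATE waits on the lock held by `outer`, and `outer`
            # cannot commit until this block returns: a self-deadlock.
            await inner.execute(
                text("UPDATE training_logs SET progress = 50 WHERE job_id = :j"),
                {"j": job_id},
            )
```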
prophet_manager.py

@@ -211,8 +211,9 @@ class BakeryProphetManager:
 
         # Store model and metrics - Generate proper UUID for model_id
         model_id = str(uuid.uuid4())
+        # ✅ FIX: Pass session to _store_model to avoid nested session
         model_path = await self._store_model(
-            tenant_id, inventory_product_id, model, model_id, prophet_data, regressor_columns, best_params, training_metrics
+            tenant_id, inventory_product_id, model, model_id, prophet_data, regressor_columns, best_params, training_metrics, db_session
         )
 
         # Return same format as before, but with optimization info
@@ -701,7 +702,8 @@ class BakeryProphetManager:
                            training_data: pd.DataFrame,
                            regressor_columns: List[str],
                            optimized_params: Dict[str, Any] = None,
-                           training_metrics: Dict[str, Any] = None) -> str:
+                           training_metrics: Dict[str, Any] = None,
+                           session = None) -> str:
         """Store model with database integration"""
 
         # Create model directory
@@ -745,61 +747,73 @@ class BakeryProphetManager:
         self.models[model_key] = model
         self.model_metadata[model_key] = metadata
 
-        # 🆕 NEW: Store in database using new session
+        # 🆕 NEW: Store in database using session (parent or new)
+        use_parent_session = session is not None
+
+        async def _store_in_db(db_session):
+            """Inner function to store model in database"""
+            # Deactivate previous models for this product
+            await self._deactivate_previous_models_with_session(db_session, tenant_id, inventory_product_id)
+
+            # Helper to ensure hyperparameters are JSON serializable
+            def _serialize_hyperparameters(params):
+                if not params:
+                    return {}
+                safe_params = {}
+                for k, v in params.items():
+                    try:
+                        if isinstance(v, (int, float, str, bool, type(None))):
+                            safe_params[k] = v
+                        elif hasattr(v, 'item'): # numpy scalars
+                            safe_params[k] = v.item()
+                        elif isinstance(v, (list, tuple)):
+                            safe_params[k] = [x.item() if hasattr(x, 'item') else x for x in v]
+                        else:
+                            safe_params[k] = float(v) if isinstance(v, (np.integer, np.floating)) else str(v)
+                    except:
+                        safe_params[k] = str(v) # fallback to string conversion
+                return safe_params
+
+            # Create new database record
+            db_model = TrainedModel(
+                id=model_id,
+                tenant_id=tenant_id,
+                inventory_product_id=inventory_product_id,
+                model_type="prophet_optimized",
+                job_id=model_id.split('_')[0], # Extract job_id from model_id
+                model_path=str(model_path),
+                metadata_path=str(metadata_path),
+                hyperparameters=_serialize_hyperparameters(optimized_params or {}),
+                features_used=[str(f) for f in regressor_columns] if regressor_columns else [],
+                is_active=True,
+                is_production=True, # New models are production-ready
+                training_start_date=pd.Timestamp(training_data['ds'].min()).to_pydatetime().replace(tzinfo=None),
+                training_end_date=pd.Timestamp(training_data['ds'].max()).to_pydatetime().replace(tzinfo=None),
+                training_samples=len(training_data)
+            )
+
+            # Add training metrics if available
+            if training_metrics:
+                db_model.mape = float(training_metrics.get('mape')) if training_metrics.get('mape') is not None else None
+                db_model.mae = float(training_metrics.get('mae')) if training_metrics.get('mae') is not None else None
+                db_model.rmse = float(training_metrics.get('rmse')) if training_metrics.get('rmse') is not None else None
+                db_model.r2_score = float(training_metrics.get('r2')) if training_metrics.get('r2') is not None else None
+                db_model.data_quality_score = float(training_metrics.get('data_quality_score')) if training_metrics.get('data_quality_score') is not None else None
+
+            db_session.add(db_model)
+            await db_session.commit()
+
+            logger.info(f"Model {model_id} stored in database successfully")
+
         try:
-            async with self.database_manager.get_session() as db_session:
-                # Deactivate previous models for this product
-                await self._deactivate_previous_models_with_session(db_session, tenant_id, inventory_product_id)
-
-                # Helper to ensure hyperparameters are JSON serializable
-                def _serialize_hyperparameters(params):
-                    if not params:
-                        return {}
-                    safe_params = {}
-                    for k, v in params.items():
-                        try:
-                            if isinstance(v, (int, float, str, bool, type(None))):
-                                safe_params[k] = v
-                            elif hasattr(v, 'item'): # numpy scalars
-                                safe_params[k] = v.item()
-                            elif isinstance(v, (list, tuple)):
-                                safe_params[k] = [x.item() if hasattr(x, 'item') else x for x in v]
-                            else:
-                                safe_params[k] = float(v) if isinstance(v, (np.integer, np.floating)) else str(v)
-                        except:
-                            safe_params[k] = str(v) # fallback to string conversion
-                    return safe_params
-
-                # Create new database record
-                db_model = TrainedModel(
-                    id=model_id,
-                    tenant_id=tenant_id,
-                    inventory_product_id=inventory_product_id,
-                    model_type="prophet_optimized",
-                    job_id=model_id.split('_')[0], # Extract job_id from model_id
-                    model_path=str(model_path),
-                    metadata_path=str(metadata_path),
-                    hyperparameters=_serialize_hyperparameters(optimized_params or {}),
-                    features_used=[str(f) for f in regressor_columns] if regressor_columns else [],
-                    is_active=True,
-                    is_production=True, # New models are production-ready
-                    training_start_date=pd.Timestamp(training_data['ds'].min()).to_pydatetime().replace(tzinfo=None),
-                    training_end_date=pd.Timestamp(training_data['ds'].max()).to_pydatetime().replace(tzinfo=None),
-                    training_samples=len(training_data)
-                )
-
-                # Add training metrics if available
-                if training_metrics:
-                    db_model.mape = float(training_metrics.get('mape')) if training_metrics.get('mape') is not None else None
-                    db_model.mae = float(training_metrics.get('mae')) if training_metrics.get('mae') is not None else None
-                    db_model.rmse = float(training_metrics.get('rmse')) if training_metrics.get('rmse') is not None else None
-                    db_model.r2_score = float(training_metrics.get('r2')) if training_metrics.get('r2') is not None else None
-                    db_model.data_quality_score = float(training_metrics.get('data_quality_score')) if training_metrics.get('data_quality_score') is not None else None
-
-                db_session.add(db_model)
-                await db_session.commit()
-
-                logger.info(f"Model {model_id} stored in database successfully")
-
+            # ✅ FIX: Use parent session if provided, otherwise create new one
+            if use_parent_session:
+                logger.debug(f"Using parent session for storing model {model_id}")
+                await _store_in_db(session)
+            else:
+                logger.debug(f"Creating new session for storing model {model_id}")
+                async with self.database_manager.get_session() as new_session:
+                    await _store_in_db(new_session)
         except Exception as e:
             logger.error(f"Failed to store model in database: {str(e)}")
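Taken together, the call sites now look roughly like the sketch below. This is a hedged illustration based on the commit message's `lock.acquire(session)` hierarchy; `manager`, `lock`, `lock.release`, and the argument bundles are assumptions, not the repo's actual API:

```python
async def train_and_store(manager, lock, db_session, model_args):
    # Inside the lock's session context: hand the parent session through
    # so _store_model does not open a nested one (hypothetical lock API).
    await lock.acquire(db_session)
    try:
        return await manager._store_model(*model_args, session=db_session)
    finally:
        await lock.release(db_session)

async def store_standalone(manager, model_args):
    # Outside any session context: omitting `session` makes _store_model
    # open (and own) a fresh session internally.
    return await manager._store_model(*model_args)
```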