Fix remaining nested session issues in training pipeline
Issues Fixed:

4️⃣ data_processor.py (Lines 230-232):
- Second update_log_progress call without commit after data preparation
- Added commit() after completion update to prevent deadlock
- Added debug logging for visibility

5️⃣ prophet_manager.py _store_model (Line 750):
- Created TRIPLE nested session (training_service → trainer → lock → _store_model)
- Refactored _store_model to accept optional session parameter
- Uses parent session from lock context instead of creating new one
- Updated call site to pass db_session parameter

Complete Session Hierarchy After All Fixes:

training_service.py (session)
└─ commit() ← FIX #2 (e585e9f)
└─ trainer.py (new session) ✅ OK
   └─ data_processor.py (new session)
      └─ commit() after first update ← FIX #3 (b2de56e)
      └─ commit() after second update ← FIX #4 (THIS)
   └─ prophet_manager.train_bakery_model (uses parent or new session) ← FIX #1 (caff497)
      └─ lock.acquire(session)
         └─ _store_model(session=parent) ← FIX #5 (THIS)
            └─ NO NESTED SESSION ✅

All nested session deadlocks in training path are now resolved.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
@@ -230,6 +230,10 @@ class EnhancedBakeryDataProcessor:
|
||||
await repos['training_log'].update_log_progress(
|
||||
job_id, 25, f"data_prepared_{inventory_product_id}", "running"
|
||||
)
|
||||
# ✅ FIX: Commit after final progress update to prevent deadlock
|
||||
await db_session.commit()
|
||||
logger.debug("Committed session after data preparation completion",
|
||||
inventory_product_id=inventory_product_id)
|
||||
|
||||
except Exception as e:
|
||||
logger.warning("Failed to store processing metadata",
|
||||
|
||||
@@ -211,8 +211,9 @@ class BakeryProphetManager:
|
||||
|
||||
# Store model and metrics - Generate proper UUID for model_id
|
||||
model_id = str(uuid.uuid4())
|
||||
# ✅ FIX: Pass session to _store_model to avoid nested session
|
||||
model_path = await self._store_model(
|
||||
tenant_id, inventory_product_id, model, model_id, prophet_data, regressor_columns, best_params, training_metrics
|
||||
tenant_id, inventory_product_id, model, model_id, prophet_data, regressor_columns, best_params, training_metrics, db_session
|
||||
)
|
||||
|
||||
# Return same format as before, but with optimization info
|
||||
@@ -701,7 +702,8 @@ class BakeryProphetManager:
|
||||
training_data: pd.DataFrame,
|
||||
regressor_columns: List[str],
|
||||
optimized_params: Dict[str, Any] = None,
|
||||
training_metrics: Dict[str, Any] = None) -> str:
|
||||
training_metrics: Dict[str, Any] = None,
|
||||
session = None) -> str:
|
||||
"""Store model with database integration"""
|
||||
|
||||
# Create model directory
|
||||
@@ -745,9 +747,11 @@ class BakeryProphetManager:
|
||||
self.models[model_key] = model
|
||||
self.model_metadata[model_key] = metadata
|
||||
|
||||
# 🆕 NEW: Store in database using new session
|
||||
try:
|
||||
async with self.database_manager.get_session() as db_session:
|
||||
# 🆕 NEW: Store in database using session (parent or new)
|
||||
use_parent_session = session is not None
|
||||
|
||||
async def _store_in_db(db_session):
|
||||
"""Inner function to store model in database"""
|
||||
# Deactivate previous models for this product
|
||||
await self._deactivate_previous_models_with_session(db_session, tenant_id, inventory_product_id)
|
||||
|
||||
@@ -801,6 +805,16 @@ class BakeryProphetManager:
|
||||
|
||||
logger.info(f"Model {model_id} stored in database successfully")
|
||||
|
||||
try:
|
||||
# ✅ FIX: Use parent session if provided, otherwise create new one
|
||||
if use_parent_session:
|
||||
logger.debug(f"Using parent session for storing model {model_id}")
|
||||
await _store_in_db(session)
|
||||
else:
|
||||
logger.debug(f"Creating new session for storing model {model_id}")
|
||||
async with self.database_manager.get_session() as new_session:
|
||||
await _store_in_db(new_session)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to store model in database: {str(e)}")
|
||||
# Continue execution - file storage succeeded
|
||||
|
||||
Reference in New Issue
Block a user