Fix remaining nested session issues in training pipeline
Issues Fixed: 4️⃣ data_processor.py (Line 230-232): - Second update_log_progress call without commit after data preparation - Added commit() after completion update to prevent deadlock - Added debug logging for visibility 5️⃣ prophet_manager.py _store_model (Line 750): - Created TRIPLE nested session (training_service → trainer → lock → _store_model) - Refactored _store_model to accept optional session parameter - Uses parent session from lock context instead of creating new one - Updated call site to pass db_session parameter Complete Session Hierarchy After All Fixes: training_service.py (session) └─ commit() ← FIX #2 (e585e9f) └─ trainer.py (new session) ✅ OK └─ data_processor.py (new session) └─ commit() after first update ← FIX #3 (b2de56e) └─ commit() after second update ← FIX #4 (THIS) └─ prophet_manager.train_bakery_model (uses parent or new session) ← FIX #1 (caff497) └─ lock.acquire(session) └─ _store_model(session=parent) ← FIX #5 (THIS) └─ NO NESTED SESSION ✅ All nested session deadlocks in training path are now resolved. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
@@ -230,6 +230,10 @@ class EnhancedBakeryDataProcessor:
|
|||||||
await repos['training_log'].update_log_progress(
|
await repos['training_log'].update_log_progress(
|
||||||
job_id, 25, f"data_prepared_{inventory_product_id}", "running"
|
job_id, 25, f"data_prepared_{inventory_product_id}", "running"
|
||||||
)
|
)
|
||||||
|
# ✅ FIX: Commit after final progress update to prevent deadlock
|
||||||
|
await db_session.commit()
|
||||||
|
logger.debug("Committed session after data preparation completion",
|
||||||
|
inventory_product_id=inventory_product_id)
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.warning("Failed to store processing metadata",
|
logger.warning("Failed to store processing metadata",
|
||||||
|
|||||||
@@ -211,8 +211,9 @@ class BakeryProphetManager:
|
|||||||
|
|
||||||
# Store model and metrics - Generate proper UUID for model_id
|
# Store model and metrics - Generate proper UUID for model_id
|
||||||
model_id = str(uuid.uuid4())
|
model_id = str(uuid.uuid4())
|
||||||
|
# ✅ FIX: Pass session to _store_model to avoid nested session
|
||||||
model_path = await self._store_model(
|
model_path = await self._store_model(
|
||||||
tenant_id, inventory_product_id, model, model_id, prophet_data, regressor_columns, best_params, training_metrics
|
tenant_id, inventory_product_id, model, model_id, prophet_data, regressor_columns, best_params, training_metrics, db_session
|
||||||
)
|
)
|
||||||
|
|
||||||
# Return same format as before, but with optimization info
|
# Return same format as before, but with optimization info
|
||||||
@@ -701,7 +702,8 @@ class BakeryProphetManager:
|
|||||||
training_data: pd.DataFrame,
|
training_data: pd.DataFrame,
|
||||||
regressor_columns: List[str],
|
regressor_columns: List[str],
|
||||||
optimized_params: Dict[str, Any] = None,
|
optimized_params: Dict[str, Any] = None,
|
||||||
training_metrics: Dict[str, Any] = None) -> str:
|
training_metrics: Dict[str, Any] = None,
|
||||||
|
session = None) -> str:
|
||||||
"""Store model with database integration"""
|
"""Store model with database integration"""
|
||||||
|
|
||||||
# Create model directory
|
# Create model directory
|
||||||
@@ -745,9 +747,11 @@ class BakeryProphetManager:
|
|||||||
self.models[model_key] = model
|
self.models[model_key] = model
|
||||||
self.model_metadata[model_key] = metadata
|
self.model_metadata[model_key] = metadata
|
||||||
|
|
||||||
# 🆕 NEW: Store in database using new session
|
# 🆕 NEW: Store in database using session (parent or new)
|
||||||
try:
|
use_parent_session = session is not None
|
||||||
async with self.database_manager.get_session() as db_session:
|
|
||||||
|
async def _store_in_db(db_session):
|
||||||
|
"""Inner function to store model in database"""
|
||||||
# Deactivate previous models for this product
|
# Deactivate previous models for this product
|
||||||
await self._deactivate_previous_models_with_session(db_session, tenant_id, inventory_product_id)
|
await self._deactivate_previous_models_with_session(db_session, tenant_id, inventory_product_id)
|
||||||
|
|
||||||
@@ -801,6 +805,16 @@ class BakeryProphetManager:
|
|||||||
|
|
||||||
logger.info(f"Model {model_id} stored in database successfully")
|
logger.info(f"Model {model_id} stored in database successfully")
|
||||||
|
|
||||||
|
try:
|
||||||
|
# ✅ FIX: Use parent session if provided, otherwise create new one
|
||||||
|
if use_parent_session:
|
||||||
|
logger.debug(f"Using parent session for storing model {model_id}")
|
||||||
|
await _store_in_db(session)
|
||||||
|
else:
|
||||||
|
logger.debug(f"Creating new session for storing model {model_id}")
|
||||||
|
async with self.database_manager.get_session() as new_session:
|
||||||
|
await _store_in_db(new_session)
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Failed to store model in database: {str(e)}")
|
logger.error(f"Failed to store model in database: {str(e)}")
|
||||||
# Continue execution - file storage succeeded
|
# Continue execution - file storage succeeded
|
||||||
|
|||||||
Reference in New Issue
Block a user