Fix orchestrator issues

Urtzi Alfaro
2025-11-05 22:54:14 +01:00
parent 80728eaa4e
commit 3ad093d38b
9 changed files with 422 additions and 484 deletions

View File

@@ -1,283 +0,0 @@
# Training Log Analysis - 2025-11-05
## Executive Summary
**Status:** Training FAILED - 5/5 products failed to store in database despite successful model training
**Root Cause:** Double commit bug in `prophet_manager.py` causing database transaction conflicts
**Impact:** All models successfully trained and saved to disk (with good MAPE scores 11.96%-14.16%), but database metadata storage failed due to transaction state errors
---
## Detailed Findings
### 1. Warnings (Non-Critical)
#### Spanish Holidays Loading Failure
- **Count:** 5 occurrences
- **Message:** `Could not load Spanish holidays dynamically: Logger._log() got an unexpected keyword argument 'region'`
- **Impact:** Minor - Prophet models may not properly account for Spanish holidays
- **Location:** `app/ml/prophet_manager.py` - holiday configuration
- **Severity:** LOW (a probable cause and minimal fix are sketched below)
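The error message suggests a structured-logging call (structlog-style keyword arguments such as `region=...`) being routed to a standard-library `logging.Logger`, whose `_log()` does not accept arbitrary keyword arguments. The sketch below is hypothetical — the loader name `_load_spanish_holidays`, the `holidays` dependency, and the default region are assumptions, not code from the repository — and only illustrates the likely pattern and two possible fixes.
```python
import logging

import holidays  # assumed dependency; the service may obtain holiday data differently

logger = logging.getLogger(__name__)  # stdlib logger: arbitrary kwargs are rejected


def _load_spanish_holidays(year: int, region: str = "PV"):
    """Hypothetical loader illustrating the warning and a possible fix."""
    try:
        return holidays.Spain(years=year, subdiv=region)
    except Exception as exc:
        # BUG pattern: stdlib logging raises "unexpected keyword argument 'region'":
        #   logger.warning("Could not load Spanish holidays dynamically", region=region)
        # Fix A: interpolate the context into the message for stdlib logging
        logger.warning(
            "Could not load Spanish holidays dynamically (region=%s): %s", region, exc
        )
        # Fix B: keep keyword-style context by using a structlog logger instead
        #   structlog.get_logger().warning("holiday_load_failed", region=region, error=str(exc))
        return None
```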
---
### 2. Critical Errors
#### A. Double Commit Transaction Error (ROOT CAUSE)
- **Location:** `services/training/app/ml/prophet_manager.py:750-824`
- **Function:** `_store_model()`
- **Error Message:**
```
Failed to store model in database: Method 'commit()' can't be called here;
method '_prepare_impl()' is already in progress and this would cause an
unexpected state change to <SessionTransactionState.CLOSED: 5>
```
- **Affected Products:** All 5 products
- **Occurrences:** 3 direct errors
**Technical Explanation:**
The bug occurs due to duplicate commit logic when no parent session is provided:
1. Line 751: `use_parent_session = session is not None` → evaluates to False when `session=None`
2. Lines 819-825: the code creates a new dedicated session and calls `_store_in_db(new_session)`
3. Lines 807-808: inside `_store_in_db()`, the session is committed because `use_parent_session` is False
4. Line 824: the outer block commits the same session again → CONFLICT!
**Code Flow:**
```python
# Line 751
use_parent_session = session is not None  # False

# Lines 819-825 (when no parent session)
else:
    async with self.database_manager.get_session() as new_session:
        await _store_in_db(new_session)  # Commits inside (line 808)
        await new_session.commit()       # ❌ Double commit!
```
#### B. Cascading Transaction Failures
- **Error:** `InFailedSQLTransactionError: current transaction is aborted, commands ignored until end of transaction block`
- **Count:** ~20 occurrences
- **Cause:** After initial double-commit error, transaction enters failed state
- **Impact:** All subsequent database operations on that session fail until the transaction is rolled back (illustrated in the sketch after this list)
**Affected Operations:**
- Getting active models
- Creating trained model records
- Updating training logs
- Retrieving database statistics
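For context, PostgreSQL rejects every statement on a connection whose transaction has failed until that transaction is ended, which is why a single double-commit error cascades into ~20 `InFailedSQLTransactionError` occurrences. A minimal, generic illustration of the recovery pattern (the session object and the failing statement are placeholders, not the service's actual code):
```python
from sqlalchemy import text
from sqlalchemy.exc import DBAPIError
from sqlalchemy.ext.asyncio import AsyncSession


async def run_with_recovery(session: AsyncSession) -> None:
    """Generic illustration: recover a session after its transaction has failed."""
    try:
        # Any failing statement aborts the surrounding transaction.
        await session.execute(text("SELECT 1/0"))
    except DBAPIError:
        # Without this rollback, every later statement raises
        # InFailedSQLTransactionError until the transaction block ends.
        await session.rollback()

    # After the rollback the session is usable again.
    await session.execute(text("SELECT 1"))
```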
---
### 3. Training Results
| Product ID | Model Trained | File Saved | MAPE | DB Record | Final Status |
|------------|---------------|------------|------|-----------|--------------|
| `5a2155a0-6ccb-4d29-a6db-d25f8e4cc806` | ✅ Yes | ✅ Yes | 13.75% | ❌ No | PARTIAL |
| `dc4a9ca0-a0e0-436c-94ef-d76c9551ab8c` | ✅ Yes | ✅ Yes | 13.04% | ❌ No | PARTIAL |
| `bcb395bd-b8a4-40de-ba1d-549de48e1688` | ✅ Yes | ✅ Yes | 11.96% | ❌ No | PARTIAL |
| `ba7aa8bf-e4be-4bbc-a9df-6929c7a8eaa9` | ✅ Yes | ✅ Yes | 14.16% | ❌ No | PARTIAL |
| `3adadeb8-9d42-4565-afd4-37e0367280ba` | ✅ Yes | ✅ Yes | 12.77% | ❌ No | PARTIAL |
**Summary:**
- ✅ **5/5 models successfully trained** with good accuracy (MAPE 11.96% - 14.16%)
- ✅ **5/5 model files saved to disk** at `/app/models/{tenant_id}/{model_id}.pkl`
- ❌ **0/5 database records created** - all failed due to transaction errors
- ❌ **Final status: FAILED** - models cannot be used without DB records
**Model Files Created:**
```
/app/models/40c4c65b-f2e7-4d4a-a930-97ac60844fb5/a1cdc2c3-2b11-4e8d-a9a9-008c8bad2369.pkl ✅
/app/models/40c4c65b-f2e7-4d4a-a930-97ac60844fb5/b02bec7b-736b-4cc8-85d9-9caa48479d6d.pkl ✅
/app/models/40c4c65b-f2e7-4d4a-a930-97ac60844fb5/b5865fca-4901-472c-8803-b4031a250661.pkl ✅
/app/models/40c4c65b-f2e7-4d4a-a930-97ac60844fb5/a9f94bcd-9266-4d24-987b-2aca0d0448a5.pkl ✅
/app/models/40c4c65b-f2e7-4d4a-a930-97ac60844fb5/dc8402d0-a053-4b1b-a9c6-fc77480b5170.pkl ✅
```
---
### 4. Timeline of Events
```
17:39:45    - Data preparation completed for 5 products
17:39:45    - Product categorization: 3 pastries, 2 bread
17:39:45    - Parallel training started (3 concurrent operations)
17:39:45-46 - BATCH 1: Training 3 products in parallel
              ✅ Prophet models fitted successfully
              ✅ Models saved to disk with checksums
              ✅ Previous models deactivated
              ❌ First double-commit error at 17:39:46
              ❌ Transaction enters failed state
              ❌ All subsequent DB operations fail
17:39:46-47 - BATCH 2: Training remaining 2 products
              ✅ Prophet models fitted successfully
              ✅ Models saved to disk with checksums
              ❌ Cannot write to DB (transaction still failed)
17:39:47    - Training completion attempt
              ❌ Cannot update training logs
              ❌ Overall status reported as FAILED
              ✅ Event published to RabbitMQ (training.failed)
```
---
### 5. Root Cause Analysis
**File:** `services/training/app/ml/prophet_manager.py`
**Method:** `_store_model()` (lines 697-832)
**Issue:** Double commit logic error
**The Bug Pattern:**
```python
# Line 751: Set flag based on parent session
use_parent_session = session is not None  # False when session=None

# Inner function at lines 753-811
async def _store_in_db(db_session):
    # ... database operations ...
    db_session.add(db_model)

    # Lines 807-808: Commit if NOT using parent session
    if not use_parent_session:
        await db_session.commit()  # ❌ FIRST COMMIT

# Lines 813-825: Outer try block
try:
    if use_parent_session:
        await _store_in_db(session)  # ✅ No commit (parent handles it)
    else:
        # Lines 819-825: Create new session
        async with self.database_manager.get_session() as new_session:
            await _store_in_db(new_session)  # ❌ Commits at line 808
            await new_session.commit()       # ❌ SECOND COMMIT - ERROR!
```
**Why This Fails:**
1. SQLAlchemy's async session starts commit with `_prepare_impl()`
2. First commit (line 808) begins this process
3. Second commit (line 824) tries to commit while prepare is in progress
4. Session state machine rejects this: transition to CLOSED would be unexpected
5. Transaction marked as failed, all subsequent operations rejected
**Historical Context:**
Recent commit history shows ongoing session management issues:
```
673108e - Fix deadlock issues in training 2
74215d3 - Fix deadlock issues in training
fd0a96e - Fix remaining nested session issues in training pipeline
b2de56e - Fix additional nested session deadlock in data_processor.py
e585e9f - Fix critical nested session deadlock in training_service.py
```
This double-commit bug was likely introduced as a regression during these "fixes".
---
### 6. Recommended Solution
**Option 1: Remove Inner Commit (Recommended)**
Remove the commit logic from `_store_in_db()` entirely:
```python
# DELETE lines 805-811
# if not use_parent_session:
#     await db_session.commit()
#     logger.info(f"Model {model_id} stored in database successfully")
# else:
#     logger.debug(f"Added model {model_id} to parent session (commit deferred)")
```
All commits should be handled by the outer scope (either parent session or new session block).
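For reference, a minimal sketch of the intended control flow after Option 1 — the function and attribute names come from the excerpts above, but the signature and surrounding details are assumptions rather than the actual implementation:
```python
from typing import Optional

from sqlalchemy.ext.asyncio import AsyncSession


async def store_model(database_manager, db_model, session: Optional[AsyncSession] = None) -> None:
    """Sketch: each code path commits its session exactly once (assumed signature)."""
    use_parent_session = session is not None

    async def _store_in_db(db_session: AsyncSession) -> None:
        # Only perform the database operations; no commit here (Option 1).
        db_session.add(db_model)

    if use_parent_session:
        await _store_in_db(session)              # The caller owns the commit
    else:
        async with database_manager.get_session() as new_session:
            await _store_in_db(new_session)
            await new_session.commit()           # Single commit, owned by this scope
```
Either way, each session has exactly one owner responsible for committing it, which is the state the SQLAlchemy session machinery expects.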
**Option 2: Pass Commit Control Flag**
```python
async def _store_in_db(db_session, should_commit=False):
    """Inner function to store model in database"""
    # ... existing code ...
    db_session.add(db_model)
    if should_commit:
        await db_session.commit()
        logger.info(f"Model {model_id} stored successfully")

# Then call it:
if use_parent_session:
    await _store_in_db(session, should_commit=False)  # Parent session owns the commit
else:
    async with self.database_manager.get_session() as new_session:
        await _store_in_db(new_session, should_commit=True)  # Inner function commits exactly once
```
**Recommendation:** Option 1 is simpler and follows the "single responsibility principle" - the inner function should only perform the database operations, not manage transactions.
---
### 7. Testing Recommendations
After fix implementation:
1. **Unit Tests:**
- Test `_store_model()` with `session=None` (new session path)
- Test `_store_model()` with provided session (parent session path)
- Verify only one commit occurs in each path (a test sketch follows this list)
2. **Integration Tests:**
- Run parallel training with 5 products
- Verify all database records created
- Verify no transaction conflicts
- Check training logs updated correctly
3. **Monitoring:**
- Watch for `InFailedSQLTransactionError` in logs
- Monitor training success rate
- Verify model records appear in `trained_models` table
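A possible shape for the unit tests in point 1, assuming `pytest-asyncio` is available; the class name `ProphetManager`, its constructor, and the `_store_model(model_id, db_model, session)` signature are assumptions and would need to match the real code:
```python
from unittest.mock import AsyncMock, MagicMock

import pytest

from app.ml.prophet_manager import ProphetManager  # assumed import path and class name


@pytest.mark.asyncio
async def test_store_model_commits_once_without_parent_session():
    """New-session path: exactly one commit on the dedicated session."""
    new_session = AsyncMock()  # stands in for an AsyncSession
    database_manager = MagicMock()
    database_manager.get_session.return_value.__aenter__.return_value = new_session
    database_manager.get_session.return_value.__aexit__.return_value = False

    manager = ProphetManager(database_manager=database_manager)  # assumed constructor
    await manager._store_model(model_id="m-1", db_model=object(), session=None)  # assumed signature

    assert new_session.commit.await_count == 1


@pytest.mark.asyncio
async def test_store_model_defers_commit_to_parent_session():
    """Parent-session path: the method must not commit at all."""
    parent_session = AsyncMock()

    manager = ProphetManager(database_manager=MagicMock())
    await manager._store_model(model_id="m-1", db_model=object(), session=parent_session)

    parent_session.commit.assert_not_awaited()
```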
---
## Action Items
- [ ] Fix double commit bug in `prophet_manager.py:805-811`
- [ ] Test fix with parallel training
- [ ] Add unit tests for session management
- [ ] Review all recent session management changes for similar patterns
- [ ] Fix Spanish holidays loading warning (optional)
- [ ] Document session management patterns for future development
---
## Additional Notes
**Good News:**
- ✅ All models trained successfully with good accuracy
- ✅ Model files saved to disk and can be recovered
- ✅ Parallel training infrastructure works correctly
- ✅ Prophet optimization producing quality models (MAPE 11.96%-14.16%)
**Bad News:**
- ❌ Database integration broken due to double commit
- ❌ Models cannot be used without DB records
- ❌ Training pipeline reports as failed despite successful training
- ❌ Recent "fixes" introduced regression
**Impact Assessment:**
- **Severity:** HIGH - Training pipeline completely broken
- **User Impact:** Cannot train new models
- **Data Loss:** None - models saved to disk
- **Recovery:** Fix the code and re-run training, OR manually insert DB records for the model files already on disk (a verification sketch follows below)
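Since the model files and their checksums are already on disk, recovery can start by confirming each `.pkl` still loads before re-running training or inserting metadata by hand. A minimal sketch, assuming the files were written with `joblib` (imported elsewhere in this commit) and that SHA-256 is the checksum recorded — the algorithm and helper name are assumptions:
```python
import hashlib
from pathlib import Path

import joblib  # the forecasting service imports joblib elsewhere in this commit


def verify_saved_model(model_path: str) -> str:
    """Load a saved model file and return its SHA-256 checksum (assumed algorithm)."""
    path = Path(model_path)
    model = joblib.load(path)  # raises if the pickle is corrupt or unreadable
    digest = hashlib.sha256(path.read_bytes()).hexdigest()
    print(f"{path.name}: loaded {type(model).__name__}, sha256={digest}")
    return digest


# Example against one of the files listed above:
verify_saved_model(
    "/app/models/40c4c65b-f2e7-4d4a-a930-97ac60844fb5/"
    "a1cdc2c3-2b11-4e8d-a9a9-008c8bad2369.pkl"
)
```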
---
**Analysis Date:** 2025-11-05
**Analyzed By:** Claude Code Agent
**Log Source:** Training service logs 17:39:45 - 17:39:47

View File

@@ -287,7 +287,7 @@ async def generate_batch_forecast(
from app.schemas.forecasts import BatchForecastResponse
now = datetime.now(timezone.utc)
return BatchForecastResponse(
id=batch_result.get('batch_id', str(uuid.uuid4())),
id=batch_result.get('id', str(uuid.uuid4())), # Use 'id' field (UUID) instead of 'batch_id' (string)
tenant_id=tenant_id,
batch_name=updated_request.batch_name,
status="completed",

View File

@@ -5,6 +5,7 @@ Main forecasting service that uses the repository pattern for data access
import structlog
import uuid
import asyncio
from typing import Dict, List, Any, Optional
from datetime import datetime, date, timedelta, timezone
from sqlalchemy.ext.asyncio import AsyncSession
@@ -63,8 +64,10 @@ class EnhancedForecastingService:
"""Generate batch forecasts using repository pattern"""
try:
# Implementation would use repository pattern to generate multiple forecasts
batch_uuid = uuid.uuid4()
return {
"batch_id": f"batch_{tenant_id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}",
"id": str(batch_uuid), # UUID for database references
"batch_id": f"batch_{tenant_id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}", # Human-readable batch identifier
"tenant_id": tenant_id,
"forecasts": [],
"total_forecasts": 0,
@@ -368,7 +371,7 @@ class EnhancedForecastingService:
forecast = await repos['forecast'].create_forecast(forecast_data)
# Step 7: Cache the prediction
# Step 6: Cache the prediction
await repos['cache'].cache_prediction(
tenant_id=tenant_id,
inventory_product_id=request.inventory_product_id,
@@ -521,51 +524,62 @@ class EnhancedForecastingService:
Generate forecast using a pre-fetched weather map to avoid multiple API calls.
"""
start_time = datetime.now(timezone.utc)
try:
logger.info("Generating enhanced forecast with weather map",
tenant_id=tenant_id,
inventory_product_id=request.inventory_product_id,
date=request.forecast_date.isoformat())
# Get session and initialize repositories
# CRITICAL FIX: Get model BEFORE opening database session to prevent session blocking during HTTP calls
# This prevents holding database connections during potentially slow external API calls
logger.debug("Fetching model data before opening database session",
tenant_id=tenant_id,
inventory_product_id=request.inventory_product_id)
model_data = await self._get_latest_model_with_fallback(tenant_id, request.inventory_product_id)
if not model_data:
raise ValueError(f"No valid model available for product: {request.inventory_product_id}")
logger.debug("Model data fetched successfully",
tenant_id=tenant_id,
model_id=model_data.get('model_id'))
# Prepare features (this doesn't make external HTTP calls when using weather_map)
features = await self._prepare_forecast_features_with_fallbacks_and_weather_map(tenant_id, request, weather_map)
# Now open database session AFTER external HTTP calls are complete
async with self.database_manager.get_background_session() as session:
repos = await self._init_repositories(session)
# Step 1: Check cache first
cached_prediction = await repos['cache'].get_cached_prediction(
tenant_id, request.inventory_product_id, request.location, request.forecast_date
)
if cached_prediction:
logger.debug("Using cached prediction",
tenant_id=tenant_id,
inventory_product_id=request.inventory_product_id)
return self._create_forecast_response_from_cache(cached_prediction)
# Step 2: Get model with validation
model_data = await self._get_latest_model_with_fallback(tenant_id, request.inventory_product_id)
if not model_data:
raise ValueError(f"No valid model available for product: {request.inventory_product_id}")
# Step 3: Prepare features with fallbacks, using the weather map
features = await self._prepare_forecast_features_with_fallbacks_and_weather_map(tenant_id, request, weather_map)
# Step 4: Generate prediction
# Step 2: Model data already fetched above (before session opened)
# Step 3: Generate prediction
prediction_result = await self.prediction_service.predict(
model_id=model_data['model_id'],
model_path=model_data['model_path'],
features=features,
confidence_level=request.confidence_level
)
# Step 5: Apply business rules
# Step 4: Apply business rules
adjusted_prediction = self._apply_business_rules(
prediction_result, request, features
)
# Step 6: Save forecast using repository
# Step 5: Save forecast using repository
# Convert forecast_date to datetime if it's a string
forecast_datetime = request.forecast_date
if isinstance(forecast_datetime, str):
@@ -599,7 +613,7 @@ class EnhancedForecastingService:
forecast = await repos['forecast'].create_forecast(forecast_data)
# Step 7: Cache the prediction
# Step 6: Cache the prediction
await repos['cache'].cache_prediction(
tenant_id=tenant_id,
inventory_product_id=request.inventory_product_id,
@@ -813,32 +827,51 @@ class EnhancedForecastingService:
# Additional helper methods from original service
async def _get_latest_model_with_fallback(self, tenant_id: str, inventory_product_id: str) -> Optional[Dict[str, Any]]:
"""Get the latest trained model with fallback strategies"""
"""
Get the latest trained model with fallback strategies.
CRITICAL FIX: Added timeout protection to prevent hanging during external API calls.
This ensures we don't block indefinitely if the training service is unresponsive.
"""
try:
model_data = await self.model_client.get_best_model_for_forecasting(
tenant_id=tenant_id,
inventory_product_id=inventory_product_id
# Add timeout protection (15 seconds) to prevent hanging
# This is shorter than the default 30s to fail fast and avoid blocking
model_data = await asyncio.wait_for(
self.model_client.get_best_model_for_forecasting(
tenant_id=tenant_id,
inventory_product_id=inventory_product_id
),
timeout=15.0
)
if model_data:
logger.info("Found specific model for product",
inventory_product_id=inventory_product_id,
model_id=model_data.get('model_id'))
return model_data
# Fallback: Try to get any model for this tenant
fallback_model = await self.model_client.get_any_model_for_tenant(tenant_id)
# Fallback: Try to get any model for this tenant (also with timeout)
fallback_model = await asyncio.wait_for(
self.model_client.get_any_model_for_tenant(tenant_id),
timeout=15.0
)
if fallback_model:
logger.info("Using fallback model",
model_id=fallback_model.get('model_id'))
return fallback_model
logger.error("No models available for tenant", tenant_id=tenant_id)
return None
except asyncio.TimeoutError:
logger.error("Timeout fetching model data from training service",
tenant_id=tenant_id,
inventory_product_id=inventory_product_id,
timeout_seconds=15)
return None
except Exception as e:
logger.error("Error getting model", error=str(e))
logger.error("Error getting model", error=str(e), tenant_id=tenant_id)
return None
async def _prepare_forecast_features_with_fallbacks(
@@ -857,6 +890,9 @@ class EnhancedForecastingService:
"week_of_year": request.forecast_date.isocalendar().week,
"season": self._get_season(request.forecast_date.month),
"is_holiday": self._is_spanish_holiday(request.forecast_date),
# CRITICAL FIX: Add tenant_id and inventory_product_id for historical feature enrichment
"tenant_id": tenant_id,
"inventory_product_id": request.inventory_product_id,
}
# Fetch REAL weather data from external service
@@ -951,6 +987,9 @@ class EnhancedForecastingService:
"week_of_year": request.forecast_date.isocalendar().week,
"season": self._get_season(request.forecast_date.month),
"is_holiday": self._is_spanish_holiday(request.forecast_date),
# CRITICAL FIX: Add tenant_id and inventory_product_id for historical feature enrichment
"tenant_id": tenant_id,
"inventory_product_id": request.inventory_product_id,
}
# Use the pre-fetched weather data from the weather map to avoid additional API calls

View File

@@ -20,6 +20,7 @@ import joblib
from app.core.config import settings
from shared.monitoring.metrics import MetricsCollector
from shared.database.base import create_database_manager
from shared.clients import get_sales_client
logger = structlog.get_logger()
metrics = MetricsCollector("forecasting-service")
@@ -34,6 +35,8 @@ class PredictionService:
self.database_manager = database_manager or create_database_manager(settings.DATABASE_URL, "forecasting-service")
self.model_cache = {}
self.cache_ttl = 3600 # 1 hour cache
# Initialize sales client for fetching historical data
self.sales_client = get_sales_client(settings, "forecasting")
async def validate_prediction_request(self, request: Dict[str, Any]) -> Dict[str, Any]:
"""Validate prediction request"""
@@ -79,7 +82,34 @@ class PredictionService:
if not model:
raise ValueError(f"Model {model_id} not found or failed to load")
# CRITICAL FIX: Fetch historical sales data and calculate historical features
# This populates lag, rolling, and trend features for better predictions
# Using 90 days for better trend analysis and more robust rolling statistics
if 'tenant_id' in features and 'inventory_product_id' in features and 'date' in features:
try:
forecast_date = pd.to_datetime(features['date'])
historical_sales = await self._fetch_historical_sales(
tenant_id=features['tenant_id'],
inventory_product_id=features['inventory_product_id'],
forecast_date=forecast_date,
days_back=90 # Changed from 30 to 90 for better historical context
)
# Calculate historical features and merge into features dict
historical_features = self._calculate_historical_features(
historical_sales, forecast_date
)
features.update(historical_features)
logger.info("Historical features enriched",
lag_1_day=historical_features.get('lag_1_day'),
rolling_mean_7d=historical_features.get('rolling_mean_7d'))
except Exception as e:
logger.warning("Failed to enrich with historical features, using defaults",
error=str(e))
# Features dict will use defaults (0.0) from _prepare_prophet_features
# Prepare features for Prophet model
prophet_df = self._prepare_prophet_features(features)
@@ -444,7 +474,222 @@ class PredictionService:
except Exception as e:
logger.error(f"Model validation error: {e}")
return False
async def _fetch_historical_sales(
self,
tenant_id: str,
inventory_product_id: str,
forecast_date: datetime,
days_back: int = 90
) -> pd.Series:
"""
Fetch historical sales data for calculating lagged and rolling features.
Args:
tenant_id: Tenant UUID
inventory_product_id: Product UUID
forecast_date: The date we're forecasting for
days_back: Number of days of history to fetch (default 90 for better trend analysis)
Returns:
pandas Series with sales quantities indexed by date
"""
try:
# Calculate date range
end_date = forecast_date - pd.Timedelta(days=1) # Day before forecast
start_date = end_date - pd.Timedelta(days=days_back)
logger.debug("Fetching historical sales for feature calculation",
tenant_id=tenant_id,
product_id=inventory_product_id,
start_date=start_date.date(),
end_date=end_date.date(),
days_back=days_back)
# Fetch sales data from sales service
sales_data = await self.sales_client.get_sales_data(
tenant_id=tenant_id,
start_date=start_date.strftime("%Y-%m-%d"),
end_date=end_date.strftime("%Y-%m-%d"),
product_id=inventory_product_id,
aggregation="daily"
)
if not sales_data:
logger.warning("No historical sales data found",
tenant_id=tenant_id,
product_id=inventory_product_id)
return pd.Series(dtype=float)
# Convert to pandas Series indexed by date
df = pd.DataFrame(sales_data)
df['sale_date'] = pd.to_datetime(df['sale_date'])
df = df.set_index('sale_date')
# Extract quantity column (could be 'quantity' or 'total_quantity')
if 'quantity' in df.columns:
series = df['quantity']
elif 'total_quantity' in df.columns:
series = df['total_quantity']
else:
logger.warning("Sales data missing quantity field",
columns=list(df.columns))
return pd.Series(dtype=float)
logger.debug("Historical sales fetched successfully",
records=len(series),
date_range=f"{series.index.min()} to {series.index.max()}")
return series.sort_index()
except Exception as e:
logger.error("Error fetching historical sales",
error=str(e),
tenant_id=tenant_id,
product_id=inventory_product_id)
return pd.Series(dtype=float)
def _calculate_historical_features(
self,
historical_sales: pd.Series,
forecast_date: datetime
) -> Dict[str, float]:
"""
Calculate lagged, rolling, and trend features from historical sales data.
Args:
historical_sales: Series of sales quantities indexed by date
forecast_date: The date we're forecasting for
Returns:
Dictionary of calculated features
"""
features = {}
try:
if len(historical_sales) == 0:
logger.warning("No historical data available, using default values")
# Return all features with default values (0.0)
return {
# Lagged features
'lag_1_day': 0.0,
'lag_7_day': 0.0,
'lag_14_day': 0.0,
# Rolling statistics (7-day window)
'rolling_mean_7d': 0.0,
'rolling_std_7d': 0.0,
'rolling_max_7d': 0.0,
'rolling_min_7d': 0.0,
# Rolling statistics (14-day window)
'rolling_mean_14d': 0.0,
'rolling_std_14d': 0.0,
'rolling_max_14d': 0.0,
'rolling_min_14d': 0.0,
# Rolling statistics (30-day window)
'rolling_mean_30d': 0.0,
'rolling_std_30d': 0.0,
'rolling_max_30d': 0.0,
'rolling_min_30d': 0.0,
# Trend features
'days_since_start': 0,
'momentum_1_7': 0.0,
'trend_7_30': 0.0,
'velocity_week': 0.0,
}
# Calculate lagged features
features['lag_1_day'] = float(historical_sales.iloc[-1]) if len(historical_sales) >= 1 else 0.0
features['lag_7_day'] = float(historical_sales.iloc[-7]) if len(historical_sales) >= 7 else features['lag_1_day']
features['lag_14_day'] = float(historical_sales.iloc[-14]) if len(historical_sales) >= 14 else features['lag_7_day']
# Calculate rolling statistics (7-day window)
if len(historical_sales) >= 7:
window_7d = historical_sales.iloc[-7:]
features['rolling_mean_7d'] = float(window_7d.mean())
features['rolling_std_7d'] = float(window_7d.std())
features['rolling_max_7d'] = float(window_7d.max())
features['rolling_min_7d'] = float(window_7d.min())
else:
features['rolling_mean_7d'] = features['lag_1_day']
features['rolling_std_7d'] = 0.0
features['rolling_max_7d'] = features['lag_1_day']
features['rolling_min_7d'] = features['lag_1_day']
# Calculate rolling statistics (14-day window)
if len(historical_sales) >= 14:
window_14d = historical_sales.iloc[-14:]
features['rolling_mean_14d'] = float(window_14d.mean())
features['rolling_std_14d'] = float(window_14d.std())
features['rolling_max_14d'] = float(window_14d.max())
features['rolling_min_14d'] = float(window_14d.min())
else:
features['rolling_mean_14d'] = features['rolling_mean_7d']
features['rolling_std_14d'] = features['rolling_std_7d']
features['rolling_max_14d'] = features['rolling_max_7d']
features['rolling_min_14d'] = features['rolling_min_7d']
# Calculate rolling statistics (30-day window)
if len(historical_sales) >= 30:
window_30d = historical_sales.iloc[-30:]
features['rolling_mean_30d'] = float(window_30d.mean())
features['rolling_std_30d'] = float(window_30d.std())
features['rolling_max_30d'] = float(window_30d.max())
features['rolling_min_30d'] = float(window_30d.min())
else:
features['rolling_mean_30d'] = features['rolling_mean_14d']
features['rolling_std_30d'] = features['rolling_std_14d']
features['rolling_max_30d'] = features['rolling_max_14d']
features['rolling_min_30d'] = features['rolling_min_14d']
# Calculate trend features
if len(historical_sales) > 0:
# Days since first sale
features['days_since_start'] = (forecast_date - historical_sales.index[0]).days
# Momentum (difference between recent lag_1_day and lag_7_day)
if len(historical_sales) >= 7:
features['momentum_1_7'] = features['lag_1_day'] - features['lag_7_day']
else:
features['momentum_1_7'] = 0.0
# Trend (difference between recent 7-day and 30-day averages)
if len(historical_sales) >= 30:
features['trend_7_30'] = features['rolling_mean_7d'] - features['rolling_mean_30d']
else:
features['trend_7_30'] = 0.0
# Velocity (rate of change over the last week)
if len(historical_sales) >= 7:
week_change = historical_sales.iloc[-1] - historical_sales.iloc[-7]
features['velocity_week'] = float(week_change / 7.0)
else:
features['velocity_week'] = 0.0
else:
features['days_since_start'] = 0
features['momentum_1_7'] = 0.0
features['trend_7_30'] = 0.0
features['velocity_week'] = 0.0
logger.debug("Historical features calculated",
lag_1_day=features['lag_1_day'],
rolling_mean_7d=features['rolling_mean_7d'],
rolling_mean_30d=features['rolling_mean_30d'],
momentum=features['momentum_1_7'])
return features
except Exception as e:
logger.error("Error calculating historical features",
error=str(e))
# Return default values on error
return {k: 0.0 for k in [
'lag_1_day', 'lag_7_day', 'lag_14_day',
'rolling_mean_7d', 'rolling_std_7d', 'rolling_max_7d', 'rolling_min_7d',
'rolling_mean_14d', 'rolling_std_14d', 'rolling_max_14d', 'rolling_min_14d',
'rolling_mean_30d', 'rolling_std_30d', 'rolling_max_30d', 'rolling_min_30d',
'momentum_1_7', 'trend_7_30', 'velocity_week'
]} | {'days_since_start': 0}
def _prepare_prophet_features(self, features: Dict[str, Any]) -> pd.DataFrame:
"""Convert features to Prophet-compatible DataFrame - COMPLETE FEATURE MATCHING"""
@@ -539,6 +784,9 @@ class PredictionService:
'is_month_start': int(forecast_date.day <= 3),
'is_month_end': int(forecast_date.day >= 28),
'is_payday_period': int((forecast_date.day <= 5) or (forecast_date.day >= 25)),
# CRITICAL FIX: Add is_payday feature to match training service
# Training defines: is_payday = (day == 15 OR is_month_end)
'is_payday': int((forecast_date.day == 15) or self._is_end_of_month(forecast_date)),
# Weather-based derived features
'temp_squared': temperature ** 2,
@@ -600,23 +848,57 @@ class PredictionService:
# Day features
'is_peak_bakery_day': int(day_of_week in [4, 5, 6]),
'is_high_demand_month': int(forecast_date.month in [6, 7, 8, 12]),
'is_warm_season': int(forecast_date.month in [4, 5, 6, 7, 8, 9])
'is_warm_season': int(forecast_date.month in [4, 5, 6, 7, 8, 9]),
# CRITICAL FIX: Cyclical encoding features (MATCH TRAINING)
# These encode day_of_week and month as sin/cos for cyclical patterns
'day_of_week_sin': float(np.sin(2 * np.pi * day_of_week / 7)),
'day_of_week_cos': float(np.cos(2 * np.pi * day_of_week / 7)),
'month_sin': float(np.sin(2 * np.pi * forecast_date.month / 12)),
'month_cos': float(np.cos(2 * np.pi * forecast_date.month / 12)),
# CRITICAL FIX: Historical features (lagged, rolling, trend)
# These will be populated from historical sales data
# Default to 0.0 here, will be updated if historical data is provided
'lag_1_day': float(features.get('lag_1_day', 0.0)),
'lag_7_day': float(features.get('lag_7_day', 0.0)),
'lag_14_day': float(features.get('lag_14_day', 0.0)),
'rolling_mean_7d': float(features.get('rolling_mean_7d', 0.0)),
'rolling_std_7d': float(features.get('rolling_std_7d', 0.0)),
'rolling_max_7d': float(features.get('rolling_max_7d', 0.0)),
'rolling_min_7d': float(features.get('rolling_min_7d', 0.0)),
'rolling_mean_14d': float(features.get('rolling_mean_14d', 0.0)),
'rolling_std_14d': float(features.get('rolling_std_14d', 0.0)),
'rolling_max_14d': float(features.get('rolling_max_14d', 0.0)),
'rolling_min_14d': float(features.get('rolling_min_14d', 0.0)),
'rolling_mean_30d': float(features.get('rolling_mean_30d', 0.0)),
'rolling_std_30d': float(features.get('rolling_std_30d', 0.0)),
'rolling_max_30d': float(features.get('rolling_max_30d', 0.0)),
'rolling_min_30d': float(features.get('rolling_min_30d', 0.0)),
'days_since_start': int(features.get('days_since_start', 0)),
'momentum_1_7': float(features.get('momentum_1_7', 0.0)),
'trend_7_30': float(features.get('trend_7_30', 0.0)),
'velocity_week': float(features.get('velocity_week', 0.0)),
}
# Calculate interaction features
is_holiday = new_features['is_holiday']
is_pleasant = new_features['is_pleasant_day']
is_rainy = new_features['is_rainy_day']
is_payday = new_features['is_payday']
interaction_features = {
# Weekend interactions
'weekend_temp_interaction': is_weekend * temperature,
'weekend_pleasant_weather': is_weekend * is_pleasant,
'weekend_traffic_interaction': is_weekend * traffic,
# Holiday interactions
'holiday_temp_interaction': is_holiday * temperature,
'holiday_traffic_interaction': is_holiday * traffic,
# CRITICAL FIX: Add payday_weekend_interaction to match training service
'payday_weekend_interaction': is_payday * is_weekend,
# Season interactions
'season_temp_interaction': season * temperature,
@@ -625,7 +907,11 @@ class PredictionService:
# Rain-traffic interactions
'rain_traffic_interaction': is_rainy * traffic,
'rain_speed_interaction': is_rainy * avg_speed,
# CRITICAL FIX: Add missing interaction features from training
'rain_weekend_interaction': is_rainy * is_weekend,
'friday_traffic_interaction': int(day_of_week == 4) * traffic,
# Day-weather interactions
'day_temp_interaction': day_of_week * temperature,
'month_temp_interaction': forecast_date.month * temperature,
@@ -707,4 +993,14 @@ class PredictionService:
elif precipitation <= 10:
return 2 # Moderate rain
else:
return 3 # Heavy rain
return 3 # Heavy rain
def _is_end_of_month(self, date: datetime) -> bool:
"""
Check if date is the last day of the month - MATCH TRAINING SERVICE
Training uses: df[date_column].dt.is_month_end
"""
import calendar
# Get the last day of the month
last_day = calendar.monthrange(date.year, date.month)[1]
return date.day == last_day

View File

@@ -34,7 +34,8 @@ route_builder = RouteBuilder('notifications')
# Dependency injection for enhanced notification service
def get_enhanced_notification_service():
database_manager = create_database_manager()
from app.core.config import settings
database_manager = create_database_manager(settings.DATABASE_URL, "notification")
return EnhancedNotificationService(database_manager)
@@ -47,7 +48,6 @@ def get_enhanced_notification_service():
response_model=NotificationResponse,
status_code=201
)
@require_user_role(["member", "admin", "owner"])
@track_endpoint_metrics("notification_send")
async def send_notification(
notification_data: Dict[str, Any],
@@ -55,11 +55,23 @@ async def send_notification(
current_user: Dict[str, Any] = Depends(get_current_user_dep),
notification_service: EnhancedNotificationService = Depends(get_enhanced_notification_service)
):
"""Send a single notification with enhanced validation and features"""
"""Send a single notification with enhanced validation and features - allows service-to-service calls"""
try:
# Allow service-to-service calls (skip role check for service tokens)
is_service_call = current_user.get("type") == "service"
if not is_service_call:
# Check user role for non-service calls
user_role = current_user.get("role", "").lower()
if user_role not in ["member", "admin", "owner"]:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Insufficient permissions"
)
# Check permissions for broadcast notifications (Admin+ only)
if notification_data.get("broadcast", False):
if notification_data.get("broadcast", False) and not is_service_call:
user_role = current_user.get("role", "").lower()
if user_role not in ["admin", "owner"]:
raise HTTPException(
@@ -111,10 +123,14 @@ async def send_notification(
detail=f"Invalid priority: {notification_data['priority']}"
)
# Use tenant_id from path parameter (especially for service calls)
effective_tenant_id = str(tenant_id) if is_service_call else current_user.get("tenant_id")
effective_sender_id = current_user.get("user_id", "system")
# Create notification using enhanced service
notification = await notification_service.create_notification(
tenant_id=current_user.get("tenant_id"),
sender_id=current_user["user_id"],
tenant_id=effective_tenant_id,
sender_id=effective_sender_id,
notification_type=notification_type,
message=notification_data["message"],
recipient_id=notification_data.get("recipient_id"),
@@ -131,18 +147,20 @@ async def send_notification(
logger.info("Notification sent successfully",
notification_id=notification.id,
tenant_id=current_user.get("tenant_id"),
tenant_id=effective_tenant_id,
type=notification_type.value,
priority=priority.value)
priority=priority.value,
is_service_call=is_service_call)
return NotificationResponse.from_orm(notification)
except HTTPException:
raise
except Exception as e:
effective_tenant_id = str(tenant_id) if current_user.get("type") == "service" else current_user.get("tenant_id")
logger.error("Failed to send notification",
tenant_id=current_user.get("tenant_id"),
sender_id=current_user["user_id"],
tenant_id=effective_tenant_id,
sender_id=current_user.get("user_id", "system"),
error=str(e))
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,

View File

@@ -25,7 +25,8 @@ route_builder = RouteBuilder('notifications')
# Dependency injection for enhanced notification service
def get_enhanced_notification_service():
database_manager = create_database_manager()
from app.core.config import settings
database_manager = create_database_manager(settings.DATABASE_URL, "notification")
return EnhancedNotificationService(database_manager)
# ============================================================================

View File

@@ -46,7 +46,6 @@ class EnhancedNotificationService:
'log': self.log_repo
}
@transactional
async def create_notification(
self,
tenant_id: str,
@@ -70,11 +69,11 @@ class EnhancedNotificationService:
try:
async with self.database_manager.get_session() as db_session:
async with UnitOfWork(db_session) as uow:
# Register repositories
notification_repo = uow.register_repository("notifications", NotificationRepository)
template_repo = uow.register_repository("templates", TemplateRepository)
preference_repo = uow.register_repository("preferences", PreferenceRepository)
log_repo = uow.register_repository("logs", LogRepository)
# Register repositories with model classes
notification_repo = uow.register_repository("notifications", NotificationRepository, Notification)
template_repo = uow.register_repository("templates", TemplateRepository, NotificationTemplate)
preference_repo = uow.register_repository("preferences", PreferenceRepository, NotificationPreference)
log_repo = uow.register_repository("logs", LogRepository, NotificationLog)
notification_data = {
"tenant_id": tenant_id,

View File

@@ -105,145 +105,7 @@ class EnhancedBakeryMLTrainer:
return await self._execute_training_pipeline(
tenant_id, training_dataset, job_id, session
)
estimated_completion_time = calculate_estimated_completion_time(estimated_duration_minutes)
# Note: Initial event was already published by API endpoint with estimated product count,
# this updates with real count and recalculated time estimates based on actual data
await publish_training_started(
job_id=job_id,
tenant_id=tenant_id,
total_products=len(products),
estimated_duration_minutes=estimated_duration_minutes,
estimated_completion_time=estimated_completion_time.isoformat()
)
# Create initial training log entry
await repos['training_log'].update_log_progress(
job_id, 5, "data_processing", "running"
)
# ✅ FIX: Flush the session to ensure the update is committed before proceeding
# This prevents deadlocks when training methods need to acquire locks
await db_session.flush()
logger.debug("Flushed session after initial progress update")
# Process data for each product using enhanced processor
logger.info("Processing data using enhanced processor")
processed_data = await self._process_all_products_enhanced(
sales_df, weather_df, traffic_df, products, tenant_id, job_id, session
)
# Categorize all products for category-specific forecasting
logger.info("Categorizing products for optimized forecasting")
product_categories = await self._categorize_all_products(
sales_df, processed_data
)
logger.info("Product categorization complete",
total_products=len(product_categories),
categories_breakdown={cat.value: sum(1 for c in product_categories.values() if c == cat)
for cat in set(product_categories.values())})
# Event 2: Data Analysis (20%)
# Recalculate time remaining based on elapsed time
start_time = await repos['training_log'].get_start_time(job_id)
elapsed_seconds = 0
if start_time:
elapsed_seconds = int((datetime.now(timezone.utc) - start_time).total_seconds())
# Estimate remaining time: we've done ~20% of work (data analysis)
# Remaining 80% includes training all products
products_to_train = len(processed_data)
estimated_remaining_seconds = int(products_to_train * avg_time_per_product)
# Recalculate estimated completion time
estimated_completion_time_data_analysis = calculate_estimated_completion_time(
estimated_remaining_seconds / 60
)
await publish_data_analysis(
job_id,
tenant_id,
f"Data analysis completed for {len(processed_data)} products",
estimated_time_remaining_seconds=estimated_remaining_seconds,
estimated_completion_time=estimated_completion_time_data_analysis.isoformat()
)
# Train models for each processed product with progress aggregation
logger.info("Training models with repository integration and progress aggregation")
# Create progress tracker for parallel product training (20-80%)
progress_tracker = ParallelProductProgressTracker(
job_id=job_id,
tenant_id=tenant_id,
total_products=len(processed_data)
)
# Train all models in parallel (without DB writes to avoid session conflicts)
# ✅ FIX: Pass db_session to prevent nested session issues and deadlocks
training_results = await self._train_all_models_enhanced(
tenant_id, processed_data, job_id, repos, progress_tracker, product_categories, db_session
)
# Write all training results to database sequentially (after parallel training completes)
logger.info("Writing training results to database sequentially")
training_results = await self._write_training_results_to_database(
tenant_id, job_id, training_results, repos
)
# Calculate overall training summary with enhanced metrics
summary = await self._calculate_enhanced_training_summary(
training_results, repos, tenant_id
)
# Calculate successful and failed trainings
successful_trainings = len([r for r in training_results.values() if r.get('status') == 'success'])
failed_trainings = len([r for r in training_results.values() if r.get('status') == 'error'])
total_duration = sum([r.get('training_time_seconds', 0) for r in training_results.values()])
# Event 4: Training Completed (100%)
await publish_training_completed(
job_id,
tenant_id,
successful_trainings,
failed_trainings,
total_duration
)
# Create comprehensive result with repository data
result = {
"job_id": job_id,
"tenant_id": tenant_id,
"status": "completed",
"products_trained": len([r for r in training_results.values() if r.get('status') == 'success']),
"products_failed": len([r for r in training_results.values() if r.get('status') == 'error']),
"products_skipped": len([r for r in training_results.values() if r.get('status') == 'skipped']),
"total_products": len(products),
"training_results": training_results,
"enhanced_summary": summary,
"models_trained": summary.get('models_created', {}),
"data_info": {
"date_range": {
"start": training_dataset.date_range.start.isoformat(),
"end": training_dataset.date_range.end.isoformat(),
"duration_days": (training_dataset.date_range.end - training_dataset.date_range.start).days
},
"data_sources": [source.value for source in training_dataset.date_range.available_sources],
"constraints_applied": training_dataset.date_range.constraints
},
"repository_metadata": {
"total_records_created": summary.get('total_db_records', 0),
"performance_metrics_stored": summary.get('performance_metrics_created', 0),
"artifacts_created": summary.get('artifacts_created', 0)
},
"completed_at": datetime.now().isoformat()
}
logger.info("Enhanced ML training pipeline completed successfully",
job_id=job_id,
models_created=len([r for r in training_results.values() if r.get('status') == 'success']))
return result
except Exception as e:
logger.error("Enhanced ML training pipeline failed",
job_id=job_id,
@@ -408,6 +270,12 @@ class EnhancedBakeryMLTrainer:
tenant_id, job_id, training_results, repos
)
# ✅ CRITICAL FIX: Commit the session to persist model records to database
# Without this commit, all model records created above are lost when session closes
await session.commit()
logger.info("Committed model records to database",
models_created=len([r for r in training_results.values() if 'model_record_id' in r]))
# Calculate overall training summary with enhanced metrics
summary = await self._calculate_enhanced_training_summary(
training_results, repos, tenant_id

View File

@@ -407,9 +407,9 @@ class BaseServiceClient(ABC):
timeout=timeout
)
async def post(self, endpoint: str, data: Dict[str, Any], tenant_id: Optional[str] = None) -> Optional[Dict[str, Any]]:
"""Make a POST request"""
return await self._make_request("POST", endpoint, tenant_id=tenant_id, data=data)
async def post(self, endpoint: str, data: Optional[Dict[str, Any]] = None, tenant_id: Optional[str] = None, params: Optional[Dict[str, Any]] = None) -> Optional[Dict[str, Any]]:
"""Make a POST request with optional query parameters"""
return await self._make_request("POST", endpoint, tenant_id=tenant_id, data=data, params=params)
async def put(self, endpoint: str, data: Dict[str, Any], tenant_id: Optional[str] = None) -> Optional[Dict[str, Any]]:
"""Make a PUT request"""