Files
bakery-ia/services/forecasting/app/ml/predictor.py
2025-08-14 16:47:34 +02:00

336 lines
13 KiB
Python

# ================================================================
# services/forecasting/app/ml/predictor.py
# ================================================================
"""
Enhanced predictor module with advanced forecasting capabilities
"""
import structlog
from typing import Dict, List, Any, Optional, Tuple
import pandas as pd
import numpy as np
from datetime import datetime, date, timedelta
import pickle
import json
from app.core.config import settings
from shared.monitoring.metrics import MetricsCollector
from shared.database.base import create_database_manager
logger = structlog.get_logger()
metrics = MetricsCollector("forecasting-service")
class BakeryPredictor:
    """
    Predictor for bakery demand forecasting, built around dependency injection.

    An instance carries three collaborators: the database manager, an
    (initially empty) model cache and the bakery business rules.
    """

    def __init__(self, database_manager=None):
        """
        Wire up the predictor's collaborators.

        Args:
            database_manager: Database manager to use. When it is omitted
                (or otherwise falsy) a new one is created from
                ``settings.DATABASE_URL`` for the "forecasting-service".
        """
        if not database_manager:
            database_manager = create_database_manager(
                settings.DATABASE_URL, "forecasting-service"
            )
        self.database_manager = database_manager
        # Starts empty; nothing in this class populates it yet.
        self.model_cache = {}
        self.business_rules = BakeryBusinessRules()
class BakeryForecaster:
    """
    Enhanced forecaster that integrates with repository pattern.

    Besides the repository entry point, the class exposes the prediction
    pipeline used by ``predict_demand``: base model prediction, then
    business-rule adjustments, then widening of the uncertainty band.
    """

    def __init__(self, database_manager=None):
        """
        Build the forecaster and its underlying predictor.

        Args:
            database_manager: Database manager to use. When it is omitted
                (or otherwise falsy) a new one is created from
                ``settings.DATABASE_URL`` for the "forecasting-service".
        """
        self.database_manager = database_manager or create_database_manager(
            settings.DATABASE_URL, "forecasting-service"
        )
        # Pass the resolved manager on so the predictor shares it instead of
        # building a second one when no manager was injected.
        self.predictor = BakeryPredictor(self.database_manager)
        # predict_demand() reads self.business_rules; reuse the predictor's
        # rule set so the attribute exists on the forecaster as well.
        self.business_rules = self.predictor.business_rules

    async def generate_forecast_with_repository(self, tenant_id: str, inventory_product_id: str,
                                                forecast_date: date,
                                                model_id: Optional[str] = None) -> Dict[str, Any]:
        """
        Generate forecast with repository integration.

        This is currently a placeholder: no model is loaded and ``model_id``
        is not used. The returned prediction and confidence interval are
        fixed at 0.0.

        Args:
            tenant_id: Echoed back in the result.
            inventory_product_id: Echoed back in the result.
            forecast_date: Date of the forecast; returned in ISO format.
            model_id: Currently unused.

        Returns:
            A dict describing the (placeholder) forecast.

        Raises:
            Exception: Any error is logged and re-raised unchanged.
        """
        try:
            # This would integrate with repositories for model loading and caching
            # Implementation would be added here
            return {
                "tenant_id": tenant_id,
                "inventory_product_id": inventory_product_id,
                "forecast_date": forecast_date.isoformat(),
                "prediction": 0.0,
                "confidence_interval": {"lower": 0.0, "upper": 0.0},
                "status": "completed",
                "repository_integration": True
            }
        except Exception as e:
            logger.error("Forecast generation failed", error=str(e))
            raise

    async def predict_demand(self, model, features: Dict[str, Any],
                             business_type: str = "individual") -> Dict[str, float]:
        """
        Generate demand prediction with business rules applied.

        Args:
            model: Object exposing ``predict(df)``; its result must support
                ``len()`` and ``.iloc[0]`` (presumably a fitted Prophet
                model -- confirm against callers).
            features: Feature dict; must contain ``'date'``.
            business_type: Passed to the business rules.

        Returns:
            The dict produced by ``_add_uncertainty_bands``.

        Raises:
            Exception: Any error is logged and re-raised unchanged.
        """
        try:
            # Generate base prediction
            base_prediction = await self._generate_base_prediction(model, features)
            # Apply business rules
            adjusted_prediction = self.business_rules.apply_rules(
                base_prediction, features, business_type
            )
            # Add uncertainty estimation
            return self._add_uncertainty_bands(adjusted_prediction, features)
        except Exception as e:
            logger.error("Error in demand prediction", error=str(e))
            raise

    async def _generate_base_prediction(self, model, features: Dict[str, Any]) -> Dict[str, float]:
        """
        Generate base prediction from Prophet model.

        Only the first row of the forecast is used. ``yhat``, ``yhat_lower``
        and ``yhat_upper`` are required; the component columns default to 0
        when the forecast does not contain them.

        Raises:
            ValueError: If the model returns an empty forecast.
            Exception: Any error is logged and re-raised unchanged.
        """
        try:
            # Convert features to Prophet DataFrame
            df = self._prepare_prophet_dataframe(features)
            # Generate forecast
            forecast = model.predict(df)
            if len(forecast) == 0:
                raise ValueError("No prediction generated from model")
            row = forecast.iloc[0]
            return {
                "yhat": float(row['yhat']),
                "yhat_lower": float(row['yhat_lower']),
                "yhat_upper": float(row['yhat_upper']),
                "trend": float(row.get('trend', 0)),
                "seasonal": float(row.get('seasonal', 0)),
                "weekly": float(row.get('weekly', 0)),
                "yearly": float(row.get('yearly', 0)),
                "holidays": float(row.get('holidays', 0))
            }
        except Exception as e:
            logger.error("Error generating base prediction", error=str(e))
            raise

    def _prepare_prophet_dataframe(self, features: Dict[str, Any]) -> pd.DataFrame:
        """
        Convert features to Prophet-compatible DataFrame.

        Produces a single-row frame with a ``ds`` column parsed from
        ``features['date']``, six numeric regressor columns (0.0 when the
        feature is missing or ``None``), three integer calendar flags and an
        ``is_central_workshop`` flag derived from ``features['business_type']``.

        Raises:
            Exception: Any error (e.g. a missing ``'date'`` key) is logged
                and re-raised unchanged.
        """
        regressor_columns = (
            'temperature',
            'precipitation',
            'humidity',
            'wind_speed',
            'traffic_volume',
            'pedestrian_count',
        )
        try:
            # Create base DataFrame
            df = pd.DataFrame({
                'ds': [pd.to_datetime(features['date'])]
            })
            # Add regressor features; absent or None values fall back to 0.0
            for column in regressor_columns:
                value = features.get(column)
                df[column] = float(value) if value is not None else 0.0
            # Add categorical features
            df['day_of_week'] = int(features.get('day_of_week', 0))
            df['is_weekend'] = int(features.get('is_weekend', False))
            df['is_holiday'] = int(features.get('is_holiday', False))
            # Business type
            business_type = features.get('business_type', 'individual')
            df['is_central_workshop'] = int(business_type == 'central_workshop')
            return df
        except Exception as e:
            logger.error("Error preparing Prophet dataframe", error=str(e))
            raise

    def _add_uncertainty_bands(self, prediction: Dict[str, float],
                               features: Dict[str, Any]) -> Dict[str, float]:
        """
        Add uncertainty estimation based on external factors.

        The width of the incoming ``[yhat_lower, yhat_upper]`` band is
        multiplied by ``1 + weather + holiday + weekend`` uncertainty and the
        widened band is re-centred on ``yhat``. Demand and both bounds are
        clamped at 0 so the upper bound can never fall below the demand.

        Returns:
            Dict with ``demand``, ``lower_bound``, ``upper_bound``,
            ``uncertainty_factor``, ``trend``, ``seasonal`` and
            ``holiday_effect``. If the calculation fails, a reduced dict
            (without the three component keys) built from the raw prediction
            is returned with ``uncertainty_factor`` 1.0.
        """
        try:
            base_demand = prediction["yhat"]
            base_lower = prediction["yhat_lower"]
            base_upper = prediction["yhat_upper"]
            # Weather uncertainty
            weather_uncertainty = self._calculate_weather_uncertainty(features)
            # Holiday uncertainty
            holiday_uncertainty = self._calculate_holiday_uncertainty(features)
            # Weekend uncertainty
            weekend_uncertainty = self._calculate_weekend_uncertainty(features)
            # Total uncertainty factor
            total_uncertainty = 1.0 + weather_uncertainty + holiday_uncertainty + weekend_uncertainty
            # Adjust bounds: widen the band and centre it on the point estimate
            half_range = (base_upper - base_lower) * total_uncertainty / 2
            adjusted_lower = base_demand - half_range
            adjusted_upper = base_demand + half_range
            return {
                "demand": max(0, base_demand),  # Never predict negative demand
                "lower_bound": max(0, adjusted_lower),
                # Clamp as well, otherwise a negative upper bound could sit
                # below the clamped demand.
                "upper_bound": max(0, adjusted_upper),
                "uncertainty_factor": total_uncertainty,
                "trend": prediction.get("trend", 0),
                "seasonal": prediction.get("seasonal", 0),
                "holiday_effect": prediction.get("holidays", 0)
            }
        except Exception as e:
            logger.error("Error adding uncertainty bands", error=str(e))
            # Return basic prediction if uncertainty calculation fails
            return {
                "demand": max(0, prediction["yhat"]),
                "lower_bound": max(0, prediction["yhat_lower"]),
                "upper_bound": max(0, prediction["yhat_upper"]),
                "uncertainty_factor": 1.0
            }

    def _calculate_weather_uncertainty(self, features: Dict[str, Any]) -> float:
        """
        Calculate weather-based uncertainty.

        Adds 0.1 when the temperature lies outside the cold/hot thresholds
        from ``settings`` and 0.05 per unit of precipitation, with the
        precipitation capped at 10 (so rain contributes at most 0.5).
        Missing or ``None`` values contribute nothing.
        """
        uncertainty = 0.0
        # Temperature extremes add uncertainty
        temp = features.get('temperature')
        if temp is not None:
            if temp < settings.TEMPERATURE_THRESHOLD_COLD or temp > settings.TEMPERATURE_THRESHOLD_HOT:
                uncertainty += 0.1
        # Rain adds uncertainty
        precipitation = features.get('precipitation')
        if precipitation is not None and precipitation > 0:
            # Precipitation is capped at 10, i.e. at most +0.5 uncertainty
            uncertainty += 0.05 * min(precipitation, 10)
        return uncertainty

    def _calculate_holiday_uncertainty(self, features: Dict[str, Any]) -> float:
        """Calculate holiday-based uncertainty: 0.2 when ``is_holiday`` is truthy, else 0.0."""
        if features.get('is_holiday', False):
            return 0.2  # 20% additional uncertainty on holidays
        return 0.0

    def _calculate_weekend_uncertainty(self, features: Dict[str, Any]) -> float:
        """Calculate weekend-based uncertainty: 0.1 when ``is_weekend`` is truthy, else 0.0."""
        if features.get('is_weekend', False):
            return 0.1  # 10% additional uncertainty on weekends
        return 0.0
class BakeryBusinessRules:
    """
    Business rules for Spanish bakeries.

    Applies domain-specific adjustments to predictions. Every rule works on
    a prediction dict holding at least ``yhat``, ``yhat_lower`` and
    ``yhat_upper``.
    """

    def apply_rules(self, prediction: Dict[str, float], features: Dict[str, Any],
                    business_type: str) -> Dict[str, float]:
        """
        Apply all business rules to prediction.

        The rules run in a fixed order (weather, time, business type,
        Spanish-specific) on a shallow copy, so the caller's dict is not
        modified.

        Args:
            prediction: Base prediction with ``yhat``, ``yhat_lower`` and
                ``yhat_upper``.
            features: Feature dict; must contain ``'date'``.
            business_type: ``"central_workshop"`` narrows the band; any
                other value leaves it unchanged.

        Returns:
            The adjusted copy of ``prediction``.
        """
        adjusted_prediction = prediction.copy()
        # Apply weather rules
        adjusted_prediction = self._apply_weather_rules(adjusted_prediction, features)
        # Apply time-based rules
        adjusted_prediction = self._apply_time_rules(adjusted_prediction, features)
        # Apply business type rules
        adjusted_prediction = self._apply_business_type_rules(adjusted_prediction, business_type)
        # Apply Spanish-specific rules
        adjusted_prediction = self._apply_spanish_rules(adjusted_prediction, features)
        return adjusted_prediction

    @staticmethod
    def _scale_band(prediction: Dict[str, float], factor: float) -> None:
        """Multiply the point estimate and both bounds by ``factor`` in place."""
        for key in ("yhat", "yhat_lower", "yhat_upper"):
            prediction[key] *= factor

    def _apply_weather_rules(self, prediction: Dict[str, float],
                             features: Dict[str, Any]) -> Dict[str, float]:
        """
        Apply weather-based business rules (mutates and returns ``prediction``).

        Missing or ``None`` precipitation/temperature values leave the
        prediction untouched.
        """
        # Rain reduces foot traffic. A present-but-None value must not be
        # compared with 0 (that raises TypeError), so treat it as "no rain".
        precipitation = features.get('precipitation')
        if precipitation is not None and precipitation > 0:
            self._scale_band(prediction, settings.RAIN_IMPACT_FACTOR)
        # Extreme temperatures affect different products differently.
        # NOTE(review): only the point estimate is scaled here, while the
        # rain rule scales the bounds too -- confirm this is intentional.
        temperature = features.get('temperature')
        if temperature is not None:
            if temperature > settings.TEMPERATURE_THRESHOLD_HOT:
                # Hot weather reduces bread sales, increases cold drinks
                prediction["yhat"] *= 0.9
            elif temperature < settings.TEMPERATURE_THRESHOLD_COLD:
                # Cold weather increases hot beverage sales
                prediction["yhat"] *= 1.1
        return prediction

    def _apply_time_rules(self, prediction: Dict[str, float],
                          features: Dict[str, Any]) -> Dict[str, float]:
        """
        Apply time-based business rules (mutates and returns ``prediction``).

        Weekend and holiday factors from ``settings`` are applied
        independently, so both apply on a holiday that falls on a weekend.
        """
        # Weekend adjustment
        if features.get('is_weekend', False):
            self._scale_band(prediction, settings.WEEKEND_ADJUSTMENT_FACTOR)
        # Holiday adjustment
        if features.get('is_holiday', False):
            self._scale_band(prediction, settings.HOLIDAY_ADJUSTMENT_FACTOR)
        return prediction

    def _apply_business_type_rules(self, prediction: Dict[str, float],
                                   business_type: str) -> Dict[str, float]:
        """
        Apply business type specific rules (mutates and returns ``prediction``).

        For ``"central_workshop"`` the band width is reduced to 80% and
        re-centred on ``yhat``; other business types are left unchanged.
        """
        if business_type == "central_workshop":
            # Central workshops have more stable demand
            uncertainty_reduction = 0.8
            center = prediction["yhat"]
            lower = prediction["yhat_lower"]
            upper = prediction["yhat_upper"]
            # Reduce uncertainty band
            new_range = (upper - lower) * uncertainty_reduction
            prediction["yhat_lower"] = center - (new_range / 2)
            prediction["yhat_upper"] = center + (new_range / 2)
        return prediction

    def _apply_spanish_rules(self, prediction: Dict[str, float],
                             features: Dict[str, Any]) -> Dict[str, float]:
        """
        Apply Spanish bakery specific rules (mutates and returns ``prediction``).

        The weekday is derived from ``features['date']`` rather than from
        the ``day_of_week`` feature. On Monday-Friday the point estimate is
        reduced by 5%; the bounds are not changed.

        Raises:
            KeyError: If ``features`` has no ``'date'`` entry.
        """
        # Spanish siesta time considerations
        current_date = pd.to_datetime(features['date'])
        day_of_week = current_date.weekday()  # Monday == 0 ... Sunday == 6
        # Reduced activity during typical siesta hours (14:00-17:00)
        # This affects afternoon sales planning
        if day_of_week < 5:  # Weekdays
            prediction["yhat"] *= 0.95  # Slight reduction for siesta effect
        return prediction