Files
bakery-ia/services/forecasting/app/services/prediction_service.py
2025-07-30 09:31:42 +02:00

466 lines
21 KiB
Python

# services/forecasting/app/services/prediction_service.py - FIXED SEASON FEATURE
"""
Prediction service for loading models and generating predictions
FIXED: Added missing 'season' feature that matches training service exactly
"""
import structlog
from typing import Dict, List, Any, Optional
import asyncio
import pickle
import json
from datetime import datetime, date
import numpy as np
import pandas as pd
import httpx
from pathlib import Path
import os
import joblib
from app.core.config import settings
from shared.monitoring.metrics import MetricsCollector
# Module-level structlog logger used by every method in this file.
logger = structlog.get_logger()
# Metrics collector created with the "forecasting-service" name; used below
# to count served/failed predictions and to record processing time.
metrics = MetricsCollector("forecasting-service")
class PredictionService:
    """
    Service for loading ML models and generating predictions.

    Interfaces with trained Prophet models from the training service.
    Loaded models are kept in an in-memory cache keyed by ``model_id`` and are
    reloaded from disk once they are older than ``cache_ttl`` seconds.
    """

    # Files smaller than this are rejected as not being a trained model.
    _MIN_MODEL_FILE_BYTES = 1024

    # Leading bytes accepted without a warning by ``_validate_model_file``.
    # The check is advisory only: unknown headers are logged, not rejected.
    _MODEL_SIGNATURES = (
        b']\x93PICKLE',  # labelled "Joblib" by the original author
        b'\x80\x03',     # Pickle protocol 3
        b'\x80\x04',     # Pickle protocol 4
        b'\x80\x05',     # Pickle protocol 5
        b'}\x94',        # labelled "newer joblib format" by the original author
        b'}\x93',        # labelled "alternative joblib format" by the original author
    )

    # Names used to build the one-hot ``is_<name>`` columns.
    # Weekdays are indexed like ``weekday()`` (0 = Monday); months from 1.
    _WEEKDAY_NAMES = (
        'monday', 'tuesday', 'wednesday', 'thursday',
        'friday', 'saturday', 'sunday',
    )
    _MONTH_NAMES = (
        'january', 'february', 'march', 'april', 'may', 'june', 'july',
        'august', 'september', 'october', 'november', 'december',
    )

    def __init__(self, cache_ttl: int = 3600):
        """
        Create the service with an empty model cache.

        Args:
            cache_ttl: Maximum age, in seconds, of a cached model before it is
                reloaded from disk. Defaults to one hour.
        """
        # model_id -> (model object, datetime the model was cached)
        self.model_cache: Dict[str, Any] = {}
        self.cache_ttl = cache_ttl

    async def predict(self, model_id: str, model_path: str, features: Dict[str, Any],
                      confidence_level: float = 0.8) -> Dict[str, float]:
        """
        Generate a prediction using a trained model.

        Args:
            model_id: Identifier of the model; also used as the cache key.
            model_path: Filesystem path of the serialized model.
            features: Raw feature values; must contain ``'date'``.
            confidence_level: Echoed back in the result. It is not applied to
                the model here, so the returned bounds are whatever the loaded
                model produces.

        Returns:
            A dict with ``prediction``, ``lower_bound`` and ``upper_bound``
            (each clamped to be non-negative), ``confidence_interval`` (width
            of the model's unclamped interval) and ``confidence_level``.

        Raises:
            ValueError: If the model cannot be loaded.
            Exception: Any error raised while preparing features or predicting
                is logged, counted and re-raised unchanged.
        """
        start_time = datetime.now()
        try:
            logger.info("Generating prediction",
                        model_id=model_id,
                        features_count=len(features))

            model = await self._load_model(model_id, model_path)
            if model is None:
                raise ValueError(f"Model {model_id} not found or failed to load")

            prophet_df = self._prepare_prophet_features(features)
            forecast = model.predict(prophet_df)

            # Only the first forecast row is used: the input frame has one row.
            prediction_value = float(forecast['yhat'].iloc[0])
            lower_bound = float(forecast['yhat_lower'].iloc[0])
            upper_bound = float(forecast['yhat_upper'].iloc[0])

            result = {
                # Clamp with a float literal so every value stays a float.
                "prediction": max(0.0, prediction_value),
                "lower_bound": max(0.0, lower_bound),
                "upper_bound": max(0.0, upper_bound),
                # Width of the raw interval, computed before clamping.
                "confidence_interval": upper_bound - lower_bound,
                "confidence_level": confidence_level,
            }

            processing_time = (datetime.now() - start_time).total_seconds()
            try:
                # NOTE(review): `register_histogram` is called with the observed
                # value; confirm against MetricsCollector that this records it.
                metrics.register_histogram("prediction_processing_time_seconds", float(processing_time))
                metrics.increment_counter("predictions_served_total")
            except Exception as metrics_error:
                # Metrics are best-effort and must not fail the prediction.
                logger.warning("Failed to record metrics", error=str(metrics_error))

            logger.info("Prediction generated successfully",
                        model_id=model_id,
                        prediction=result["prediction"],
                        processing_time=processing_time)
            return result
        except Exception as e:
            logger.error("Error generating prediction",
                         error=str(e),
                         model_id=model_id)
            try:
                metrics.increment_counter("prediction_errors_total")
            except Exception as metrics_error:
                # A metrics failure must not replace the real error below.
                logger.warning("Failed to record metrics", error=str(metrics_error))
            raise

    def _is_cache_fresh(self, cached_time: datetime) -> bool:
        """Return True while a cache entry stored at *cached_time* is younger than ``cache_ttl``."""
        # total_seconds() is required here: timedelta.seconds ignores whole
        # days, so an entry older than 24h would otherwise look fresh again.
        return (datetime.now() - cached_time).total_seconds() < self.cache_ttl

    async def _load_model(self, model_id: str, model_path: str):
        """
        Return the model for *model_id*, from cache when possible.

        The file at *model_path* is validated on every call, before the cache
        is consulted. Returns ``None`` (after logging) when the file is invalid
        or cannot be deserialized; never raises.
        """
        if not await self._validate_model_file(model_path):
            logger.error(f"Model file not valid: {model_path}")
            return None

        cached = self.model_cache.get(model_id)
        if cached is not None:
            cached_model, cached_time = cached
            if self._is_cache_fresh(cached_time):
                return cached_model

        try:
            model = await self._load_model_safely(model_path)
            if model is None:
                logger.error(f"Failed to load model from: {model_path}")
                return None
            self.model_cache[model_id] = (model, datetime.now())
            logger.info(f"Model loaded successfully: {model_path}")
            return model
        except Exception as e:
            logger.error(f"Error loading model: {e}")
            return None

    async def _load_model_safely(self, model_path: str):
        """
        Deserialize a model, trying joblib, then pickle, then pandas.

        Returns the first successfully loaded object, or ``None`` when every
        loader fails. Each failure is logged as a warning.

        SECURITY: all three loaders unpickle the file, which can execute
        arbitrary code. Only call this with model files from a trusted source.
        """
        def _load_with_pickle(path: str):
            with open(path, 'rb') as f:
                return pickle.load(f)

        # (name used in debug/info logs, label used in failure log, loader)
        loaders = (
            ("joblib", "Joblib", joblib.load),
            ("pickle", "Pickle", _load_with_pickle),
            ("pandas", "Pandas", pd.read_pickle),
        )
        for name, label, loader in loaders:
            try:
                logger.debug(f"Attempting to load model with {name}: {model_path}")
                model = loader(model_path)
                logger.info(f"Model loaded successfully with {name}")
                return model
            except Exception as e:
                logger.warning(f"{label} loading failed: {e}")

        logger.error(f"All loading methods failed for: {model_path}")
        return None

    async def _validate_model_file(self, model_path: str) -> bool:
        """
        Check that *model_path* looks like a usable model file.

        Returns ``False`` when the file is missing, smaller than 1 KiB or
        unreadable. An unrecognized header is only logged; the file is still
        accepted so that the loaders get a chance to read it.
        """
        try:
            if not os.path.exists(model_path):
                logger.error(f"Model file not found: {model_path}")
                return False
            file_size = os.path.getsize(model_path)
            if file_size < self._MIN_MODEL_FILE_BYTES:
                logger.warning(f"Model file too small ({file_size} bytes): {model_path}")
                return False
        except Exception as e:
            logger.error(f"Model validation error: {e}")
            return False

        try:
            with open(model_path, 'rb') as f:
                header = f.read(16)
        except Exception as e:
            logger.error(f"Error reading model file header: {e}")
            return False

        if not header.startswith(self._MODEL_SIGNATURES):
            logger.warning(f"Unrecognized file header: {header[:8]} for {model_path}")
            logger.info("Proceeding with loading attempt despite unrecognized header")
        return True

    @staticmethod
    def _float_feature(features: Dict[str, Any], key: str, default: float) -> float:
        """Read *key* as a float, using *default* when it is missing or ``None``."""
        value = features.get(key)
        return float(default if value is None else value)

    @staticmethod
    def _flag_feature(features: Dict[str, Any], key: str, default: bool = False) -> int:
        """Read *key* as a 0/1 integer flag, using *default* when it is missing or ``None``."""
        value = features.get(key)
        return int(default if value is None else value)

    def _build_feature_row(self, features: Dict[str, Any]) -> Dict[str, Any]:
        """
        Compute every model input column for a single forecast date.

        Returns a dict whose insertion order is the column order of the frame
        produced by ``_prepare_prophet_features``. Raises ``KeyError`` when
        ``features`` has no ``'date'`` entry.
        """
        forecast_date = pd.to_datetime(features['date'])
        day_of_week = forecast_date.weekday()  # 0=Monday, 6=Sunday
        month = forecast_date.month
        day = forecast_date.day

        # Raw inputs; a missing or None value falls back to the default.
        traffic = self._float_feature(features, 'traffic_volume', 100.0)
        pedestrians = self._float_feature(features, 'pedestrian_count', 50.0)
        congestion = self._float_feature(features, 'congestion_level', 1.0)
        avg_speed = self._float_feature(features, 'average_speed', 30.0)
        temperature = self._float_feature(features, 'temperature', 15.0)
        precipitation = self._float_feature(features, 'precipitation', 0.0)
        humidity = self._float_feature(features, 'humidity', 60.0)
        wind_speed = self._float_feature(features, 'wind_speed', 5.0)
        pressure = self._float_feature(features, 'pressure', 1013.0)

        row: Dict[str, Any] = {'ds': forecast_date}

        # Core traffic features
        row['traffic_volume'] = traffic
        row['pedestrian_count'] = pedestrians
        row['congestion_level'] = congestion
        row['average_speed'] = avg_speed

        # Weather features
        row['temperature'] = temperature
        row['precipitation'] = precipitation
        row['humidity'] = humidity
        row['wind_speed'] = wind_speed
        row['pressure'] = pressure
        row['temp_category'] = self._get_temp_category(temperature)

        # Temporal features
        row['day_of_week'] = int(day_of_week)
        row['day_of_month'] = int(day)
        row['month'] = int(month)
        row['quarter'] = int(forecast_date.quarter)
        row['week_of_year'] = int(forecast_date.isocalendar().week)
        season = self._get_season(month)
        row['season'] = season

        # Day-of-week flags
        is_weekend = int(day_of_week >= 5)
        row['is_weekend'] = is_weekend
        for index, name in enumerate(self._WEEKDAY_NAMES):
            row[f'is_{name}'] = int(day_of_week == index)
        row['is_working_day'] = int(day_of_week < 5)  # Mon-Fri

        # Season flags (season codes come from _get_season)
        row['is_spring'] = int(season == 2)
        row['is_summer'] = int(season == 3)
        row['is_autumn'] = int(season == 4)
        row['is_winter'] = int(season == 1)

        # Holiday flags are taken from the caller, defaulting to "not a holiday"
        is_holiday = self._flag_feature(features, 'is_holiday')
        row['is_holiday'] = is_holiday
        row['is_school_holiday'] = self._flag_feature(features, 'is_school_holiday')

        # Month flags
        for index, name in enumerate(self._MONTH_NAMES, start=1):
            row[f'is_{name}'] = int(month == index)

        # Special day features
        row['is_month_start'] = int(day <= 3)
        row['is_month_end'] = int(day >= 28)
        row['is_payday_period'] = int(day <= 5 or day >= 25)

        # Temperature-derived features
        is_pleasant_day = int(10 <= temperature <= 25)
        row['temp_squared'] = temperature ** 2
        row['is_cold_day'] = int(temperature < 10)
        row['is_hot_day'] = int(temperature > 25)
        row['is_pleasant_day'] = is_pleasant_day

        # Humidity features
        row['humidity_squared'] = humidity ** 2
        row['is_high_humidity'] = int(humidity > 70)
        row['is_low_humidity'] = int(humidity < 40)

        # Pressure features
        row['pressure_squared'] = pressure ** 2
        row['is_high_pressure'] = int(pressure > 1020)
        row['is_low_pressure'] = int(pressure < 1000)

        # Wind features
        row['wind_squared'] = wind_speed ** 2
        row['is_windy'] = int(wind_speed > 15)
        row['is_calm'] = int(wind_speed < 5)

        # Precipitation features
        is_rainy = int(precipitation > 0.1)
        row['precip_squared'] = precipitation ** 2
        row['precip_log'] = float(np.log1p(precipitation))
        row['is_rainy_day'] = is_rainy
        row['is_very_rainy_day'] = int(precipitation > 5.0)
        row['is_heavy_rain'] = int(precipitation > 10)
        row['rain_intensity'] = self._get_rain_intensity(precipitation)

        # Traffic-derived features; all zero when no positive volume is known
        if traffic > 0:
            row['high_traffic'] = int(traffic > 150)
            row['low_traffic'] = int(traffic < 50)
            row['traffic_normalized'] = float((traffic - 100) / 50)
            row['traffic_squared'] = traffic ** 2
            row['traffic_log'] = float(np.log1p(traffic))
        else:
            row['high_traffic'] = 0
            row['low_traffic'] = 0
            row['traffic_normalized'] = 0.0
            row['traffic_squared'] = 0.0
            row['traffic_log'] = 0.0

        # Pedestrian-derived features
        row['high_pedestrian_count'] = int(pedestrians > 100)
        row['low_pedestrian_count'] = int(pedestrians < 25)
        row['pedestrian_normalized'] = float((pedestrians - 50) / 25)
        row['pedestrian_squared'] = pedestrians ** 2
        row['pedestrian_log'] = float(np.log1p(pedestrians))

        # Average-speed-derived features
        row['high_speed'] = int(avg_speed > 40)
        row['low_speed'] = int(avg_speed < 20)
        row['speed_normalized'] = float((avg_speed - 30) / 10)
        row['speed_squared'] = avg_speed ** 2
        row['speed_log'] = float(np.log1p(avg_speed))

        # Congestion-derived features
        row['high_congestion'] = int(congestion > 3)
        row['low_congestion'] = int(congestion < 2)
        row['congestion_squared'] = congestion ** 2

        # Interaction features
        row['weekend_temp_interaction'] = is_weekend * temperature
        row['weekend_pleasant_weather'] = is_weekend * is_pleasant_day
        row['weekend_traffic_interaction'] = is_weekend * traffic
        row['holiday_temp_interaction'] = is_holiday * temperature
        row['holiday_traffic_interaction'] = is_holiday * traffic
        row['season_temp_interaction'] = season * temperature
        row['season_traffic_interaction'] = season * traffic
        row['rain_traffic_interaction'] = is_rainy * traffic
        row['rain_speed_interaction'] = is_rainy * avg_speed
        row['day_temp_interaction'] = day_of_week * temperature
        row['month_temp_interaction'] = month * temperature
        row['traffic_speed_interaction'] = traffic * avg_speed
        row['pedestrian_speed_interaction'] = pedestrians * avg_speed
        row['congestion_temp_interaction'] = congestion * temperature
        row['congestion_weekend_interaction'] = congestion * is_weekend

        # Calendar groupings
        row['is_peak_bakery_day'] = int(day_of_week in (4, 5, 6))  # Fri, Sat, Sun
        row['is_high_demand_month'] = int(month in (6, 7, 8, 12))  # Summer and December
        row['is_warm_season'] = int(month in (4, 5, 6, 7, 8, 9))
        return row

    def _prepare_prophet_features(self, features: Dict[str, Any]) -> pd.DataFrame:
        """
        Convert raw features into a one-row, Prophet-compatible DataFrame.

        The frame has a ``ds`` datetime column followed by every regressor
        column computed by ``_build_feature_row``. The row is assembled as a
        plain dict and materialised once, instead of inserting ~100 columns
        one at a time into a DataFrame.

        Args:
            features: Raw feature values. ``'date'`` is required; every other
                key is optional and falls back to a default when missing or
                ``None``.

        Raises:
            Exception: Any error (for example a missing ``'date'``) is logged
                and re-raised unchanged.
        """
        try:
            row = self._build_feature_row(features)
            df = pd.DataFrame([row])
            logger.debug("Complete Prophet features prepared",
                         feature_count=len(df.columns),
                         date=features['date'],
                         season=row['season'],
                         traffic_volume=row['traffic_volume'],
                         average_speed=row['average_speed'],
                         pedestrian_count=row['pedestrian_count'])
            return df
        except Exception as e:
            logger.error("Error preparing Prophet features", error=str(e))
            raise

    def _get_season(self, month: int) -> int:
        """Map a month to a season code: 1 Winter, 2 Spring, 3 Summer, 4 Autumn (any other value)."""
        if month in (12, 1, 2):
            return 1  # Winter
        if month in (3, 4, 5):
            return 2  # Spring
        if month in (6, 7, 8):
            return 3  # Summer
        return 4  # Autumn

    def _is_school_holiday(self, date: datetime) -> bool:
        """
        Check whether *date* falls in an approximate Spanish school holiday period.

        Covers July-August, 20 December to 10 January, and 1-15 April as a
        rough stand-in for Easter. Not called anywhere in this class.
        """
        month = date.month
        if month in (7, 8):  # Summer holidays
            return True
        if month == 12 and date.day >= 20:  # Christmas holidays
            return True
        if month == 1 and date.day <= 10:  # Christmas holidays continued
            return True
        if month == 4 and date.day <= 15:  # Easter holidays (approximate)
            return True
        return False

    def _get_temp_category(self, temperature: float) -> int:
        """Bucket a temperature: 0 (<=5), 1 (<=15), 2 (<=25), 3 (above 25)."""
        if temperature <= 5:
            return 0  # Very cold
        if temperature <= 15:
            return 1  # Cold
        if temperature <= 25:
            return 2  # Mild
        return 3  # Hot

    def _get_rain_intensity(self, precipitation: float) -> int:
        """Bucket precipitation: 0 (<=0), 1 (<=2), 2 (<=10), 3 (above 10)."""
        if precipitation <= 0:
            return 0  # No rain
        if precipitation <= 2:
            return 1  # Light rain
        if precipitation <= 10:
            return 2  # Moderate rain
        return 3  # Heavy rain