From f3e8a6dda8c13eea616e324e456be650e49a949e Mon Sep 17 00:00:00 2001 From: Urtzi Alfaro Date: Tue, 29 Jul 2025 21:08:33 +0200 Subject: [PATCH] Start fixing forecast service API 14 --- services/data/app/api/weather.py | 2 +- .../app/services/prediction_service.py | 99 +++++++++++++++++-- services/forecasting/requirements.txt | 1 + services/training/app/models/training.py | 1 + shared/clients/data_client.py | 4 +- 5 files changed, 95 insertions(+), 12 deletions(-) diff --git a/services/data/app/api/weather.py b/services/data/app/api/weather.py index 55465352..4fb9a210 100644 --- a/services/data/app/api/weather.py +++ b/services/data/app/api/weather.py @@ -72,7 +72,7 @@ async def get_current_weather( logger.error("Failed to get current weather", error=str(e)) raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}") -@router.get("/tenants/{tenant_id}/weather/forecast", response_model=List[WeatherForecastResponse]) +@router.post("/tenants/{tenant_id}/weather/forecast", response_model=List[WeatherForecastResponse]) async def get_weather_forecast( latitude: float = Query(..., description="Latitude"), longitude: float = Query(..., description="Longitude"), diff --git a/services/forecasting/app/services/prediction_service.py b/services/forecasting/app/services/prediction_service.py index 021c03d2..03954f4a 100644 --- a/services/forecasting/app/services/prediction_service.py +++ b/services/forecasting/app/services/prediction_service.py @@ -17,6 +17,7 @@ import pandas as pd import httpx from pathlib import Path import os +import joblib from app.core.config import settings from shared.monitoring.metrics import MetricsCollector @@ -89,7 +90,11 @@ class PredictionService: raise async def _load_model(self, model_id: str, model_path: str): - """Load model from shared volume using API metadata""" + """Load model from shared volume using joblib""" + + if not await self._validate_model_file(model_path): + logger.error(f"Model file not valid: {model_path}") + return None # Check cache first if model_id in self.model_cache: @@ -98,11 +103,10 @@ class PredictionService: return cached_model try: - # Load model directly from shared volume (fast!) if os.path.exists(model_path): - with open(model_path, 'rb') as f: - model = pickle.load(f) - + # ✅ FIX: Use joblib.load instead of pickle.load + model = joblib.load(model_path) + # Cache the model self.model_cache[model_id] = (model, datetime.now()) logger.info(f"Model loaded from shared volume: {model_path}") @@ -114,6 +118,34 @@ class PredictionService: except Exception as e: logger.error(f"Error loading model: {e}") return None + + async def _validate_model_file(self, model_path: str) -> bool: + """Validate model file before loading""" + try: + if not os.path.exists(model_path): + logger.error(f"Model file not found: {model_path}") + return False + + # Check file size (should be > 1KB for a trained model) + file_size = os.path.getsize(model_path) + if file_size < 1024: + logger.warning(f"Model file too small ({file_size} bytes): {model_path}") + return False + + # Try to peek at file header to detect format + with open(model_path, 'rb') as f: + header = f.read(8) + + # Check for joblib signature + if header.startswith(b']\x93PICKLE') or header.startswith(b'\x80\x03'): + return True + else: + logger.warning(f"Unrecognized file format: {model_path}") + return False + + except Exception as e: + logger.error(f"Model validation error: {e}") + return False def _prepare_prophet_features(self, features: Dict[str, Any]) -> pd.DataFrame: """Convert features to Prophet-compatible DataFrame""" @@ -136,18 +168,67 @@ class PredictionService: else: df[feature] = 0.0 - # Add categorical features - df['day_of_week'] = int(features.get('day_of_week', 0)) - df['is_weekend'] = int(features.get('is_weekend', False)) + # Extract date information for temporal features + forecast_date = pd.to_datetime(features['date']) + day_of_week = forecast_date.weekday() # 0=Monday, 6=Sunday + + # Add temporal features (MUST match training service exactly!) + df['day_of_week'] = int(day_of_week) + df['day_of_month'] = int(forecast_date.day) + df['month'] = int(forecast_date.month) + df['quarter'] = int(forecast_date.quarter) + df['week_of_year'] = int(forecast_date.isocalendar().week) + + # Bakery-specific temporal features (match training exactly!) + df['is_weekend'] = int(day_of_week >= 5) # Saturday=5, Sunday=6 + df['is_monday'] = int(day_of_week == 0) # ✅ FIX: Add missing is_monday + df['is_tuesday'] = int(day_of_week == 1) + df['is_wednesday'] = int(day_of_week == 2) + df['is_thursday'] = int(day_of_week == 3) + df['is_friday'] = int(day_of_week == 4) + df['is_saturday'] = int(day_of_week == 5) + df['is_sunday'] = int(day_of_week == 6) + + # Month-based features + df['is_january'] = int(forecast_date.month == 1) + df['is_february'] = int(forecast_date.month == 2) + df['is_march'] = int(forecast_date.month == 3) + df['is_april'] = int(forecast_date.month == 4) + df['is_may'] = int(forecast_date.month == 5) + df['is_june'] = int(forecast_date.month == 6) + df['is_july'] = int(forecast_date.month == 7) + df['is_august'] = int(forecast_date.month == 8) + df['is_september'] = int(forecast_date.month == 9) + df['is_october'] = int(forecast_date.month == 10) + df['is_november'] = int(forecast_date.month == 11) + df['is_december'] = int(forecast_date.month == 12) + + # Season-based features + season = ((forecast_date.month % 12) + 3) // 3 # 1=spring, 2=summer, 3=autumn, 4=winter + df['is_spring'] = int(season == 1) + df['is_summer'] = int(season == 2) + df['is_autumn'] = int(season == 3) + df['is_winter'] = int(season == 4) + + # Business context features df['is_holiday'] = int(features.get('is_holiday', False)) # Business type encoding business_type = features.get('business_type', 'individual') df['is_central_workshop'] = int(business_type == 'central_workshop') + df['is_individual_bakery'] = int(business_type == 'individual') + + # Special day features (these might be in training data) + df['is_month_start'] = int(forecast_date.day <= 3) + df['is_month_end'] = int(forecast_date.day >= 28) + df['is_quarter_start'] = int(forecast_date.month in [1, 4, 7, 10] and forecast_date.day <= 7) + df['is_quarter_end'] = int(forecast_date.month in [3, 6, 9, 12] and forecast_date.day >= 25) logger.debug("Prepared Prophet features", features_count=len(df.columns), - date=features['date']) + date=features['date'], + day_of_week=day_of_week, + is_monday=df['is_monday'].iloc[0]) return df diff --git a/services/forecasting/requirements.txt b/services/forecasting/requirements.txt index 19367b1e..8fc24544 100644 --- a/services/forecasting/requirements.txt +++ b/services/forecasting/requirements.txt @@ -22,6 +22,7 @@ prophet==1.1.4 scikit-learn==1.3.2 pandas==2.1.4 numpy==1.25.2 +joblib==1.3.2 # Messaging aio-pika==9.3.1 diff --git a/services/training/app/models/training.py b/services/training/app/models/training.py index d594d89c..c761e600 100644 --- a/services/training/app/models/training.py +++ b/services/training/app/models/training.py @@ -169,6 +169,7 @@ class TrainedModel(Base): def to_dict(self): return { "id": self.id, + "model_id": self.id, "tenant_id": self.tenant_id, "product_name": self.product_name, "model_type": self.model_type, diff --git a/shared/clients/data_client.py b/shared/clients/data_client.py index a07cea60..730b8595 100644 --- a/shared/clients/data_client.py +++ b/shared/clients/data_client.py @@ -241,9 +241,9 @@ class DataServiceClient(BaseServiceClient): """ # Prepare request payload with proper date handling payload = { - "days": days, # Already in ISO format from calling code "latitude": latitude or 40.4168, # Default Madrid coordinates - "longitude": longitude or -3.7038 + "longitude": longitude or -3.7038, + "days": days # Already in ISO format from calling code } logger.info(f"Weather request payload: {payload}", tenant_id=tenant_id)