diff --git a/services/forecasting/app/main.py b/services/forecasting/app/main.py index 56fc1a76..de4f1ad5 100644 --- a/services/forecasting/app/main.py +++ b/services/forecasting/app/main.py @@ -46,6 +46,7 @@ async def lifespan(app: FastAPI): # Register custom metrics metrics_collector.register_counter("forecasts_generated_total", "Total forecasts generated") metrics_collector.register_counter("predictions_served_total", "Total predictions served") + metrics_collector.register_counter("prediction_errors_total", "Total prediction errors") metrics_collector.register_histogram("forecast_processing_time_seconds", "Time to process forecast request") metrics_collector.register_gauge("active_models_count", "Number of active models") diff --git a/services/forecasting/app/services/forecasting_service.py b/services/forecasting/app/services/forecasting_service.py index 3bfd6658..708526e7 100644 --- a/services/forecasting/app/services/forecasting_service.py +++ b/services/forecasting/app/services/forecasting_service.py @@ -89,16 +89,39 @@ class ForecastingService: prediction=adjusted_prediction['prediction']) return ForecastResponse( - id=forecast.id, - forecast_date=forecast.forecast_date, + id=str(forecast.id), + tenant_id=str(forecast.tenant_id), product_name=forecast.product_name, - predicted_quantity=forecast.predicted_quantity, + location=forecast.location, + forecast_date=forecast.forecast_date, + + # Predictions + predicted_demand=forecast.predicted_demand, + confidence_lower=forecast.confidence_lower, + confidence_upper=forecast.confidence_upper, confidence_level=forecast.confidence_level, - lower_bound=forecast.lower_bound, - upper_bound=forecast.upper_bound, - model_id=forecast.model_id, + + # Model info + model_id=str(forecast.model_id), + model_version=forecast.model_version, + algorithm=forecast.algorithm, + + # Context + business_type=forecast.business_type, + is_holiday=forecast.is_holiday, + is_weekend=forecast.is_weekend, + day_of_week=forecast.day_of_week, + + # External factors + weather_temperature=forecast.weather_temperature, + weather_precipitation=forecast.weather_precipitation, + weather_description=forecast.weather_description, + traffic_volume=forecast.traffic_volume, + + # Metadata created_at=forecast.created_at, - external_factors=forecast.external_factors + processing_time_ms=forecast.processing_time_ms, + features_used=forecast.features_used ) except Exception as e: @@ -171,7 +194,7 @@ class ForecastingService: await self._add_weather_features_with_fallbacks(features, tenant_id) # Add traffic data with fallbacks - # await self._add_traffic_features_with_fallbacks(features, tenant_id) + await self._add_traffic_features_with_fallbacks(features, tenant_id) return features @@ -248,23 +271,23 @@ class ForecastingService: ) -> None: """Add traffic features with fallbacks""" - try: - traffic_data = await self.data_client.get_traffic_data( - tenant_id=tenant_id, - latitude=40.4168, - longitude=-3.7038 - ) - - if traffic_data: - features.update({ - "traffic_volume": traffic_data.get("traffic_volume", 100), - "pedestrian_count": traffic_data.get("pedestrian_count", 50), - }) - logger.info("Traffic data acquired successfully", tenant_id=tenant_id) - return + # try: + # traffic_data = await self.data_client.get_traffic_data( + # tenant_id=tenant_id, + # latitude=40.4168, + # longitude=-3.7038 + # ) + # + # if traffic_data: + # features.update({ + # "traffic_volume": traffic_data.get("traffic_volume", 100), + # "pedestrian_count": traffic_data.get("pedestrian_count", 50), + # }) + # logger.info("Traffic data acquired successfully", tenant_id=tenant_id) + # return - except Exception as e: - logger.warning("Traffic data acquisition failed", error=str(e)) + # except Exception as e: + # logger.warning("Traffic data acquisition failed", error=str(e)) # Fallback: Use typical values based on day of week day_of_week = features["day_of_week"] @@ -273,6 +296,7 @@ class ForecastingService: features.update({ "traffic_volume": int(100 * weekend_factor), "pedestrian_count": int(50 * weekend_factor), + "congestion_level": 1 }) logger.warning("Using default traffic values", tenant_id=tenant_id) @@ -398,15 +422,36 @@ class ForecastingService: forecast = Forecast( tenant_id=tenant_id, - forecast_date=request.forecast_date, product_name=request.product_name, - predicted_quantity=prediction["prediction"], + location=request.location, + forecast_date=request.forecast_date, + + # Predictions + predicted_demand=prediction['prediction'], + confidence_lower=prediction['lower_bound'], + confidence_upper=prediction['upper_bound'], confidence_level=request.confidence_level, - lower_bound=prediction["lower_bound"], - upper_bound=prediction["upper_bound"], - model_id=model_data["model_id"], - external_factors=features, - created_at=datetime.utcnow() + + # Model info + model_id=model_data['model_id'], + model_version=model_data.get('version', '1.0'), + algorithm=model_data.get('algorithm', 'prophet'), + + # Context + business_type=features.get('business_type', 'individual'), + is_holiday=features.get('is_holiday', False), + is_weekend=features.get('is_weekend', False), + day_of_week=features.get('day_of_week', 0), + + # External factors + weather_temperature=features.get('temperature'), + weather_precipitation=features.get('precipitation'), + weather_description=features.get('weather_description'), + traffic_volume=features.get('traffic_volume'), + + # Metadata + processing_time_ms=int((datetime.now() - start_time).total_seconds() * 1000), + features_used=features ) db.add(forecast) diff --git a/services/forecasting/app/services/prediction_service.py b/services/forecasting/app/services/prediction_service.py index 57f0829b..e4cdfbb6 100644 --- a/services/forecasting/app/services/prediction_service.py +++ b/services/forecasting/app/services/prediction_service.py @@ -220,7 +220,7 @@ class PredictionService: # Add numeric features with safe conversion numeric_features = [ 'temperature', 'precipitation', 'humidity', 'wind_speed', - 'traffic_volume', 'pedestrian_count', 'pressure' # ✅ FIX: Added pressure + 'traffic_volume', 'pedestrian_count', 'pressure' ] for feature in numeric_features: @@ -313,15 +313,13 @@ class PredictionService: df['is_heavy_rain'] = int(precip > 10.0) df['rain_intensity'] = int(self._get_rain_intensity(precip)) - # Traffic-based features (if available) + # Traffic-based features if 'traffic_volume' in df.columns and df['traffic_volume'].iloc[0] > 0: traffic = df['traffic_volume'].iloc[0] - # Simple categorization since we don't have historical data for quantiles df['high_traffic'] = int(traffic > 150) # Assumption based on typical values df['low_traffic'] = int(traffic < 50) df['traffic_normalized'] = float((traffic - 100) / 50) # Simple normalization - - # ✅ FIX: Add additional traffic features that might be in training + df['congestion_level'] = int(min(5, max(1, traffic // 50))) df['traffic_squared'] = traffic ** 2 df['traffic_log'] = float(np.log1p(traffic)) # log(1+traffic) to handle zeros else: @@ -330,6 +328,7 @@ class PredictionService: df['traffic_normalized'] = 0.0 df['traffic_squared'] = 0.0 df['traffic_log'] = 0.0 + df['congestion_level'] = 1 # Interaction features (common in training) if 'is_weekend' in df.columns and 'temperature' in df.columns: diff --git a/services/training/app/ml/data_processor.py b/services/training/app/ml/data_processor.py index 89032452..47969de2 100644 --- a/services/training/app/ml/data_processor.py +++ b/services/training/app/ml/data_processor.py @@ -27,6 +27,21 @@ class BakeryDataProcessor: self.imputers = {} # Store imputers for missing value handling self.date_alignment_service = DateAlignmentService() + def _ensure_timezone_aware(self, df: pd.DataFrame, date_column: str = 'date') -> pd.DataFrame: + """Ensure date column is timezone-aware to prevent conversion errors""" + if date_column in df.columns: + # Convert to datetime if not already + df[date_column] = pd.to_datetime(df[date_column]) + + # If timezone-naive, localize to UTC + if df[date_column].dt.tz is None: + df[date_column] = df[date_column].dt.tz_localize('UTC') + # If already timezone-aware but not UTC, convert to UTC + elif str(df[date_column].dt.tz) != 'UTC': + df[date_column] = df[date_column].dt.tz_convert('UTC') + + return df + async def prepare_training_data(self, sales_data: pd.DataFrame, weather_data: pd.DataFrame, @@ -50,6 +65,11 @@ class BakeryDataProcessor: # Step 1: Convert and validate sales data sales_clean = await self._process_sales_data(sales_data, product_name) + # FIX: Ensure timezone awareness before any operations + sales_clean = self._ensure_timezone_aware(sales_clean) + weather_data = self._ensure_timezone_aware(weather_data) if not weather_data.empty else weather_data + traffic_data = self._ensure_timezone_aware(traffic_data) if not traffic_data.empty else traffic_data + # Step 2: Apply date alignment if we have date constraints sales_clean = await self._apply_date_alignment(sales_clean, weather_data, traffic_data)