# shared/clients/forecast_client.py
"""
Forecast Service Client
Handles all API calls to the forecasting service
"""

from typing import Dict, Any, Optional, List, Union
from datetime import date
from .base_service_client import BaseServiceClient
from shared.config.base import BaseServiceSettings


class ForecastServiceClient(BaseServiceClient):
    """Client for communicating with the forecasting service.

    Every method is a thin wrapper that builds a payload or query-parameter
    dict and delegates to the ``get`` / ``post`` / ``delete`` helpers
    inherited from ``BaseServiceClient``. Return values are whatever those
    helpers return; the ``Optional`` annotations suggest ``None`` signals a
    failed call -- NOTE(review): confirm against ``BaseServiceClient``.
    """

    def __init__(self, config: BaseServiceSettings, calling_service_name: str = "unknown"):
        """Initialise the client.

        Args:
            config: Service settings forwarded to ``BaseServiceClient``.
            calling_service_name: Name of the service using this client.
        """
        super().__init__(calling_service_name, config)

    def get_service_base_path(self) -> str:
        """Return the base path under which the endpoints are addressed."""
        return "/api/v1"

    # ================================================================
    # FORECASTS
    # ================================================================

    async def create_forecast(
        self,
        tenant_id: str,
        model_id: str,
        start_date: str,
        end_date: str,
        product_ids: Optional[List[str]] = None,
        include_confidence_intervals: bool = True,
        **kwargs
    ) -> Optional[Dict[str, Any]]:
        """Create a new forecast.

        Args:
            tenant_id: Tenant the forecast belongs to.
            model_id: Model used to produce the forecast.
            start_date: Start of the forecast window.
            end_date: End of the forecast window.
            product_ids: Optional product filter; omitted from the payload
                when ``None`` or empty.
            include_confidence_intervals: Sent as-is in the payload.
            **kwargs: Extra fields merged into the payload unchanged.

        Returns:
            The result of the POST to ``forecasts``.
        """
        data = {
            "model_id": model_id,
            "start_date": start_date,
            "end_date": end_date,
            "include_confidence_intervals": include_confidence_intervals,
            **kwargs
        }

        # An empty list is treated like "no filter" and is not sent.
        if product_ids:
            data["product_ids"] = product_ids

        return await self.post("forecasts", data=data, tenant_id=tenant_id)

    async def get_forecast(self, tenant_id: str, forecast_id: str) -> Optional[Dict[str, Any]]:
        """Get forecast details for ``forecast_id``."""
        return await self.get(f"forecasts/{forecast_id}", tenant_id=tenant_id)

    async def list_forecasts(
        self,
        tenant_id: str,
        status: Optional[str] = None,
        model_id: Optional[str] = None,
        limit: int = 50
    ) -> Optional[List[Dict[str, Any]]]:
        """List forecasts for a tenant.

        Args:
            tenant_id: Tenant whose forecasts are listed.
            status: Optional status filter; only sent when truthy.
            model_id: Optional model filter; only sent when truthy.
            limit: Always sent as the ``limit`` query parameter.

        Returns:
            The ``forecasts`` entry of the response (``[]`` when the key is
            absent), or ``None`` when the response is ``None`` or empty.
        """
        params: Dict[str, Any] = {"limit": limit}
        if status:
            params["status"] = status
        if model_id:
            params["model_id"] = model_id

        result = await self.get("forecasts", tenant_id=tenant_id, params=params)
        return result.get("forecasts", []) if result else None

    async def delete_forecast(self, tenant_id: str, forecast_id: str) -> Optional[Dict[str, Any]]:
        """Delete the forecast identified by ``forecast_id``."""
        return await self.delete(f"forecasts/{forecast_id}", tenant_id=tenant_id)

    # ================================================================
    # PREDICTIONS
    # ================================================================

    async def get_predictions(
        self,
        tenant_id: str,
        forecast_id: str,
        start_date: Optional[str] = None,
        end_date: Optional[str] = None,
        product_id: Optional[str] = None
    ) -> Optional[List[Dict[str, Any]]]:
        """Get predictions from a forecast.

        Args:
            tenant_id: Tenant owning the forecast.
            forecast_id: Forecast whose predictions are fetched.
            start_date: Optional filter; only sent when truthy.
            end_date: Optional filter; only sent when truthy.
            product_id: Optional filter; only sent when truthy.

        Returns:
            The ``predictions`` entry of the response (``[]`` when the key is
            absent), or ``None`` when the response is ``None`` or empty.
        """
        params: Dict[str, Any] = {}
        if start_date:
            params["start_date"] = start_date
        if end_date:
            params["end_date"] = end_date
        if product_id:
            params["product_id"] = product_id

        result = await self.get(
            f"forecasts/{forecast_id}/predictions",
            tenant_id=tenant_id,
            params=params,
        )
        return result.get("predictions", []) if result else None

    async def create_realtime_prediction(
        self,
        tenant_id: str,
        model_id: str,
        target_date: str,
        features: Dict[str, Any],
        inventory_product_id: Optional[str] = None,
        **kwargs
    ) -> Optional[Dict[str, Any]]:
        """Create a real-time prediction.

        Args:
            tenant_id: Tenant the prediction is made for.
            model_id: Model used for the prediction.
            target_date: Date the prediction targets.
            features: Feature values sent in the payload.
            inventory_product_id: Added to the payload only when truthy.
            **kwargs: Extra fields merged into the payload unchanged.

        Returns:
            The result of the POST to ``forecasts/single``.
        """
        data = {
            "model_id": model_id,
            "target_date": target_date,
            "features": features,
            **kwargs
        }

        # Only sent when provided. NOTE(review): an earlier comment here said
        # the forecasting service requires this field -- confirm whether the
        # parameter should stay optional.
        if inventory_product_id:
            data["inventory_product_id"] = inventory_product_id

        return await self.post("forecasts/single", data=data, tenant_id=tenant_id)

    async def create_single_forecast(
        self,
        tenant_id: str,
        inventory_product_id: str,
        forecast_date: Union[date, str],
        location: str = "default",
        forecast_days: int = 1,
        confidence_level: float = 0.8,
        **kwargs
    ) -> Optional[Dict[str, Any]]:
        """Create a single product forecast using new API format.

        Args:
            tenant_id: Tenant the forecast is made for.
            inventory_product_id: Product to forecast.
            forecast_date: A ``date`` (serialised with ``isoformat()``; this
                also matches ``datetime``, which subclasses ``date``) or any
                other value, which is passed through ``str()``.
            location: Sent as the ``location`` field.
            forecast_days: Sent as the ``forecast_days`` field.
            confidence_level: Sent as the ``confidence_level`` field.
            **kwargs: Extra fields merged into the payload unchanged.

        Returns:
            The result of the POST to ``forecasts/single``.
        """
        # Convert date to string if needed; the module-level ``date`` import
        # is sufficient for the isinstance check.
        if isinstance(forecast_date, date):
            forecast_date_str = forecast_date.isoformat()
        else:
            forecast_date_str = str(forecast_date)

        data = {
            "inventory_product_id": inventory_product_id,
            "forecast_date": forecast_date_str,
            "forecast_days": forecast_days,
            "location": location,
            "confidence_level": confidence_level,
            **kwargs
        }

        return await self.post("forecasts/single", data=data, tenant_id=tenant_id)

    # ================================================================
    # FORECAST VALIDATION & METRICS
    # ================================================================

    async def get_forecast_accuracy(
        self,
        tenant_id: str,
        forecast_id: str,
        start_date: Optional[str] = None,
        end_date: Optional[str] = None
    ) -> Optional[Dict[str, Any]]:
        """Get forecast accuracy metrics.

        Args:
            tenant_id: Tenant owning the forecast.
            forecast_id: Forecast whose accuracy is requested.
            start_date: Optional filter; only sent when truthy.
            end_date: Optional filter; only sent when truthy.

        Returns:
            The result of the GET to ``forecasts/{forecast_id}/accuracy``.
        """
        params: Dict[str, Any] = {}
        if start_date:
            params["start_date"] = start_date
        if end_date:
            params["end_date"] = end_date

        return await self.get(
            f"forecasts/{forecast_id}/accuracy",
            tenant_id=tenant_id,
            params=params,
        )

    async def compare_forecasts(
        self,
        tenant_id: str,
        forecast_ids: List[str],
        metric: str = "mape"
    ) -> Optional[Dict[str, Any]]:
        """Compare multiple forecasts.

        Args:
            tenant_id: Tenant owning the forecasts.
            forecast_ids: Forecasts to compare.
            metric: Sent as the ``metric`` field; defaults to ``"mape"``.

        Returns:
            The result of the POST to ``forecasts/compare``.
        """
        data = {
            "forecast_ids": forecast_ids,
            "metric": metric
        }
        return await self.post("forecasts/compare", data=data, tenant_id=tenant_id)

    # ================================================================
    # FORECAST SCENARIOS
    # ================================================================

    async def create_scenario_forecast(
        self,
        tenant_id: str,
        model_id: str,
        scenario_name: str,
        scenario_data: Dict[str, Any],
        start_date: str,
        end_date: str,
        **kwargs
    ) -> Optional[Dict[str, Any]]:
        """Create a scenario-based forecast.

        Args:
            tenant_id: Tenant the scenario belongs to.
            model_id: Model used for the scenario forecast.
            scenario_name: Sent as the ``scenario_name`` field.
            scenario_data: Sent as the ``scenario_data`` field.
            start_date: Start of the forecast window.
            end_date: End of the forecast window.
            **kwargs: Extra fields merged into the payload unchanged.

        Returns:
            The result of the POST to ``scenarios``.
        """
        data = {
            "model_id": model_id,
            "scenario_name": scenario_name,
            "scenario_data": scenario_data,
            "start_date": start_date,
            "end_date": end_date,
            **kwargs
        }
        return await self.post("scenarios", data=data, tenant_id=tenant_id)

    async def list_scenarios(self, tenant_id: str) -> Optional[List[Dict[str, Any]]]:
        """List forecast scenarios for a tenant.

        Returns:
            The ``scenarios`` entry of the response (``[]`` when the key is
            absent), or ``None`` when the response is ``None`` or empty.
        """
        result = await self.get("scenarios", tenant_id=tenant_id)
        return result.get("scenarios", []) if result else None