# ================================================================
# services/production/app/api/orchestrator.py
# ================================================================
"""
Production Orchestrator API - Endpoints for orchestrated production scheduling
Called by the Orchestrator Service to generate production schedules from forecast data
"""

from fastapi import APIRouter, Depends, HTTPException, Path
from typing import Optional, Dict, Any, List
from datetime import date
from uuid import UUID
from pydantic import BaseModel, Field
import structlog

from shared.routing import RouteBuilder
from app.services.production_service import ProductionService
from app.schemas.production import ProductionScheduleResponse
from app.core.config import settings

logger = structlog.get_logger()
route_builder = RouteBuilder('production')
router = APIRouter(tags=["production-orchestrator"])


def get_production_service() -> ProductionService:
    """Dependency injection for production service.

    Returns:
        A ProductionService built from the shared database manager and the
        application settings.
    """
    # Imported lazily inside the function rather than at module import time.
    from app.core.database import database_manager
    return ProductionService(database_manager, settings)


# ================================================================
# REQUEST/RESPONSE SCHEMAS
# ================================================================

class GenerateScheduleRequest(BaseModel):
    """
    Request to generate production schedule (called by Orchestrator)

    The Orchestrator calls Forecasting Service first, then passes forecast data here.
    Production Service uses this data to determine what to produce.

    NEW: Accepts cached data snapshots from Orchestrator to eliminate duplicate API calls.
    """
    forecast_data: Dict[str, Any] = Field(..., description="Forecast data from Forecasting Service")
    target_date: Optional[date] = Field(None, description="Target production date")
    planning_horizon_days: int = Field(default=1, ge=1, le=7, description="Planning horizon in days")

    # NEW: Cached data from Orchestrator
    # Both snapshots are optional; they are forwarded to the service untouched.
    inventory_data: Optional[Dict[str, Any]] = Field(None, description="Cached inventory snapshot from Orchestrator")
    recipes_data: Optional[Dict[str, Any]] = Field(None, description="Cached recipes snapshot from Orchestrator")

    class Config:
        json_schema_extra = {
            "example": {
                "forecast_data": {
                    "forecasts": [
                        {
                            "product_id": "uuid-here",
                            "predicted_demand": 100.0,
                            "confidence_score": 0.85
                        }
                    ],
                    "forecast_id": "uuid-here",
                    "generated_at": "2025-01-30T10:00:00Z"
                },
                "target_date": "2025-01-31",
                "planning_horizon_days": 1
            }
        }


class GenerateScheduleResponse(BaseModel):
    """Response from generate_schedule endpoint"""
    success: bool
    message: str
    schedule_id: Optional[UUID] = None
    schedule_number: Optional[str] = None
    batches_created: int = 0
    total_planned_quantity: float = 0.0
    # default_factory makes the "fresh list per instance" intent explicit
    # instead of relying on the model copying a shared literal.
    warnings: List[str] = Field(default_factory=list)
    errors: List[str] = Field(default_factory=list)

    class Config:
        json_schema_extra = {
            "example": {
                "success": True,
                "message": "Production schedule generated successfully",
                "schedule_id": "uuid-here",
                "schedule_number": "PROD-2025-01-30-001",
                "batches_created": 5,
                "total_planned_quantity": 500.0,
                "warnings": [],
                "errors": []
            }
        }


# ================================================================
# ORCHESTRATOR ENTRY POINT
# ================================================================

@router.post(
    route_builder.build_operations_route("generate-schedule"),
    response_model=GenerateScheduleResponse
)
async def generate_production_schedule(
    tenant_id: UUID = Path(...),
    request_data: GenerateScheduleRequest = ...,
    production_service: ProductionService = Depends(get_production_service)
):
    """
    Generate production schedule from forecast data (called by Orchestrator)

    This is the main entry point for orchestrated production planning.
    The Orchestrator calls Forecasting Service first, then passes forecast data here.

    Flow:
    1. Receive forecast data from orchestrator
    2. Parse forecast to extract product demands
    3. Delegate schedule/batch creation to the production service, passing
       along any cached inventory/recipes snapshots
    4. Return schedule summary

    Args:
        tenant_id: Tenant UUID
        request_data: Schedule generation request with forecast data

    Returns:
        GenerateScheduleResponse with schedule details. Failures are reported
        in-band (success=False plus errors) rather than raised.
    """
    try:
        logger.info("Generate production schedule endpoint called",
                    tenant_id=str(tenant_id),
                    has_forecast_data=bool(request_data.forecast_data))

        # Fall back to today's date when the caller gives no target date.
        target_date = request_data.target_date or date.today()
        forecast_data = request_data.forecast_data

        # Parse forecast data from orchestrator
        forecasts = _parse_forecast_data(forecast_data)

        if not forecasts:
            return GenerateScheduleResponse(
                success=False,
                message="No forecast data provided",
                errors=["Forecast data is empty or invalid"]
            )

        # Generate production schedule using the service (with cached data if available)
        result = await production_service.generate_production_schedule_from_forecast(
            tenant_id=tenant_id,
            target_date=target_date,
            forecasts=forecasts,
            planning_horizon_days=request_data.planning_horizon_days,
            inventory_data=request_data.inventory_data,  # NEW: Pass cached inventory
            recipes_data=request_data.recipes_data  # NEW: Pass cached recipes
        )

        logger.info("Production schedule generated successfully",
                    tenant_id=str(tenant_id),
                    schedule_id=str(result.get('schedule_id')) if result.get('schedule_id') else None,
                    batches_created=result.get('batches_created', 0))

        return GenerateScheduleResponse(
            success=True,
            message="Production schedule generated successfully",
            schedule_id=result.get('schedule_id'),
            schedule_number=result.get('schedule_number'),
            batches_created=result.get('batches_created', 0),
            total_planned_quantity=result.get('total_planned_quantity', 0.0),
            warnings=result.get('warnings', []),
            errors=[]
        )

    except Exception as e:
        # Deliberate top-level boundary: every failure is turned into a
        # structured success=False response instead of propagating.
        logger.error("Error generating production schedule",
                     error=str(e), tenant_id=str(tenant_id))
        return GenerateScheduleResponse(
            success=False,
            message="Failed to generate production schedule",
            errors=[str(e)]
        )


# ================================================================
# HELPER FUNCTIONS
# ================================================================

def _parse_forecast_data(forecast_data: Dict[str, Any]) -> List[Dict[str, Any]]:
    """
    Parse forecast data received from orchestrator

    Expected format from Forecasting Service via Orchestrator:
    {
        "forecasts": [
            {
                "product_id": "uuid",
                "inventory_product_id": "uuid",  # Alternative field name
                "predicted_demand": 100.0,
                "predicted_value": 100.0,  # Alternative field name
                "confidence_score": 0.85,
                ...
            }
        ],
        "forecast_id": "uuid",
        "generated_at": "2025-01-30T10:00:00Z"
    }

    Entries are kept only when they carry a product id and a demand that
    converts to a number greater than zero. Malformed entries (not a dict,
    or a demand that cannot be converted to float) are skipped and counted
    in a warning log instead of aborting the whole request.

    Args:
        forecast_data: Raw forecast payload.

    Returns:
        List of normalized forecast dicts with keys product_id,
        predicted_demand (float), confidence_score, lower_bound,
        upper_bound and forecast_id. Empty when nothing usable is found.
    """
    # `or []` also covers an explicit `"forecasts": null` in the payload.
    forecast_list = forecast_data.get('forecasts') or []
    if not isinstance(forecast_list, (list, tuple)):
        logger.warning("Forecast payload 'forecasts' is not a list; ignoring it",
                       received_type=type(forecast_list).__name__)
        return []

    forecast_id = forecast_data.get('forecast_id')
    forecasts = []
    skipped_malformed = 0

    for forecast_item in forecast_list:
        if not isinstance(forecast_item, dict):
            skipped_malformed += 1
            continue

        # Extract product ID (try multiple field names)
        product_id = (
            forecast_item.get('product_id') or
            forecast_item.get('inventory_product_id') or
            forecast_item.get('item_id')
        )

        # Extract predicted demand (try multiple field names)
        raw_demand = (
            forecast_item.get('predicted_demand') or
            forecast_item.get('predicted_value') or
            forecast_item.get('demand') or
            0
        )

        # Coerce before comparing: a numeric string such as "100" would
        # otherwise raise TypeError on the `> 0` check below.
        try:
            predicted_demand = float(raw_demand)
        except (TypeError, ValueError, OverflowError):
            skipped_malformed += 1
            continue

        # NaN fails this comparison and is therefore dropped as well.
        if product_id and predicted_demand > 0:
            forecasts.append({
                'product_id': product_id,
                'predicted_demand': predicted_demand,
                'confidence_score': forecast_item.get('confidence_score', 0.8),
                'lower_bound': forecast_item.get('lower_bound', 0),
                'upper_bound': forecast_item.get('upper_bound', 0),
                'forecast_id': forecast_id,
            })

    if skipped_malformed:
        logger.warning("Skipped malformed forecast entries",
                       skipped=skipped_malformed,
                       total=len(forecast_list))

    return forecasts