Files
bakery-ia/services/production/app/api/orchestrator.py

242 lines
8.9 KiB
Python
Raw Normal View History

2025-10-30 21:08:07 +01:00
# ================================================================
# services/production/app/api/orchestrator.py
# ================================================================
"""
Production Orchestrator API - Endpoints for orchestrated production scheduling
Called by the Orchestrator Service to generate production schedules from forecast data
"""
2025-12-13 23:57:54 +01:00
from fastapi import APIRouter, Depends, HTTPException, Path, Request
2025-10-30 21:08:07 +01:00
from typing import Optional, Dict, Any, List
from datetime import date
from uuid import UUID
from pydantic import BaseModel, Field
import structlog
from shared.routing import RouteBuilder
from app.services.production_service import ProductionService
from app.schemas.production import ProductionScheduleResponse
from app.core.config import settings
logger = structlog.get_logger()
route_builder = RouteBuilder('production')
router = APIRouter(tags=["production-orchestrator"])
2025-12-13 23:57:54 +01:00
def get_production_service(request: Request) -> ProductionService:
    """
    FastAPI dependency that builds a ProductionService for the current request.

    The notification service is read from ``request.app.state``; when the
    attribute is not set, ``None`` is handed to the service instead.
    """
    # Function-scope import, kept exactly where the original performs it.
    from app.core.database import database_manager

    app_state = request.app.state
    notifier = getattr(app_state, 'notification_service', None)
    return ProductionService(database_manager, settings, notifier)
2025-10-30 21:08:07 +01:00
# ================================================================
# REQUEST/RESPONSE SCHEMAS
# ================================================================
class GenerateScheduleRequest(BaseModel):
    """
    Request to generate production schedule (called by Orchestrator)

    The Orchestrator calls Forecasting Service first, then passes forecast data here.
    Production Service uses this data to determine what to produce.

    NEW: Accepts cached data snapshots from Orchestrator to eliminate duplicate API calls.
    """
    # Required payload; the endpoint's parser reads the "forecasts" list and
    # "forecast_id" out of this dict.
    forecast_data: Dict[str, Any] = Field(..., description="Forecast data from Forecasting Service")
    # Optional; the endpoint substitutes date.today() when this is omitted.
    target_date: Optional[date] = Field(None, description="Target production date")
    # Validated to the inclusive range 1..7, defaulting to a single day.
    planning_horizon_days: int = Field(default=1, ge=1, le=7, description="Planning horizon in days")

    # NEW: Cached data from Orchestrator
    inventory_data: Optional[Dict[str, Any]] = Field(None, description="Cached inventory snapshot from Orchestrator")
    recipes_data: Optional[Dict[str, Any]] = Field(None, description="Cached recipes snapshot from Orchestrator")

    # Pydantic v2 style configuration (replaces the deprecated nested
    # ``class Config``). A plain dict is used so no extra import is required.
    model_config = {
        "json_schema_extra": {
            "example": {
                "forecast_data": {
                    "forecasts": [
                        {
                            "product_id": "uuid-here",
                            "predicted_demand": 100.0,
                            "confidence_score": 0.85
                        }
                    ],
                    "forecast_id": "uuid-here",
                    "generated_at": "2025-01-30T10:00:00Z"
                },
                "target_date": "2025-01-31",
                "planning_horizon_days": 1
            }
        }
    }
class GenerateScheduleResponse(BaseModel):
    """
    Response from generate_schedule endpoint

    ``success`` and ``message`` are always set by the endpoint. The schedule
    fields are filled in on success; ``errors`` carries the failure reason(s)
    when ``success`` is False.
    """
    success: bool
    message: str
    # Identifiers of the created schedule; left as None on failure.
    schedule_id: Optional[UUID] = None
    schedule_number: Optional[str] = None
    # Summary counters copied from the service result.
    batches_created: int = 0
    total_planned_quantity: float = 0.0
    warnings: List[str] = []
    errors: List[str] = []

    # Pydantic v2 style configuration (replaces the deprecated nested
    # ``class Config``). A plain dict is used so no extra import is required.
    model_config = {
        "json_schema_extra": {
            "example": {
                "success": True,
                "message": "Production schedule generated successfully",
                "schedule_id": "uuid-here",
                "schedule_number": "PROD-2025-01-30-001",
                "batches_created": 5,
                "total_planned_quantity": 500.0,
                "warnings": [],
                "errors": []
            }
        }
    }
# ================================================================
# ORCHESTRATOR ENTRY POINT
# ================================================================
@router.post(
    route_builder.build_operations_route("generate-schedule"),
    response_model=GenerateScheduleResponse
)
async def generate_production_schedule(
    tenant_id: UUID = Path(...),
    request_data: GenerateScheduleRequest = ...,
    production_service: ProductionService = Depends(get_production_service)
):
    """
    Generate production schedule from forecast data (called by Orchestrator)

    This is the main entry point for orchestrated production planning.
    The Orchestrator calls Forecasting Service first, then passes forecast data here.

    Flow:
    1. Receive forecast data from orchestrator
    2. Parse forecast to extract product demands (``_parse_forecast_data``)
    3. Hand the parsed forecasts, plus any cached inventory/recipe snapshots,
       to ``ProductionService.generate_production_schedule_from_forecast``,
       which is expected to check inventory, compute quantities and create
       the schedule and batches
    4. Return schedule summary

    Failures are never raised to the caller: an empty/invalid forecast or any
    exception is reported as a response with ``success=False`` and the reason
    in ``errors``.

    Args:
        tenant_id: Tenant UUID
        request_data: Schedule generation request with forecast data
        production_service: Injected production service

    Returns:
        GenerateScheduleResponse with schedule details and created batches
    """
    try:
        logger.info("Generate production schedule endpoint called",
                    tenant_id=str(tenant_id),
                    has_forecast_data=bool(request_data.forecast_data))

        target_date = request_data.target_date or date.today()
        forecast_data = request_data.forecast_data

        # Parse forecast data from orchestrator
        forecasts = _parse_forecast_data(forecast_data)

        if not forecasts:
            return GenerateScheduleResponse(
                success=False,
                message="No forecast data provided",
                errors=["Forecast data is empty or invalid"]
            )

        # Generate production schedule using the service (with cached data if available)
        result = await production_service.generate_production_schedule_from_forecast(
            tenant_id=tenant_id,
            target_date=target_date,
            forecasts=forecasts,
            planning_horizon_days=request_data.planning_horizon_days,
            inventory_data=request_data.inventory_data,  # NEW: Pass cached inventory
            recipes_data=request_data.recipes_data  # NEW: Pass cached recipes
        )

        schedule_id = result.get('schedule_id')
        # ``or`` fallbacks also cover keys that are present but explicitly
        # None, which would otherwise fail response validation and report an
        # already-created schedule as a failure.
        batches_created = result.get('batches_created') or 0

        logger.info("Production schedule generated successfully",
                    tenant_id=str(tenant_id),
                    schedule_id=str(schedule_id) if schedule_id else None,
                    batches_created=batches_created)

        return GenerateScheduleResponse(
            success=True,
            message="Production schedule generated successfully",
            schedule_id=schedule_id,
            schedule_number=result.get('schedule_number'),
            batches_created=batches_created,
            total_planned_quantity=result.get('total_planned_quantity') or 0.0,
            warnings=result.get('warnings') or [],
            errors=[]
        )

    except Exception as e:
        # Top-level boundary for the orchestrator call: report the failure in
        # the response body instead of raising, but keep the traceback in the
        # log so the root cause is not lost.
        logger.error("Error generating production schedule",
                     error=str(e), tenant_id=str(tenant_id), exc_info=True)
        return GenerateScheduleResponse(
            success=False,
            message="Failed to generate production schedule",
            errors=[str(e)]
        )
# ================================================================
# HELPER FUNCTIONS
# ================================================================
def _parse_forecast_data(forecast_data: Dict[str, Any]) -> List[Dict[str, Any]]:
"""
Parse forecast data received from orchestrator
Expected format from Forecasting Service via Orchestrator:
{
"forecasts": [
{
"product_id": "uuid",
"inventory_product_id": "uuid", # Alternative field name
"predicted_demand": 100.0,
"predicted_value": 100.0, # Alternative field name
"confidence_score": 0.85,
...
}
],
"forecast_id": "uuid",
"generated_at": "2025-01-30T10:00:00Z"
}
"""
forecasts = []
forecast_list = forecast_data.get('forecasts', [])
for forecast_item in forecast_list:
# Extract product ID (try multiple field names)
product_id = (
forecast_item.get('product_id') or
forecast_item.get('inventory_product_id') or
forecast_item.get('item_id')
)
# Extract predicted demand (try multiple field names)
predicted_demand = (
forecast_item.get('predicted_demand') or
forecast_item.get('predicted_value') or
forecast_item.get('demand') or
0
)
if product_id and predicted_demand > 0:
forecasts.append({
'product_id': product_id,
'predicted_demand': float(predicted_demand),
'confidence_score': forecast_item.get('confidence_score', 0.8),
'lower_bound': forecast_item.get('lower_bound', 0),
'upper_bound': forecast_item.get('upper_bound', 0),
'forecast_id': forecast_data.get('forecast_id'),
})
return forecasts