""" ML Insights API Endpoints for Production Service Provides endpoints to trigger ML insight generation for: - Production yield predictions - Quality optimization - Process efficiency analysis """ from fastapi import APIRouter, Depends, HTTPException from pydantic import BaseModel, Field from typing import Optional, List from uuid import UUID from datetime import datetime, timedelta import structlog import pandas as pd from app.core.database import get_db from sqlalchemy.ext.asyncio import AsyncSession logger = structlog.get_logger() router = APIRouter( prefix="/api/v1/tenants/{tenant_id}/production/ml/insights", tags=["ML Insights"] ) # ================================================================ # REQUEST/RESPONSE SCHEMAS # ================================================================ class YieldPredictionRequest(BaseModel): """Request schema for yield prediction""" recipe_ids: Optional[List[str]] = Field( None, description="Specific recipe IDs to analyze. If None, analyzes all recipes" ) lookback_days: int = Field( 90, description="Days of historical production to analyze", ge=30, le=365 ) min_history_runs: int = Field( 30, description="Minimum production runs required", ge=10, le=100 ) class YieldPredictionResponse(BaseModel): """Response schema for yield prediction""" success: bool message: str tenant_id: str recipes_analyzed: int total_insights_generated: int total_insights_posted: int recipes_with_issues: int insights_by_recipe: dict errors: List[str] = [] # ================================================================ # API ENDPOINTS # ================================================================ @router.post("/predict-yields", response_model=YieldPredictionResponse) async def trigger_yield_prediction( tenant_id: str, request_data: YieldPredictionRequest, db: AsyncSession = Depends(get_db) ): """ Trigger yield prediction for production recipes. This endpoint: 1. Fetches historical production data for specified recipes 2. Runs the YieldInsightsOrchestrator to predict yields 3. Generates insights about yield optimization opportunities 4. Posts insights to AI Insights Service Args: tenant_id: Tenant UUID request_data: Prediction parameters db: Database session Returns: YieldPredictionResponse with prediction results """ logger.info( "ML insights yield prediction requested", tenant_id=tenant_id, recipe_ids=request_data.recipe_ids, lookback_days=request_data.lookback_days ) try: # Import ML orchestrator and clients from app.ml.yield_insights_orchestrator import YieldInsightsOrchestrator from shared.clients.recipes_client import RecipesServiceClient from app.core.config import settings # Initialize orchestrator and recipes client orchestrator = YieldInsightsOrchestrator() recipes_client = RecipesServiceClient(settings) # Get recipes to analyze from recipes service via API if request_data.recipe_ids: # Fetch specific recipes recipes = [] for recipe_id in request_data.recipe_ids: recipe = await recipes_client.get_recipe_by_id( recipe_id=recipe_id, tenant_id=tenant_id ) if recipe: recipes.append(recipe) else: # Fetch all recipes for tenant (limit to 10) all_recipes = await recipes_client.get_all_recipes(tenant_id=tenant_id) recipes = all_recipes[:10] if all_recipes else [] # Limit to prevent timeout if not recipes: return YieldPredictionResponse( success=False, message="No recipes found for analysis", tenant_id=tenant_id, recipes_analyzed=0, total_insights_generated=0, total_insights_posted=0, recipes_with_issues=0, insights_by_recipe={}, errors=["No recipes found"] ) # Calculate date range for production history end_date = datetime.utcnow() start_date = end_date - timedelta(days=request_data.lookback_days) # Process each recipe total_insights_generated = 0 total_insights_posted = 0 recipes_with_issues = 0 insights_by_recipe = {} errors = [] for recipe in recipes: try: recipe_id = str(recipe['id']) recipe_name = recipe.get('name', 'Unknown Recipe') logger.info(f"Analyzing yield for {recipe_name} ({recipe_id})") # Fetch real production batch history from database from app.models.production import ProductionBatch, ProductionStatus from sqlalchemy import select batch_query = select(ProductionBatch).where( ProductionBatch.tenant_id == UUID(tenant_id), ProductionBatch.recipe_id == UUID(recipe_id), # Use the extracted UUID ProductionBatch.actual_start_time >= start_date, ProductionBatch.actual_start_time <= end_date, ProductionBatch.status == ProductionStatus.COMPLETED, ProductionBatch.actual_quantity.isnot(None) ).order_by(ProductionBatch.actual_start_time) batch_result = await db.execute(batch_query) batches = batch_result.scalars().all() if len(batches) < request_data.min_history_runs: logger.warning( f"Insufficient production history for recipe {recipe_id}: " f"{len(batches)} batches < {request_data.min_history_runs} required" ) continue # Create production history DataFrame from real batches production_data = [] for batch in batches: # Calculate yield percentage if batch.planned_quantity and batch.actual_quantity: yield_pct = (batch.actual_quantity / batch.planned_quantity) * 100 else: continue # Skip batches without complete data production_data.append({ 'production_date': batch.actual_start_time, 'planned_quantity': float(batch.planned_quantity), 'actual_quantity': float(batch.actual_quantity), 'yield_percentage': yield_pct, 'worker_id': batch.notes or 'unknown', # Use notes field or default 'batch_number': batch.batch_number }) if not production_data: logger.warning( f"No valid production data for recipe {recipe_id}" ) continue production_history = pd.DataFrame(production_data) # Run yield analysis results = await orchestrator.analyze_and_post_insights( tenant_id=tenant_id, recipe_id=recipe_id, production_history=production_history, min_history_runs=request_data.min_history_runs ) # Track results total_insights_generated += results['insights_generated'] total_insights_posted += results['insights_posted'] baseline_stats = results.get('baseline_stats', {}) mean_yield = baseline_stats.get('mean_yield', 100) if mean_yield < 90: recipes_with_issues += 1 insights_by_recipe[recipe_id] = { 'recipe_name': recipe_name, 'insights_posted': results['insights_posted'], 'mean_yield': mean_yield, 'patterns': len(results.get('patterns', [])) } logger.info( f"Recipe {recipe_id} analysis complete", insights_posted=results['insights_posted'], mean_yield=mean_yield ) except Exception as e: error_msg = f"Error analyzing recipe {recipe_id}: {str(e)}" logger.error(error_msg, exc_info=True) errors.append(error_msg) # Close orchestrator and clients await orchestrator.close() await recipes_client.close() # Build response response = YieldPredictionResponse( success=total_insights_posted > 0, message=f"Successfully analyzed {len([r for r in recipes if isinstance(r, dict)])} recipes, generated {total_insights_posted} insights", tenant_id=tenant_id, recipes_analyzed=len([r for r in recipes if isinstance(r, dict)]), total_insights_generated=total_insights_generated, total_insights_posted=total_insights_posted, recipes_with_issues=recipes_with_issues, insights_by_recipe=insights_by_recipe, errors=errors ) logger.info( "ML insights yield prediction complete", tenant_id=tenant_id, total_insights=total_insights_posted, recipes_with_issues=recipes_with_issues ) return response except Exception as e: logger.error( "ML insights yield prediction failed", tenant_id=tenant_id, error=str(e), exc_info=True ) raise HTTPException( status_code=500, detail=f"Yield prediction failed: {str(e)}" ) @router.get("/health") async def ml_insights_health(): """Health check for ML insights endpoints""" return { "status": "healthy", "service": "production-ml-insights", "endpoints": [ "POST /ml/insights/predict-yields" ] }