""" ML Insights API Endpoints for Procurement Service Provides endpoints to trigger ML insight generation for: - Supplier performance analysis - Price forecasting and timing recommendations """ from fastapi import APIRouter, Depends, HTTPException, Request from pydantic import BaseModel, Field from typing import Optional, List from uuid import UUID from datetime import datetime, timedelta import structlog import pandas as pd from app.core.database import get_db from sqlalchemy.ext.asyncio import AsyncSession logger = structlog.get_logger() router = APIRouter( prefix="/api/v1/tenants/{tenant_id}/procurement/ml/insights", tags=["ML Insights"] ) # ================================================================ # REQUEST/RESPONSE SCHEMAS - SUPPLIER ANALYSIS # ================================================================ class SupplierAnalysisRequest(BaseModel): """Request schema for supplier performance analysis""" supplier_ids: Optional[List[str]] = Field( None, description="Specific supplier IDs to analyze. If None, analyzes all suppliers" ) lookback_days: int = Field( 180, description="Days of historical orders to analyze", ge=30, le=730 ) min_orders: int = Field( 10, description="Minimum orders required for analysis", ge=5, le=100 ) class SupplierAnalysisResponse(BaseModel): """Response schema for supplier performance analysis""" success: bool message: str tenant_id: str suppliers_analyzed: int total_insights_generated: int total_insights_posted: int high_risk_suppliers: int insights_by_supplier: dict errors: List[str] = [] # ================================================================ # REQUEST/RESPONSE SCHEMAS - PRICE FORECASTING # ================================================================ class PriceForecastRequest(BaseModel): """Request schema for price forecasting""" ingredient_ids: Optional[List[str]] = Field( None, description="Specific ingredient IDs to forecast. If None, forecasts all ingredients" ) lookback_days: int = Field( 180, description="Days of historical price data to analyze", ge=90, le=730 ) forecast_horizon_days: int = Field( 30, description="Days to forecast ahead", ge=7, le=90 ) class PriceForecastResponse(BaseModel): """Response schema for price forecasting""" success: bool message: str tenant_id: str ingredients_forecasted: int total_insights_generated: int total_insights_posted: int buy_now_recommendations: int bulk_opportunities: int insights_by_ingredient: dict errors: List[str] = [] # ================================================================ # API ENDPOINTS - SUPPLIER ANALYSIS # ================================================================ @router.post("/analyze-suppliers", response_model=SupplierAnalysisResponse) async def trigger_supplier_analysis( tenant_id: str, request_data: SupplierAnalysisRequest, request: Request, db: AsyncSession = Depends(get_db) ): """ Trigger supplier performance analysis. This endpoint: 1. Fetches historical purchase order data for specified suppliers 2. Runs the SupplierInsightsOrchestrator to analyze reliability 3. Generates insights about supplier performance and risk 4. 
# ================================================================
# API ENDPOINTS - SUPPLIER ANALYSIS
# ================================================================

@router.post("/analyze-suppliers", response_model=SupplierAnalysisResponse)
async def trigger_supplier_analysis(
    tenant_id: str,
    request_data: SupplierAnalysisRequest,
    request: Request,
    db: AsyncSession = Depends(get_db)
):
    """
    Trigger supplier performance analysis.

    This endpoint:
    1. Fetches historical purchase order data for specified suppliers
    2. Runs the SupplierInsightsOrchestrator to analyze reliability
    3. Generates insights about supplier performance and risk
    4. Posts insights to AI Insights Service

    Args:
        tenant_id: Tenant UUID
        request_data: Analysis parameters
        db: Database session

    Returns:
        SupplierAnalysisResponse with analysis results
    """
    logger.info(
        "ML insights supplier analysis requested",
        tenant_id=tenant_id,
        supplier_ids=request_data.supplier_ids,
        lookback_days=request_data.lookback_days
    )

    try:
        # Import ML orchestrator and clients
        from app.ml.supplier_insights_orchestrator import SupplierInsightsOrchestrator
        from app.models.purchase_order import PurchaseOrder
        from shared.clients.suppliers_client import SuppliersServiceClient
        from app.core.config import settings
        from sqlalchemy import select

        # Get event publisher from app state
        event_publisher = getattr(request.app.state, 'event_publisher', None)

        # Initialize orchestrator and clients
        orchestrator = SupplierInsightsOrchestrator(event_publisher=event_publisher)
        suppliers_client = SuppliersServiceClient(settings)

        # Get suppliers to analyze from suppliers service via API
        if request_data.supplier_ids:
            # Fetch specific suppliers
            suppliers = []
            for supplier_id in request_data.supplier_ids:
                supplier = await suppliers_client.get_supplier_by_id(
                    tenant_id=tenant_id,
                    supplier_id=supplier_id
                )
                if supplier:
                    suppliers.append(supplier)
        else:
            # Fetch all active suppliers (limit to 10)
            all_suppliers = await suppliers_client.get_all_suppliers(
                tenant_id=tenant_id,
                is_active=True
            )
            suppliers = (all_suppliers or [])[:10]  # Limit to prevent timeout

        if not suppliers:
            return SupplierAnalysisResponse(
                success=False,
                message="No suppliers found for analysis",
                tenant_id=tenant_id,
                suppliers_analyzed=0,
                total_insights_generated=0,
                total_insights_posted=0,
                high_risk_suppliers=0,
                insights_by_supplier={},
                errors=["No suppliers found"]
            )

        # Calculate date range for order history
        end_date = datetime.utcnow()
        start_date = end_date - timedelta(days=request_data.lookback_days)

        # Process each supplier
        total_insights_generated = 0
        total_insights_posted = 0
        high_risk_suppliers = 0
        insights_by_supplier = {}
        errors = []

        for supplier in suppliers:
            try:
                supplier_id = str(supplier['id'])
                supplier_name = supplier.get('name', 'Unknown')

                logger.info(f"Analyzing supplier {supplier_name} ({supplier_id})")

                # Get purchase orders for this supplier from local database
                po_query = select(PurchaseOrder).where(
                    PurchaseOrder.tenant_id == UUID(tenant_id),
                    PurchaseOrder.supplier_id == UUID(supplier_id),
                    PurchaseOrder.order_date >= start_date,
                    PurchaseOrder.order_date <= end_date
                )
                po_result = await db.execute(po_query)
                purchase_orders = po_result.scalars().all()

                if len(purchase_orders) < request_data.min_orders:
                    logger.warning(
                        f"Insufficient orders for supplier {supplier_id}: "
                        f"{len(purchase_orders)} < {request_data.min_orders} required"
                    )
                    continue

                # Create order history DataFrame
                order_data = []
                for po in purchase_orders:
                    # Calculate delivery performance
                    if po.delivery_date and po.expected_delivery_date:
                        days_late = (po.delivery_date - po.expected_delivery_date).days
                        on_time = days_late <= 0
                    else:
                        days_late = 0
                        on_time = True

                    # Calculate quality score (based on status)
                    quality_score = 100 if po.status == 'completed' else 80

                    order_data.append({
                        'order_date': po.order_date,
                        'expected_delivery_date': po.expected_delivery_date,
                        'delivery_date': po.delivery_date,
                        'days_late': days_late,
                        'on_time': on_time,
                        'quality_score': quality_score,
                        'total_amount': float(po.total_amount) if po.total_amount else 0
                    })

                order_history = pd.DataFrame(order_data)

                # Run supplier analysis
                results = await orchestrator.analyze_and_post_supplier_insights(
                    tenant_id=tenant_id,
                    supplier_id=supplier_id,
                    order_history=order_history,
                    min_orders=request_data.min_orders
                )

                # Track results
                total_insights_generated += results['insights_generated']
                total_insights_posted += results['insights_posted']

                reliability_score = results.get('reliability_score', 100)
                if reliability_score < 70:
                    high_risk_suppliers += 1

                insights_by_supplier[supplier_id] = {
                    'supplier_name': supplier_name,
                    'insights_posted': results['insights_posted'],
                    'reliability_score': reliability_score,
                    'orders_analyzed': results['orders_analyzed']
                }

                logger.info(
                    f"Supplier {supplier_id} analysis complete",
                    insights_posted=results['insights_posted'],
                    reliability_score=reliability_score
                )

            except Exception as e:
                # Use .get() so a failure before supplier_id is assigned still logs cleanly
                error_msg = f"Error analyzing supplier {supplier.get('id', 'unknown')}: {str(e)}"
                logger.error(error_msg, exc_info=True)
                errors.append(error_msg)

        # Close orchestrator
        await orchestrator.close()

        # Build response
        response = SupplierAnalysisResponse(
            success=total_insights_posted > 0,
            message=f"Successfully analyzed {len(insights_by_supplier)} suppliers, posted {total_insights_posted} insights",
            tenant_id=tenant_id,
            suppliers_analyzed=len(insights_by_supplier),
            total_insights_generated=total_insights_generated,
            total_insights_posted=total_insights_posted,
            high_risk_suppliers=high_risk_suppliers,
            insights_by_supplier=insights_by_supplier,
            errors=errors
        )

        logger.info(
            "ML insights supplier analysis complete",
            tenant_id=tenant_id,
            total_insights=total_insights_posted,
            high_risk_suppliers=high_risk_suppliers
        )

        return response

    except Exception as e:
        logger.error(
            "ML insights supplier analysis failed",
            tenant_id=tenant_id,
            error=str(e),
            exc_info=True
        )
        raise HTTPException(
            status_code=500,
            detail=f"Supplier analysis failed: {str(e)}"
        )
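
# ----------------------------------------------------------------
# Shape of the orchestrator result consumed above, inferred from the keys
# this endpoint reads (the authoritative contract is defined by
# SupplierInsightsOrchestrator.analyze_and_post_supplier_insights):
#
#   {
#       "insights_generated": int,
#       "insights_posted": int,
#       "reliability_score": float,   # scores below 70 are counted as high risk here
#       "orders_analyzed": int
#   }
# ----------------------------------------------------------------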
# ================================================================
# API ENDPOINTS - PRICE FORECASTING
# ================================================================

@router.post("/forecast-prices", response_model=PriceForecastResponse)
async def trigger_price_forecasting(
    tenant_id: str,
    request_data: PriceForecastRequest,
    request: Request,
    db: AsyncSession = Depends(get_db)
):
    """
    Trigger price forecasting for procurement ingredients.

    This endpoint:
    1. Fetches historical price data for specified ingredients
    2. Runs the PriceInsightsOrchestrator to forecast future prices
    3. Generates insights about optimal purchase timing
    4. Posts insights to AI Insights Service

    Args:
        tenant_id: Tenant UUID
        request_data: Forecasting parameters
        db: Database session

    Returns:
        PriceForecastResponse with forecasting results
    """
    logger.info(
        "ML insights price forecasting requested",
        tenant_id=tenant_id,
        ingredient_ids=request_data.ingredient_ids,
        lookback_days=request_data.lookback_days
    )

    try:
        # Import ML orchestrator and clients
        from app.ml.price_insights_orchestrator import PriceInsightsOrchestrator
        from shared.clients.inventory_client import InventoryServiceClient
        from app.models.purchase_order import PurchaseOrder, PurchaseOrderItem
        from app.core.config import settings
        from sqlalchemy import select
        from sqlalchemy.orm import selectinload

        # Get event publisher from app state
        event_publisher = getattr(request.app.state, 'event_publisher', None)

        # Initialize orchestrator and inventory client
        orchestrator = PriceInsightsOrchestrator(event_publisher=event_publisher)
        inventory_client = InventoryServiceClient(settings)

        # Get ingredients to forecast from inventory service via API
        if request_data.ingredient_ids:
            # Fetch specific ingredients
            ingredients = []
            for ingredient_id in request_data.ingredient_ids:
                ingredient = await inventory_client.get_ingredient_by_id(
                    ingredient_id=ingredient_id,
                    tenant_id=tenant_id
                )
                if ingredient:
                    ingredients.append(ingredient)
        else:
            # Fetch all ingredients for tenant (limit to 10)
            all_ingredients = await inventory_client.get_all_ingredients(tenant_id=tenant_id)
            ingredients = all_ingredients[:10] if all_ingredients else []  # Limit to prevent timeout

        if not ingredients:
            return PriceForecastResponse(
                success=False,
                message="No ingredients found for forecasting",
                tenant_id=tenant_id,
                ingredients_forecasted=0,
                total_insights_generated=0,
                total_insights_posted=0,
                buy_now_recommendations=0,
                bulk_opportunities=0,
                insights_by_ingredient={},
                errors=["No ingredients found"]
            )

        # Calculate date range for price history
        end_date = datetime.utcnow()
        start_date = end_date - timedelta(days=request_data.lookback_days)
        # Process each ingredient
        total_insights_generated = 0
        total_insights_posted = 0
        buy_now_recommendations = 0
        bulk_opportunities = 0
        insights_by_ingredient = {}
        errors = []

        for ingredient in ingredients:
            try:
                ingredient_id = str(ingredient['id'])
                ingredient_name = ingredient.get('name', 'Unknown Ingredient')

                logger.info(f"Forecasting prices for {ingredient_name} ({ingredient_id})")

                # Get price history from purchase order items within the lookback window.
                # The purchase_order relationship is eager-loaded so it can be read below
                # without triggering lazy loading on the async session.
                poi_query = (
                    select(PurchaseOrderItem)
                    .join(PurchaseOrderItem.purchase_order)
                    .where(
                        PurchaseOrderItem.inventory_product_id == UUID(ingredient_id),
                        PurchaseOrder.tenant_id == UUID(tenant_id),
                        PurchaseOrder.order_date >= start_date,
                        PurchaseOrder.order_date <= end_date
                    )
                    .options(selectinload(PurchaseOrderItem.purchase_order))
                )
                poi_result = await db.execute(poi_query)
                purchase_items = poi_result.scalars().all()

                if len(purchase_items) < 30:
                    logger.warning(
                        f"Insufficient price history for ingredient {ingredient_id}: "
                        f"{len(purchase_items)} items"
                    )
                    continue

                # Create price history DataFrame
                price_data = []
                for item in purchase_items:
                    if item.unit_price and item.quantity:
                        price_data.append({
                            'date': item.purchase_order.order_date,
                            'price': float(item.unit_price),
                            'quantity': float(item.quantity),
                            'supplier_id': str(item.purchase_order.supplier_id)
                        })

                price_history = pd.DataFrame(price_data)
                price_history = price_history.sort_values('date')

                # Run price forecasting
                results = await orchestrator.forecast_and_post_insights(
                    tenant_id=tenant_id,
                    ingredient_id=ingredient_id,
                    price_history=price_history,
                    forecast_horizon_days=request_data.forecast_horizon_days,
                    min_history_days=request_data.lookback_days
                )

                # Track results
                total_insights_generated += results['insights_generated']
                total_insights_posted += results['insights_posted']

                recommendation = results.get('recommendation', {})
                if recommendation.get('action') == 'buy_now':
                    buy_now_recommendations += 1

                bulk_opp = results.get('bulk_opportunity', {})
                if bulk_opp.get('has_bulk_opportunity'):
                    bulk_opportunities += 1

                insights_by_ingredient[ingredient_id] = {
                    'ingredient_name': ingredient_name,
                    'insights_posted': results['insights_posted'],
                    'recommendation': recommendation.get('action'),
                    'has_bulk_opportunity': bulk_opp.get('has_bulk_opportunity', False)
                }

                logger.info(
                    f"Ingredient {ingredient_id} forecasting complete",
                    insights_posted=results['insights_posted'],
                    recommendation=recommendation.get('action')
                )

            except Exception as e:
                # Use .get() so a failure before ingredient_id is assigned still logs cleanly
                error_msg = f"Error forecasting ingredient {ingredient.get('id', 'unknown')}: {str(e)}"
                logger.error(error_msg, exc_info=True)
                errors.append(error_msg)

        # Close orchestrator
        await orchestrator.close()

        # Build response
        response = PriceForecastResponse(
            success=total_insights_posted > 0,
            message=f"Successfully forecasted {len(insights_by_ingredient)} ingredients, posted {total_insights_posted} insights",
            tenant_id=tenant_id,
            ingredients_forecasted=len(insights_by_ingredient),
            total_insights_generated=total_insights_generated,
            total_insights_posted=total_insights_posted,
            buy_now_recommendations=buy_now_recommendations,
            bulk_opportunities=bulk_opportunities,
            insights_by_ingredient=insights_by_ingredient,
            errors=errors
        )

        logger.info(
            "ML insights price forecasting complete",
            tenant_id=tenant_id,
            total_insights=total_insights_posted,
            buy_now_recommendations=buy_now_recommendations,
            bulk_opportunities=bulk_opportunities
        )

        return response

    except Exception as e:
        logger.error(
            "ML insights price forecasting failed",
            tenant_id=tenant_id,
            error=str(e),
            exc_info=True
        )
        raise HTTPException(
            status_code=500,
            detail=f"Price forecasting failed: {str(e)}"
        )
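
# ----------------------------------------------------------------
# Shape of the orchestrator result consumed above, inferred from the keys
# this endpoint reads (the authoritative contract is defined by
# PriceInsightsOrchestrator.forecast_and_post_insights):
#
#   {
#       "insights_generated": int,
#       "insights_posted": int,
#       "recommendation": {"action": str},              # 'buy_now' increments the counter above
#       "bulk_opportunity": {"has_bulk_opportunity": bool}
#   }
# ----------------------------------------------------------------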
@router.get("/health")
async def ml_insights_health():
    """Health check for ML insights endpoints"""
    return {
        "status": "healthy",
        "service": "procurement-ml-insights",
        "endpoints": [
            "POST /ml/insights/analyze-suppliers",
            "POST /ml/insights/forecast-prices",
            "POST /internal/ml/generate-price-insights"
        ]
    }


# ================================================================
# INTERNAL API ENDPOINT - Called by demo session service
# ================================================================

# Create a separate router for internal endpoints to avoid the tenant prefix
internal_router = APIRouter(
    tags=["ML Insights - Internal"]
)


@internal_router.post("/api/v1/tenants/{tenant_id}/procurement/internal/ml/generate-price-insights")
async def generate_price_insights_internal(
    tenant_id: str,
    request: Request,
    db: AsyncSession = Depends(get_db)
):
    """
    Internal endpoint to trigger price insights generation for demo sessions.

    This endpoint is called by the demo-session service after cloning data.
    It uses the same ML logic as the public endpoint but with optimized defaults.

    Security: Protected by x-internal-service header check.

    Args:
        tenant_id: The tenant UUID
        request: FastAPI request object
        db: Database session

    Returns:
        {
            "insights_posted": int,
            "tenant_id": str,
            "status": str
        }
    """
    # Verify internal service header
    if request.headers.get("x-internal-service") not in ["demo-session", "internal"]:
        logger.warning("Unauthorized internal API call", tenant_id=tenant_id)
        raise HTTPException(
            status_code=403,
            detail="This endpoint is for internal service use only"
        )

    logger.info("Internal price insights generation triggered", tenant_id=tenant_id)

    try:
        # Use the existing price forecasting logic with sensible defaults
        request_data = PriceForecastRequest(
            ingredient_ids=None,        # Analyze all ingredients
            lookback_days=180,          # 6 months of history
            forecast_horizon_days=30    # Forecast 30 days ahead
        )

        # Call the existing price forecasting endpoint logic
        result = await trigger_price_forecasting(
            tenant_id=tenant_id,
            request_data=request_data,
            request=request,
            db=db
        )

        # Return simplified response for internal use
        return {
            "insights_posted": result.total_insights_posted,
            "tenant_id": tenant_id,
            "status": "success" if result.success else "failed",
            "message": result.message,
            "ingredients_analyzed": result.ingredients_forecasted,
            "buy_now_recommendations": result.buy_now_recommendations
        }

    except Exception as e:
        logger.error(
            "Internal price insights generation failed",
            tenant_id=tenant_id,
            error=str(e),
            exc_info=True
        )
        raise HTTPException(
            status_code=500,
            detail=f"Internal price insights generation failed: {str(e)}"
        )
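
# ----------------------------------------------------------------
# Wiring sketch (illustrative; the module path and host name are assumptions):
# both routers must be registered on the FastAPI app, e.g.
#
#   from app.api.ml_insights import router as ml_insights_router, internal_router
#
#   app.include_router(ml_insights_router)
#   app.include_router(internal_router)
#
# Example internal call as issued by the demo-session service (httpx shown
# for illustration; note the required x-internal-service header):
#
#   async with httpx.AsyncClient() as client:
#       await client.post(
#           "http://procurement-service/api/v1/tenants/<tenant_id>"
#           "/procurement/internal/ml/generate-price-insights",
#           headers={"x-internal-service": "demo-session"},
#       )
# ----------------------------------------------------------------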