""" ML Insights API Endpoints for Inventory Service Provides endpoints to trigger ML insight generation for: - Safety stock optimization - Inventory level recommendations - Demand pattern analysis """ from fastapi import APIRouter, Depends, HTTPException from pydantic import BaseModel, Field from typing import Optional, List from uuid import UUID from datetime import datetime, timedelta import structlog import pandas as pd from app.core.database import get_db from sqlalchemy.ext.asyncio import AsyncSession logger = structlog.get_logger() router = APIRouter( prefix="/api/v1/tenants/{tenant_id}/inventory/ml/insights", tags=["ML Insights"] ) # ================================================================ # REQUEST/RESPONSE SCHEMAS # ================================================================ class SafetyStockOptimizationRequest(BaseModel): """Request schema for safety stock optimization""" product_ids: Optional[List[str]] = Field( None, description="Specific product IDs to optimize. If None, optimizes all products" ) lookback_days: int = Field( 90, description="Days of historical demand to analyze", ge=30, le=365 ) min_history_days: int = Field( 30, description="Minimum days of history required", ge=7, le=180 ) class SafetyStockOptimizationResponse(BaseModel): """Response schema for safety stock optimization""" success: bool message: str tenant_id: str products_optimized: int total_insights_generated: int total_insights_posted: int total_cost_savings: float insights_by_product: dict errors: List[str] = [] # ================================================================ # API ENDPOINTS # ================================================================ @router.post("/optimize-safety-stock", response_model=SafetyStockOptimizationResponse) async def trigger_safety_stock_optimization( tenant_id: str, request_data: SafetyStockOptimizationRequest, db: AsyncSession = Depends(get_db) ): """ Trigger safety stock optimization for inventory products. This endpoint: 1. Fetches historical demand data for specified products 2. Runs the SafetyStockInsightsOrchestrator to optimize levels 3. Generates insights about safety stock recommendations 4. Posts insights to AI Insights Service Args: tenant_id: Tenant UUID request_data: Optimization parameters db: Database session Returns: SafetyStockOptimizationResponse with optimization results """ logger.info( "ML insights safety stock optimization requested", tenant_id=tenant_id, product_ids=request_data.product_ids, lookback_days=request_data.lookback_days ) try: # Import ML orchestrator from app.ml.safety_stock_insights_orchestrator import SafetyStockInsightsOrchestrator from app.models.inventory import Ingredient from sqlalchemy import select # Initialize orchestrator orchestrator = SafetyStockInsightsOrchestrator() # Get products to optimize if request_data.product_ids: query = select(Ingredient).where( Ingredient.tenant_id == UUID(tenant_id), Ingredient.id.in_([UUID(pid) for pid in request_data.product_ids]) ) else: query = select(Ingredient).where( Ingredient.tenant_id == UUID(tenant_id) ).limit(10) # Limit to prevent timeout result = await db.execute(query) products = result.scalars().all() if not products: return SafetyStockOptimizationResponse( success=False, message="No products found for optimization", tenant_id=tenant_id, products_optimized=0, total_insights_generated=0, total_insights_posted=0, total_cost_savings=0.0, insights_by_product={}, errors=["No products found"] ) # Calculate date range for demand history end_date = datetime.utcnow() start_date = end_date - timedelta(days=request_data.lookback_days) # Process each product total_insights_generated = 0 total_insights_posted = 0 total_cost_savings = 0.0 insights_by_product = {} errors = [] for product in products: try: product_id = str(product.id) logger.info(f"Optimizing safety stock for {product.name} ({product_id})") # Fetch real sales/demand history from sales service from shared.clients.sales_client import SalesServiceClient from app.core.config import settings sales_client = SalesServiceClient(settings) try: # Fetch sales data for this product sales_response = await sales_client.get_sales_by_product( tenant_id=tenant_id, product_id=product_id, start_date=start_date.strftime('%Y-%m-%d'), end_date=end_date.strftime('%Y-%m-%d') ) if not sales_response or not sales_response.get('sales'): logger.warning( f"No sales history for product {product_id}, skipping" ) continue # Convert sales data to daily demand sales_data = sales_response.get('sales', []) demand_data = [] for sale in sales_data: demand_data.append({ 'date': pd.to_datetime(sale.get('date') or sale.get('sale_date')), 'quantity': float(sale.get('quantity', 0)) }) if not demand_data: logger.warning( f"No valid demand data for product {product_id}, skipping" ) continue demand_history = pd.DataFrame(demand_data) # Aggregate by date if there are multiple sales per day demand_history = demand_history.groupby('date').agg({ 'quantity': 'sum' }).reset_index() if len(demand_history) < request_data.min_history_days: logger.warning( f"Insufficient demand history for product {product_id}: " f"{len(demand_history)} days < {request_data.min_history_days} required" ) continue except Exception as e: logger.error( f"Error fetching sales data for product {product_id}: {e}", exc_info=True ) continue # Product characteristics product_characteristics = { 'lead_time_days': 7, # TODO: Get from supplier data 'shelf_life_days': 30 if product.is_perishable else 365, 'perishable': product.is_perishable } # Run optimization results = await orchestrator.optimize_and_post_insights( tenant_id=tenant_id, inventory_product_id=product_id, demand_history=demand_history, product_characteristics=product_characteristics, min_history_days=request_data.min_history_days ) # Track results total_insights_generated += results['insights_generated'] total_insights_posted += results['insights_posted'] if results.get('cost_savings'): total_cost_savings += results['cost_savings'] insights_by_product[product_id] = { 'product_name': product.name, 'insights_posted': results['insights_posted'], 'optimal_safety_stock': results.get('optimal_safety_stock'), 'cost_savings': results.get('cost_savings', 0.0) } logger.info( f"Product {product_id} optimization complete", insights_posted=results['insights_posted'], cost_savings=results.get('cost_savings', 0) ) except Exception as e: error_msg = f"Error optimizing product {product_id}: {str(e)}" logger.error(error_msg, exc_info=True) errors.append(error_msg) # Close orchestrator await orchestrator.close() # Build response response = SafetyStockOptimizationResponse( success=total_insights_posted > 0, message=f"Successfully optimized {len(products)} products, generated {total_insights_posted} insights", tenant_id=tenant_id, products_optimized=len(products), total_insights_generated=total_insights_generated, total_insights_posted=total_insights_posted, total_cost_savings=round(total_cost_savings, 2), insights_by_product=insights_by_product, errors=errors ) logger.info( "ML insights safety stock optimization complete", tenant_id=tenant_id, total_insights=total_insights_posted, total_savings=total_cost_savings ) return response except Exception as e: logger.error( "ML insights safety stock optimization failed", tenant_id=tenant_id, error=str(e), exc_info=True ) raise HTTPException( status_code=500, detail=f"Safety stock optimization failed: {str(e)}" ) @router.get("/health") async def ml_insights_health(): """Health check for ML insights endpoints""" return { "status": "healthy", "service": "inventory-ml-insights", "endpoints": [ "POST /ml/insights/optimize-safety-stock" ] }