"""
ML Insights API Endpoints for Forecasting Service

Provides endpoints to trigger ML insight generation for:
- Dynamic business rules learning
- Demand pattern analysis
- Seasonal trend detection
"""

from fastapi import APIRouter, Depends, HTTPException, BackgroundTasks
from pydantic import BaseModel, Field
from typing import Optional, List
from uuid import UUID
from datetime import datetime, timedelta, timezone
import structlog
import pandas as pd

from app.core.database import get_db
from sqlalchemy.ext.asyncio import AsyncSession

logger = structlog.get_logger()

router = APIRouter(
    prefix="/api/v1/tenants/{tenant_id}/forecasting/ml/insights",
    tags=["ML Insights"]
)

# Cap on how many products are analyzed when the request does not name any
# (kept small to prevent the request from timing out).
_MAX_PRODUCTS_WHEN_UNFILTERED = 10


# ================================================================
# REQUEST/RESPONSE SCHEMAS
# ================================================================

class RulesGenerationRequest(BaseModel):
    """Request schema for rules generation"""
    # Optional explicit product selection; None means "analyze the tenant's products".
    product_ids: Optional[List[str]] = Field(
        None,
        description="Specific product IDs to analyze. If None, analyzes all products"
    )
    # Size of the historical window, bounded to 30..365 days.
    lookback_days: int = Field(
        90,
        description="Days of historical data to analyze",
        ge=30,
        le=365
    )
    # Products with fewer sales rows than this are skipped, bounded to 5..100.
    min_samples: int = Field(
        10,
        description="Minimum samples required for rule learning",
        ge=5,
        le=100
    )


class RulesGenerationResponse(BaseModel):
    """Response schema for rules generation"""
    success: bool
    message: str
    tenant_id: str
    # Number of products selected for analysis (includes products skipped for lack of data).
    products_analyzed: int
    total_insights_generated: int
    total_insights_posted: int
    # Maps product ID -> {'product_name', 'insights_posted', 'rules_learned'}.
    insights_by_product: dict
    # One message per product whose analysis raised an exception.
    errors: List[str] = []


# ================================================================
# INTERNAL HELPERS
# ================================================================

async def _fetch_products(inventory_client, tenant_id: str, product_ids: Optional[List[str]]) -> list:
    """
    Resolve the products to analyze through the inventory service.

    Args:
        inventory_client: Client exposing get_ingredient_by_id / get_all_ingredients
        tenant_id: Tenant whose products are looked up
        product_ids: Explicit product IDs, or None/empty to take the tenant's products

    Returns:
        List of product records. Requested IDs the inventory service returns
        nothing for are left out. Without explicit IDs the list is capped at
        _MAX_PRODUCTS_WHEN_UNFILTERED entries.

    Raises:
        ValueError: If an entry of product_ids is not a valid UUID string.
    """
    if product_ids:
        products = []
        for requested_id in product_ids:
            # NOTE(review): a malformed ID raises ValueError here, which the
            # endpoint reports as a 500 - consider validating IDs up front.
            product = await inventory_client.get_ingredient_by_id(
                ingredient_id=UUID(requested_id),
                tenant_id=tenant_id
            )
            if product:
                products.append(product)
        return products

    all_products = await inventory_client.get_all_ingredients(tenant_id=tenant_id)
    # Treat a missing (None) result like an empty one instead of failing on the slice.
    return (all_products or [])[:_MAX_PRODUCTS_WHEN_UNFILTERED]


def _prepare_sales_frame(sales_df: pd.DataFrame, product_id: str) -> Optional[pd.DataFrame]:
    """
    Normalize a raw sales DataFrame into the columns passed to rules learning.

    Ensures 'quantity' and 'date' columns exist (falling back to
    'total_quantity'/'amount' and 'sale_date' respectively), converts their
    types and adds 'day_of_week', 'is_holiday' and 'weather'. The frame is
    modified in place.

    Args:
        sales_df: Raw sales rows as a DataFrame
        product_id: Product the rows belong to (used in log messages only)

    Returns:
        The same DataFrame with the added columns, or None when no usable
        quantity or date column is present.
    """
    columns = sales_df.columns

    # Map common field names to 'quantity' and 'date'
    if 'quantity' not in columns:
        if 'total_quantity' in columns:
            sales_df['quantity'] = sales_df['total_quantity']
        elif 'amount' in columns:
            sales_df['quantity'] = sales_df['amount']
        else:
            logger.warning(f"No quantity field found for product {product_id}, skipping")
            return None

    if 'date' not in columns:
        if 'sale_date' in columns:
            sales_df['date'] = sales_df['sale_date']
        else:
            logger.warning(f"No date field found for product {product_id}, skipping")
            return None

    # Prepare sales data with required columns
    sales_df['date'] = pd.to_datetime(sales_df['date'])
    sales_df['quantity'] = sales_df['quantity'].astype(float)
    sales_df['day_of_week'] = sales_df['date'].dt.dayofweek
    sales_df['is_holiday'] = False  # TODO: Add holiday detection
    sales_df['weather'] = 'unknown'  # TODO: Add weather data
    return sales_df


async def _load_sales_frame(
    sales_client,
    tenant_id: str,
    product_id: str,
    start_date: str,
    end_date: str,
    min_samples: int
) -> Optional[pd.DataFrame]:
    """
    Fetch one product's sales history and turn it into a prepared DataFrame.

    Args:
        sales_client: Client exposing get_sales_data
        tenant_id: Tenant the product belongs to
        product_id: Product whose sales are fetched
        start_date: Inclusive window start, formatted as YYYY-MM-DD
        end_date: Window end, formatted as YYYY-MM-DD
        min_samples: Minimum number of sales rows required

    Returns:
        Prepared DataFrame, or None when the product has no sales data, fewer
        than min_samples rows, or no usable quantity/date column.
    """
    sales_data = await sales_client.get_sales_data(
        tenant_id=tenant_id,
        product_id=product_id,
        start_date=start_date,
        end_date=end_date
    )

    if not sales_data:
        logger.warning(f"No sales data for product {product_id}")
        return None

    sales_df = pd.DataFrame(sales_data)

    if len(sales_df) < min_samples:
        logger.warning(
            f"Insufficient data for product {product_id}: "
            f"{len(sales_df)} samples < {min_samples} required"
        )
        return None

    # Check what columns are available before mapping them to the expected format
    logger.debug(f"Sales data columns for product {product_id}: {sales_df.columns.tolist()}")

    return _prepare_sales_frame(sales_df, product_id)


# ================================================================
# API ENDPOINTS
# ================================================================

@router.post("/generate-rules", response_model=RulesGenerationResponse)
async def trigger_rules_generation(
    tenant_id: str,
    request_data: RulesGenerationRequest,
    db: AsyncSession = Depends(get_db)
):
    """
    Trigger dynamic business rules learning from historical sales data.

    This endpoint:
    1. Resolves the products to analyze via the inventory service
       (at most 10 when no product IDs are given)
    2. Fetches historical sales data for each product
    3. Runs the RulesOrchestrator to learn patterns and post insights
    4. Aggregates per-product results into a single response

    Products without sales data, with fewer rows than `min_samples`, or
    without a usable quantity/date column are skipped. A failure while
    analyzing one product is recorded in `errors` and does not stop the
    remaining products.

    Args:
        tenant_id: Tenant UUID
        request_data: Rules generation parameters
        db: Database session (injected; not used by this handler)

    Returns:
        RulesGenerationResponse with generation results. `success` is True
        only when at least one insight was posted.

    Raises:
        HTTPException: 500 when anything outside the per-product analysis fails.
    """
    logger.info(
        "ML insights rules generation requested",
        tenant_id=tenant_id,
        product_ids=request_data.product_ids,
        lookback_days=request_data.lookback_days
    )

    try:
        # Import ML orchestrator and clients
        from app.ml.rules_orchestrator import RulesOrchestrator
        from shared.clients.sales_client import SalesServiceClient
        from shared.clients.inventory_client import InventoryServiceClient
        from app.core.config import settings

        # Initialize orchestrator and clients
        orchestrator = RulesOrchestrator()
        try:
            inventory_client = InventoryServiceClient(settings)

            # Get products to analyze from inventory service via API
            products = await _fetch_products(
                inventory_client, tenant_id, request_data.product_ids
            )

            if not products:
                return RulesGenerationResponse(
                    success=False,
                    message="No products found for analysis",
                    tenant_id=tenant_id,
                    products_analyzed=0,
                    total_insights_generated=0,
                    total_insights_posted=0,
                    insights_by_product={},
                    errors=["No products found"]
                )

            # Initialize sales client to fetch historical data
            sales_client = SalesServiceClient(config=settings, calling_service_name="forecasting")

            # Calculate date range once; it is the same for every product
            window_end = datetime.now(timezone.utc)
            window_start = window_end - timedelta(days=request_data.lookback_days)
            start_date = window_start.strftime('%Y-%m-%d')
            end_date = window_end.strftime('%Y-%m-%d')

            # Process each product
            total_insights_generated = 0
            total_insights_posted = 0
            insights_by_product = {}
            errors = []

            for product in products:
                # Bound before the try so the error handler can always name the
                # product, even when reading its 'id' is what failed.
                product_id = "<unknown>"
                try:
                    product_id = str(product['id'])
                    product_name = product.get('name', 'Unknown')

                    logger.info(f"Analyzing product {product_name} ({product_id})")

                    sales_df = await _load_sales_frame(
                        sales_client,
                        tenant_id,
                        product_id,
                        start_date,
                        end_date,
                        request_data.min_samples
                    )
                    if sales_df is None:
                        continue

                    # Run rules learning
                    results = await orchestrator.learn_and_post_rules(
                        tenant_id=tenant_id,
                        inventory_product_id=product_id,
                        sales_data=sales_df,
                        external_data=None,
                        min_samples=request_data.min_samples
                    )

                    # Track results
                    total_insights_generated += results['insights_generated']
                    total_insights_posted += results['insights_posted']
                    insights_by_product[product_id] = {
                        'product_name': product_name,
                        'insights_posted': results['insights_posted'],
                        'rules_learned': len(results['rules'])
                    }

                    logger.info(
                        f"Product {product_id} analysis complete",
                        insights_posted=results['insights_posted']
                    )

                except Exception as e:
                    # Best effort: one failing product must not abort the others.
                    error_msg = f"Error analyzing product {product_id}: {str(e)}"
                    logger.error(error_msg, exc_info=True)
                    errors.append(error_msg)

            # Build response
            response = RulesGenerationResponse(
                success=total_insights_posted > 0,
                message=f"Successfully generated {total_insights_posted} insights from {len(products)} products",
                tenant_id=tenant_id,
                products_analyzed=len(products),
                total_insights_generated=total_insights_generated,
                total_insights_posted=total_insights_posted,
                insights_by_product=insights_by_product,
                errors=errors
            )

            logger.info(
                "ML insights rules generation complete",
                tenant_id=tenant_id,
                total_insights=total_insights_posted
            )

            return response
        finally:
            # Close orchestrator on every exit path, including the early
            # "no products" return and unexpected failures.
            await orchestrator.close()

    except Exception as e:
        logger.error(
            "ML insights rules generation failed",
            tenant_id=tenant_id,
            error=str(e),
            exc_info=True
        )
        raise HTTPException(
            status_code=500,
            detail=f"Rules generation failed: {str(e)}"
        ) from e


@router.get("/health")
async def ml_insights_health():
    """
    Health check for ML insights endpoints.

    Returns:
        Static payload with the service status, service name and the list of
        ML insights endpoints.
    """
    return {
        "status": "healthy",
        "service": "forecasting-ml-insights",
        "endpoints": [
            "POST /ml/insights/generate-rules"
        ]
    }