""" ML Insights API Endpoints for Forecasting Service Provides endpoints to trigger ML insight generation for: - Dynamic business rules learning - Demand pattern analysis - Seasonal trend detection """ from fastapi import APIRouter, Depends, HTTPException, BackgroundTasks, Request from pydantic import BaseModel, Field from typing import Optional, List from uuid import UUID from datetime import datetime, timedelta import structlog import pandas as pd from app.core.database import get_db from sqlalchemy.ext.asyncio import AsyncSession logger = structlog.get_logger() router = APIRouter( prefix="/api/v1/tenants/{tenant_id}/forecasting/ml/insights", tags=["ML Insights"] ) # ================================================================ # REQUEST/RESPONSE SCHEMAS # ================================================================ class RulesGenerationRequest(BaseModel): """Request schema for rules generation""" product_ids: Optional[List[str]] = Field( None, description="Specific product IDs to analyze. If None, analyzes all products" ) lookback_days: int = Field( 90, description="Days of historical data to analyze", ge=30, le=365 ) min_samples: int = Field( 10, description="Minimum samples required for rule learning", ge=5, le=100 ) class RulesGenerationResponse(BaseModel): """Response schema for rules generation""" success: bool message: str tenant_id: str products_analyzed: int total_insights_generated: int total_insights_posted: int insights_by_product: dict errors: List[str] = [] class DemandAnalysisRequest(BaseModel): """Request schema for demand analysis""" product_ids: Optional[List[str]] = Field( None, description="Specific product IDs to analyze. If None, analyzes all products" ) lookback_days: int = Field( 90, description="Days of historical data to analyze", ge=30, le=365 ) forecast_horizon_days: int = Field( 30, description="Days to forecast ahead", ge=7, le=90 ) class DemandAnalysisResponse(BaseModel): """Response schema for demand analysis""" success: bool message: str tenant_id: str products_analyzed: int total_insights_generated: int total_insights_posted: int insights_by_product: dict errors: List[str] = [] class BusinessRulesAnalysisRequest(BaseModel): """Request schema for business rules analysis""" product_ids: Optional[List[str]] = Field( None, description="Specific product IDs to analyze. If None, analyzes all products" ) lookback_days: int = Field( 90, description="Days of historical data to analyze", ge=30, le=365 ) min_samples: int = Field( 10, description="Minimum samples required for rule analysis", ge=5, le=100 ) class BusinessRulesAnalysisResponse(BaseModel): """Response schema for business rules analysis""" success: bool message: str tenant_id: str products_analyzed: int total_insights_generated: int total_insights_posted: int insights_by_product: dict errors: List[str] = [] # ================================================================ # API ENDPOINTS # ================================================================ @router.post("/generate-rules", response_model=RulesGenerationResponse) async def trigger_rules_generation( tenant_id: str, request_data: RulesGenerationRequest, request: Request, db: AsyncSession = Depends(get_db) ): """ Trigger dynamic business rules learning from historical sales data. This endpoint: 1. Fetches historical sales data for specified products 2. Runs the RulesOrchestrator to learn patterns 3. Generates insights about optimal business rules 4. 


# ================================================================
# API ENDPOINTS
# ================================================================

@router.post("/generate-rules", response_model=RulesGenerationResponse)
async def trigger_rules_generation(
    tenant_id: str,
    request_data: RulesGenerationRequest,
    request: Request,
    db: AsyncSession = Depends(get_db)
):
    """
    Trigger dynamic business rules learning from historical sales data.

    This endpoint:
    1. Fetches historical sales data for specified products
    2. Runs the RulesOrchestrator to learn patterns
    3. Generates insights about optimal business rules
    4. Posts insights to AI Insights Service

    Args:
        tenant_id: Tenant UUID
        request_data: Rules generation parameters
        request: FastAPI request object to access app state
        db: Database session

    Returns:
        RulesGenerationResponse with generation results
    """
    logger.info(
        "ML insights rules generation requested",
        tenant_id=tenant_id,
        product_ids=request_data.product_ids,
        lookback_days=request_data.lookback_days
    )

    try:
        # Import ML orchestrator and clients
        from app.ml.rules_orchestrator import RulesOrchestrator
        from shared.clients.sales_client import SalesServiceClient
        from shared.clients.inventory_client import InventoryServiceClient
        from app.core.config import settings

        # Get event publisher from app state
        event_publisher = getattr(request.app.state, 'event_publisher', None)

        # Initialize orchestrator and clients
        orchestrator = RulesOrchestrator(event_publisher=event_publisher)
        inventory_client = InventoryServiceClient(settings)

        # Get products to analyze from inventory service via API
        if request_data.product_ids:
            # Fetch specific products
            products = []
            for product_id in request_data.product_ids:
                product = await inventory_client.get_ingredient_by_id(
                    ingredient_id=UUID(product_id),
                    tenant_id=tenant_id
                )
                if product:
                    products.append(product)
        else:
            # Fetch all products for tenant (limit to 10 to prevent timeout)
            all_products = await inventory_client.get_all_ingredients(tenant_id=tenant_id)
            products = all_products[:10]

        if not products:
            return RulesGenerationResponse(
                success=False,
                message="No products found for analysis",
                tenant_id=tenant_id,
                products_analyzed=0,
                total_insights_generated=0,
                total_insights_posted=0,
                insights_by_product={},
                errors=["No products found"]
            )

        # Initialize sales client to fetch historical data
        sales_client = SalesServiceClient(config=settings, calling_service_name="forecasting")

        # Calculate date range
        end_date = datetime.utcnow()
        start_date = end_date - timedelta(days=request_data.lookback_days)

        # Process each product
        total_insights_generated = 0
        total_insights_posted = 0
        insights_by_product = {}
        errors = []

        for product in products:
            # Resolve identifiers before the try block so the except clause
            # can reference them even when an early step fails
            product_id = str(product.get('id', 'unknown'))
            product_name = product.get('name', 'Unknown')
            try:
                logger.info(f"Analyzing product {product_name} ({product_id})")

                # Fetch sales data for product
                sales_data = await sales_client.get_sales_data(
                    tenant_id=tenant_id,
                    product_id=product_id,
                    start_date=start_date.strftime('%Y-%m-%d'),
                    end_date=end_date.strftime('%Y-%m-%d')
                )

                if not sales_data:
                    logger.warning(f"No sales data for product {product_id}")
                    continue

                # Convert to DataFrame
                sales_df = pd.DataFrame(sales_data)

                if len(sales_df) < request_data.min_samples:
                    logger.warning(
                        f"Insufficient data for product {product_id}: "
                        f"{len(sales_df)} samples < {request_data.min_samples} required"
                    )
                    continue

                # Check what columns are available and map to expected format
                logger.debug(f"Sales data columns for product {product_id}: {sales_df.columns.tolist()}")

                # Map common field names to 'quantity' and 'date'
                if 'quantity' not in sales_df.columns:
                    if 'total_quantity' in sales_df.columns:
                        sales_df['quantity'] = sales_df['total_quantity']
                    elif 'amount' in sales_df.columns:
                        sales_df['quantity'] = sales_df['amount']
                    else:
                        logger.warning(f"No quantity field found for product {product_id}, skipping")
                        continue

                if 'date' not in sales_df.columns:
                    if 'sale_date' in sales_df.columns:
                        sales_df['date'] = sales_df['sale_date']
                    else:
                        logger.warning(f"No date field found for product {product_id}, skipping")
                        continue

                # Prepare sales data with required columns
                sales_df['date'] = pd.to_datetime(sales_df['date'])
                sales_df['quantity'] = sales_df['quantity'].astype(float)
                sales_df['day_of_week'] = sales_df['date'].dt.dayofweek
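
                # After normalization the frame is expected to look roughly
                # like this (illustrative; extra source columns may remain):
                #
                #         date  quantity  day_of_week
                #   2024-03-04      42.0            0
                #   2024-03-05      37.5            1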

                # NOTE: Holiday detection for historical data requires:
                #   1. Tenant location context (calendar_id)
                #   2. Bulk holiday check API (currently single-date only)
                #   3. Historical calendar data
                # For real-time forecasts, holiday detection IS implemented via data_client.py
                sales_df['is_holiday'] = False

                # NOTE: Weather data for historical analysis requires:
                #   1. Historical weather API integration
                #   2. Tenant location coordinates
                # For real-time forecasts, weather data IS fetched via external service
                sales_df['weather'] = 'unknown'

                # Run rules learning
                results = await orchestrator.learn_and_post_rules(
                    tenant_id=tenant_id,
                    inventory_product_id=product_id,
                    sales_data=sales_df,
                    external_data=None,
                    min_samples=request_data.min_samples
                )

                # Track results
                total_insights_generated += results['insights_generated']
                total_insights_posted += results['insights_posted']
                insights_by_product[product_id] = {
                    'product_name': product_name,
                    'insights_posted': results['insights_posted'],
                    'rules_learned': len(results['rules'])
                }

                logger.info(
                    f"Product {product_id} analysis complete",
                    insights_posted=results['insights_posted']
                )

            except Exception as e:
                error_msg = f"Error analyzing product {product_id}: {str(e)}"
                logger.error(error_msg, exc_info=True)
                errors.append(error_msg)

        # Close orchestrator
        await orchestrator.close()

        # Build response
        response = RulesGenerationResponse(
            success=total_insights_posted > 0,
            message=f"Successfully generated {total_insights_posted} insights from {len(products)} products",
            tenant_id=tenant_id,
            products_analyzed=len(products),
            total_insights_generated=total_insights_generated,
            total_insights_posted=total_insights_posted,
            insights_by_product=insights_by_product,
            errors=errors
        )

        logger.info(
            "ML insights rules generation complete",
            tenant_id=tenant_id,
            total_insights=total_insights_posted
        )

        return response

    except Exception as e:
        logger.error(
            "ML insights rules generation failed",
            tenant_id=tenant_id,
            error=str(e),
            exc_info=True
        )
        raise HTTPException(
            status_code=500,
            detail=f"Rules generation failed: {str(e)}"
        )
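

# Example invocation (illustrative; host, port, and auth headers are
# deployment-specific):
#
#   curl -X POST \
#     "http://localhost:8000/api/v1/tenants/<tenant-uuid>/forecasting/ml/insights/generate-rules" \
#     -H "Content-Type: application/json" \
#     -d '{"lookback_days": 90, "min_samples": 10}'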


@router.post("/analyze-demand", response_model=DemandAnalysisResponse)
async def trigger_demand_analysis(
    tenant_id: str,
    request_data: DemandAnalysisRequest,
    request: Request,
    db: AsyncSession = Depends(get_db)
):
    """
    Trigger demand pattern analysis from historical sales data.

    This endpoint:
    1. Fetches historical sales data for specified products
    2. Runs the DemandInsightsOrchestrator to analyze patterns
    3. Generates insights about demand forecasting optimization
    4. Posts insights to AI Insights Service
    5. Publishes events to RabbitMQ

    Args:
        tenant_id: Tenant UUID
        request_data: Demand analysis parameters
        request: FastAPI request object to access app state
        db: Database session

    Returns:
        DemandAnalysisResponse with analysis results
    """
    logger.info(
        "ML insights demand analysis requested",
        tenant_id=tenant_id,
        product_ids=request_data.product_ids,
        lookback_days=request_data.lookback_days
    )

    try:
        # Import ML orchestrator and clients
        from app.ml.demand_insights_orchestrator import DemandInsightsOrchestrator
        from shared.clients.sales_client import SalesServiceClient
        from shared.clients.inventory_client import InventoryServiceClient
        from app.core.config import settings

        # Get event publisher from app state
        event_publisher = getattr(request.app.state, 'event_publisher', None)

        # Initialize orchestrator and clients
        orchestrator = DemandInsightsOrchestrator(event_publisher=event_publisher)
        inventory_client = InventoryServiceClient(settings)

        # Get products to analyze from inventory service via API
        if request_data.product_ids:
            # Fetch specific products
            products = []
            for product_id in request_data.product_ids:
                product = await inventory_client.get_ingredient_by_id(
                    ingredient_id=UUID(product_id),
                    tenant_id=tenant_id
                )
                if product:
                    products.append(product)
        else:
            # Fetch all products for tenant (limit to 10 to prevent timeout)
            all_products = await inventory_client.get_all_ingredients(tenant_id=tenant_id)
            products = all_products[:10]

        if not products:
            return DemandAnalysisResponse(
                success=False,
                message="No products found for analysis",
                tenant_id=tenant_id,
                products_analyzed=0,
                total_insights_generated=0,
                total_insights_posted=0,
                insights_by_product={},
                errors=["No products found"]
            )

        # Initialize sales client to fetch historical data
        sales_client = SalesServiceClient(config=settings, calling_service_name="forecasting")

        # Calculate date range
        end_date = datetime.utcnow()
        start_date = end_date - timedelta(days=request_data.lookback_days)

        # Process each product
        total_insights_generated = 0
        total_insights_posted = 0
        insights_by_product = {}
        errors = []

        for product in products:
            # Resolve identifiers before the try block so the except clause
            # can reference them even when an early step fails
            product_id = str(product.get('id', 'unknown'))
            product_name = product.get('name', 'Unknown')
            try:
                logger.info(f"Analyzing product {product_name} ({product_id})")

                # Fetch sales data for product
                sales_data = await sales_client.get_sales_data(
                    tenant_id=tenant_id,
                    product_id=product_id,
                    start_date=start_date.strftime('%Y-%m-%d'),
                    end_date=end_date.strftime('%Y-%m-%d')
                )

                if not sales_data:
                    logger.warning(f"No sales data for product {product_id}")
                    continue

                # Convert to DataFrame
                sales_df = pd.DataFrame(sales_data)

                if len(sales_df) < 30:  # Minimum for demand analysis
                    logger.warning(
                        f"Insufficient data for product {product_id}: "
                        f"{len(sales_df)} samples < 30 required"
                    )
                    continue

                # Check what columns are available and map to expected format
                logger.debug(f"Sales data columns for product {product_id}: {sales_df.columns.tolist()}")

                # Map common field names to 'quantity' and 'date'
                if 'quantity' not in sales_df.columns:
                    if 'total_quantity' in sales_df.columns:
                        sales_df['quantity'] = sales_df['total_quantity']
                    elif 'amount' in sales_df.columns:
                        sales_df['quantity'] = sales_df['amount']
                    else:
                        logger.warning(f"No quantity field found for product {product_id}, skipping")
                        continue

                if 'date' not in sales_df.columns:
                    if 'sale_date' in sales_df.columns:
                        sales_df['date'] = sales_df['sale_date']
                    else:
                        logger.warning(f"No date field found for product {product_id}, skipping")
                        continue
                # Prepare sales data with required columns
                sales_df['date'] = pd.to_datetime(sales_df['date'])
                sales_df['quantity'] = sales_df['quantity'].astype(float)
                sales_df['day_of_week'] = sales_df['date'].dt.dayofweek

                # Run demand analysis
                results = await orchestrator.analyze_and_post_demand_insights(
                    tenant_id=tenant_id,
                    inventory_product_id=product_id,
                    sales_data=sales_df,
                    forecast_horizon_days=request_data.forecast_horizon_days,
                    min_history_days=request_data.lookback_days
                )

                # Track results
                total_insights_generated += results['insights_generated']
                total_insights_posted += results['insights_posted']
                insights_by_product[product_id] = {
                    'product_name': product_name,
                    'insights_posted': results['insights_posted'],
                    'trend_analysis': results.get('trend_analysis', {})
                }

                logger.info(
                    f"Product {product_id} demand analysis complete",
                    insights_posted=results['insights_posted']
                )

            except Exception as e:
                error_msg = f"Error analyzing product {product_id}: {str(e)}"
                logger.error(error_msg, exc_info=True)
                errors.append(error_msg)

        # Close orchestrator
        await orchestrator.close()

        # Build response
        response = DemandAnalysisResponse(
            success=total_insights_posted > 0,
            message=f"Successfully generated {total_insights_posted} insights from {len(products)} products",
            tenant_id=tenant_id,
            products_analyzed=len(products),
            total_insights_generated=total_insights_generated,
            total_insights_posted=total_insights_posted,
            insights_by_product=insights_by_product,
            errors=errors
        )

        logger.info(
            "ML insights demand analysis complete",
            tenant_id=tenant_id,
            total_insights=total_insights_posted
        )

        return response

    except Exception as e:
        logger.error(
            "ML insights demand analysis failed",
            tenant_id=tenant_id,
            error=str(e),
            exc_info=True
        )
        raise HTTPException(
            status_code=500,
            detail=f"Demand analysis failed: {str(e)}"
        )
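

# The orchestrators used by these endpoints are expected to return a dict
# shaped roughly like the following (illustrative; only the keys consumed
# above are assumed):
#
#   {
#       "insights_generated": 4,     # insights produced by the analysis
#       "insights_posted": 3,        # insights accepted by AI Insights Service
#       "rules": {...},              # rules orchestrators only
#       "trend_analysis": {...},     # demand orchestrator only
#   }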


@router.post("/analyze-business-rules", response_model=BusinessRulesAnalysisResponse)
async def trigger_business_rules_analysis(
    tenant_id: str,
    request_data: BusinessRulesAnalysisRequest,
    request: Request,
    db: AsyncSession = Depends(get_db)
):
    """
    Trigger business rules optimization analysis from historical sales data.

    This endpoint:
    1. Fetches historical sales data for specified products
    2. Runs the BusinessRulesInsightsOrchestrator to analyze rules
    3. Generates insights about business rule optimization
    4. Posts insights to AI Insights Service
    5. Publishes events to RabbitMQ

    Args:
        tenant_id: Tenant UUID
        request_data: Business rules analysis parameters
        request: FastAPI request object to access app state
        db: Database session

    Returns:
        BusinessRulesAnalysisResponse with analysis results
    """
    logger.info(
        "ML insights business rules analysis requested",
        tenant_id=tenant_id,
        product_ids=request_data.product_ids,
        lookback_days=request_data.lookback_days
    )

    try:
        # Import ML orchestrator and clients
        from app.ml.business_rules_insights_orchestrator import BusinessRulesInsightsOrchestrator
        from shared.clients.sales_client import SalesServiceClient
        from shared.clients.inventory_client import InventoryServiceClient
        from app.core.config import settings

        # Get event publisher from app state
        event_publisher = getattr(request.app.state, 'event_publisher', None)

        # Initialize orchestrator and clients
        orchestrator = BusinessRulesInsightsOrchestrator(event_publisher=event_publisher)
        inventory_client = InventoryServiceClient(settings)

        # Get products to analyze from inventory service via API
        if request_data.product_ids:
            # Fetch specific products
            products = []
            for product_id in request_data.product_ids:
                product = await inventory_client.get_ingredient_by_id(
                    ingredient_id=UUID(product_id),
                    tenant_id=tenant_id
                )
                if product:
                    products.append(product)
        else:
            # Fetch all products for tenant (limit to 10 to prevent timeout)
            all_products = await inventory_client.get_all_ingredients(tenant_id=tenant_id)
            products = all_products[:10]

        if not products:
            return BusinessRulesAnalysisResponse(
                success=False,
                message="No products found for analysis",
                tenant_id=tenant_id,
                products_analyzed=0,
                total_insights_generated=0,
                total_insights_posted=0,
                insights_by_product={},
                errors=["No products found"]
            )

        # Initialize sales client to fetch historical data
        sales_client = SalesServiceClient(config=settings, calling_service_name="forecasting")

        # Calculate date range
        end_date = datetime.utcnow()
        start_date = end_date - timedelta(days=request_data.lookback_days)

        # Process each product
        total_insights_generated = 0
        total_insights_posted = 0
        insights_by_product = {}
        errors = []

        for product in products:
            # Resolve identifiers before the try block so the except clause
            # can reference them even when an early step fails
            product_id = str(product.get('id', 'unknown'))
            product_name = product.get('name', 'Unknown')
            try:
                logger.info(f"Analyzing product {product_name} ({product_id})")

                # Fetch sales data for product
                sales_data = await sales_client.get_sales_data(
                    tenant_id=tenant_id,
                    product_id=product_id,
                    start_date=start_date.strftime('%Y-%m-%d'),
                    end_date=end_date.strftime('%Y-%m-%d')
                )

                if not sales_data:
                    logger.warning(f"No sales data for product {product_id}")
                    continue

                # Convert to DataFrame
                sales_df = pd.DataFrame(sales_data)

                if len(sales_df) < request_data.min_samples:
                    logger.warning(
                        f"Insufficient data for product {product_id}: "
                        f"{len(sales_df)} samples < {request_data.min_samples} required"
                    )
                    continue

                # Check what columns are available and map to expected format
                logger.debug(f"Sales data columns for product {product_id}: {sales_df.columns.tolist()}")

                # Map common field names to 'quantity' and 'date'
                if 'quantity' not in sales_df.columns:
                    if 'total_quantity' in sales_df.columns:
                        sales_df['quantity'] = sales_df['total_quantity']
                    elif 'amount' in sales_df.columns:
                        sales_df['quantity'] = sales_df['amount']
                    else:
                        logger.warning(f"No quantity field found for product {product_id}, skipping")
                        continue

                if 'date' not in sales_df.columns:
                    if 'sale_date' in sales_df.columns:
                        sales_df['date'] = sales_df['sale_date']
                    else:
                        logger.warning(f"No date field found for product {product_id}, skipping")
                        continue
                # Prepare sales data with required columns
                sales_df['date'] = pd.to_datetime(sales_df['date'])
                sales_df['quantity'] = sales_df['quantity'].astype(float)
                sales_df['day_of_week'] = sales_df['date'].dt.dayofweek

                # Run business rules analysis
                results = await orchestrator.analyze_and_post_business_rules_insights(
                    tenant_id=tenant_id,
                    inventory_product_id=product_id,
                    sales_data=sales_df,
                    min_samples=request_data.min_samples
                )

                # Track results
                total_insights_generated += results['insights_generated']
                total_insights_posted += results['insights_posted']
                insights_by_product[product_id] = {
                    'product_name': product_name,
                    'insights_posted': results['insights_posted'],
                    'rules_learned': len(results.get('rules', {}))
                }

                logger.info(
                    f"Product {product_id} business rules analysis complete",
                    insights_posted=results['insights_posted']
                )

            except Exception as e:
                error_msg = f"Error analyzing product {product_id}: {str(e)}"
                logger.error(error_msg, exc_info=True)
                errors.append(error_msg)

        # Close orchestrator
        await orchestrator.close()

        # Build response
        response = BusinessRulesAnalysisResponse(
            success=total_insights_posted > 0,
            message=f"Successfully generated {total_insights_posted} insights from {len(products)} products",
            tenant_id=tenant_id,
            products_analyzed=len(products),
            total_insights_generated=total_insights_generated,
            total_insights_posted=total_insights_posted,
            insights_by_product=insights_by_product,
            errors=errors
        )

        logger.info(
            "ML insights business rules analysis complete",
            tenant_id=tenant_id,
            total_insights=total_insights_posted
        )

        return response

    except Exception as e:
        logger.error(
            "ML insights business rules analysis failed",
            tenant_id=tenant_id,
            error=str(e),
            exc_info=True
        )
        raise HTTPException(
            status_code=500,
            detail=f"Business rules analysis failed: {str(e)}"
        )


@router.get("/health")
async def ml_insights_health():
    """Health check for ML insights endpoints"""
    return {
        "status": "healthy",
        "service": "forecasting-ml-insights",
        "endpoints": [
            "POST /ml/insights/generate-rules",
            "POST /ml/insights/analyze-demand",
            "POST /ml/insights/analyze-business-rules"
        ]
    }
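

# Quick smoke check (illustrative; host and port are deployment-specific):
#
#   curl "http://localhost:8000/api/v1/tenants/<tenant-uuid>/forecasting/ml/insights/health"
#
# This should return {"status": "healthy", ...} without calling any downstream
# service, so it is safe to use in readiness probes.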


# ================================================================
# INTERNAL ML INSIGHTS ENDPOINTS (for demo session service)
# ================================================================

internal_router = APIRouter(tags=["Internal ML"])


@internal_router.post("/api/v1/tenants/{tenant_id}/forecasting/internal/ml/generate-demand-insights")
async def trigger_demand_insights_internal(
    tenant_id: str,
    request: Request,
    db: AsyncSession = Depends(get_db)
):
    """
    Internal endpoint to trigger demand forecasting insights for a tenant.

    This endpoint is called by the demo-session service after cloning
    to generate AI insights from the seeded forecast data.

    Args:
        tenant_id: Tenant UUID
        request: FastAPI request object to access app state
        db: Database session

    Returns:
        Dict with insights generation results
    """
    logger.info(
        "Internal demand insights generation triggered",
        tenant_id=tenant_id
    )

    try:
        # Import ML orchestrator and clients
        from app.ml.demand_insights_orchestrator import DemandInsightsOrchestrator
        from shared.clients.sales_client import SalesServiceClient
        from shared.clients.inventory_client import InventoryServiceClient
        from app.core.config import settings

        # Get event publisher from app state
        event_publisher = getattr(request.app.state, 'event_publisher', None)

        # Initialize orchestrator and clients
        orchestrator = DemandInsightsOrchestrator(event_publisher=event_publisher)
        inventory_client = InventoryServiceClient(settings)

        # Get all products for tenant (limit to 10 for performance)
        all_products = await inventory_client.get_all_ingredients(tenant_id=tenant_id)
        products = all_products[:10] if all_products else []

        logger.info(
            "Retrieved products from inventory service",
            tenant_id=tenant_id,
            product_count=len(products)
        )

        if not products:
            return {
                "success": False,
                "message": "No products found for analysis",
                "tenant_id": tenant_id,
                "products_analyzed": 0,
                "insights_posted": 0
            }

        # Initialize sales client
        sales_client = SalesServiceClient(config=settings, calling_service_name="forecasting")

        # Calculate date range (90 days lookback)
        end_date = datetime.utcnow()
        start_date = end_date - timedelta(days=90)

        # Process each product
        total_insights_generated = 0
        total_insights_posted = 0

        for product in products:
            # Resolve identifiers before the try block so the except clause
            # can reference them even when an early step fails
            product_id = str(product.get('id', 'unknown'))
            product_name = product.get('name', 'Unknown Product')
            try:
                logger.debug(
                    "Analyzing demand for product",
                    tenant_id=tenant_id,
                    product_id=product_id,
                    product_name=product_name
                )

                # Fetch historical sales data
                sales_data_raw = await sales_client.get_product_sales(
                    tenant_id=tenant_id,
                    product_id=product_id,
                    start_date=start_date,
                    end_date=end_date
                )

                if not sales_data_raw or len(sales_data_raw) < 10:
                    logger.debug(
                        "Insufficient sales data for product",
                        product_id=product_id,
                        sales_records=len(sales_data_raw) if sales_data_raw else 0
                    )
                    continue

                # Convert to DataFrame
                sales_df = pd.DataFrame(sales_data_raw)

                # Run demand insights orchestrator
                insights = await orchestrator.analyze_and_generate_insights(
                    tenant_id=tenant_id,
                    product_id=product_id,
                    product_name=product_name,
                    sales_data=sales_df,
                    lookback_days=90,
                    db=db
                )

                if insights:
                    total_insights_generated += len(insights)
                    total_insights_posted += len(insights)
                    logger.info(
                        "Demand insights generated for product",
                        tenant_id=tenant_id,
                        product_id=product_id,
                        insights_count=len(insights)
                    )

            except Exception as e:
                logger.warning(
                    "Failed to analyze product demand (non-fatal)",
                    tenant_id=tenant_id,
                    product_id=product_id,
                    error=str(e)
                )
                continue

        logger.info(
            "Internal demand insights generation complete",
            tenant_id=tenant_id,
            products_analyzed=len(products),
            insights_generated=total_insights_generated,
            insights_posted=total_insights_posted
        )

        return {
            "success": True,
            "message": f"Generated {total_insights_posted} demand forecasting insights",
            "tenant_id": tenant_id,
            "products_analyzed": len(products),
            "insights_posted": total_insights_posted
        }

    except Exception as e:
        logger.error(
            "Internal demand insights generation failed",
            tenant_id=tenant_id,
            error=str(e),
            exc_info=True
        )
        return {
            "success": False,
            "message": f"Demand insights generation failed: {str(e)}",
            "tenant_id": tenant_id,
            "products_analyzed": 0,
            "insights_posted": 0
        }
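

# ----------------------------------------------------------------
# Example wiring (a minimal sketch -- the module path "app.api.ml_insights"
# is assumed and may differ in the actual project layout). Both routers
# must be included for the public and internal endpoints to be served:
#
#   from fastapi import FastAPI
#   from app.api.ml_insights import router, internal_router
#
#   app = FastAPI()
#   app.include_router(router)
#   app.include_router(internal_router)
# ----------------------------------------------------------------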