Files
bakery-ia/services/procurement/app/api/ml_insights.py
2025-12-15 21:14:22 +01:00

630 lines
22 KiB
Python

"""
ML Insights API Endpoints for Procurement Service
Provides endpoints to trigger ML insight generation for:
- Supplier performance analysis
- Price forecasting and timing recommendations
"""
from fastapi import APIRouter, Depends, HTTPException, Request
from pydantic import BaseModel, Field
from typing import Optional, List
from uuid import UUID
from datetime import datetime, timedelta
import structlog
import pandas as pd
from app.core.database import get_db
from sqlalchemy.ext.asyncio import AsyncSession
# Module-level structured logger shared by all ML-insights endpoints.
logger = structlog.get_logger()
# Public ML-insight routes are tenant-scoped under the procurement API prefix;
# {tenant_id} is a path parameter received by every handler on this router.
router = APIRouter(
    prefix="/api/v1/tenants/{tenant_id}/procurement/ml/insights",
    tags=["ML Insights"]
)
# ================================================================
# REQUEST/RESPONSE SCHEMAS - SUPPLIER ANALYSIS
# ================================================================
class SupplierAnalysisRequest(BaseModel):
    """Parameters controlling a supplier performance analysis run."""

    # None means "analyze every active supplier for the tenant".
    supplier_ids: Optional[List[str]] = Field(
        default=None,
        description="Specific supplier IDs to analyze. If None, analyzes all suppliers",
    )
    # Size of the historical window of purchase orders to examine.
    lookback_days: int = Field(
        default=180,
        ge=30,
        le=730,
        description="Days of historical orders to analyze",
    )
    # Suppliers with fewer orders than this are skipped entirely.
    min_orders: int = Field(
        default=10,
        ge=5,
        le=100,
        description="Minimum orders required for analysis",
    )
class SupplierAnalysisResponse(BaseModel):
    """Response schema for supplier performance analysis"""
    success: bool  # True when at least one insight was posted
    message: str  # human-readable summary of the run
    tenant_id: str  # echoed back from the path parameter
    suppliers_analyzed: int  # suppliers that had enough orders to analyze
    total_insights_generated: int  # insights produced by the orchestrator
    total_insights_posted: int  # insights actually sent to the AI Insights Service
    high_risk_suppliers: int  # suppliers whose reliability score fell below 70
    insights_by_supplier: dict  # supplier_id -> per-supplier result summary
    errors: List[str] = []  # per-supplier error messages (pydantic copies this default per instance)
# ================================================================
# REQUEST/RESPONSE SCHEMAS - PRICE FORECASTING
# ================================================================
class PriceForecastRequest(BaseModel):
    """Parameters controlling an ingredient price forecasting run."""

    # None means "forecast every ingredient for the tenant".
    ingredient_ids: Optional[List[str]] = Field(
        default=None,
        description="Specific ingredient IDs to forecast. If None, forecasts all ingredients",
    )
    # Size of the historical price window used to fit the forecast.
    lookback_days: int = Field(
        default=180,
        ge=90,
        le=730,
        description="Days of historical price data to analyze",
    )
    # How far into the future the forecast should extend.
    forecast_horizon_days: int = Field(
        default=30,
        ge=7,
        le=90,
        description="Days to forecast ahead",
    )
class PriceForecastResponse(BaseModel):
    """Response schema for price forecasting"""
    success: bool  # True when at least one insight was posted
    message: str  # human-readable summary of the run
    tenant_id: str  # echoed back from the path parameter
    ingredients_forecasted: int  # ingredients that had enough price history
    total_insights_generated: int  # insights produced by the orchestrator
    total_insights_posted: int  # insights actually sent to the AI Insights Service
    buy_now_recommendations: int  # ingredients whose recommendation action was 'buy_now'
    bulk_opportunities: int  # ingredients flagged with a bulk-purchase opportunity
    insights_by_ingredient: dict  # ingredient_id -> per-ingredient result summary
    errors: List[str] = []  # per-ingredient error messages (pydantic copies this default per instance)
# ================================================================
# API ENDPOINTS - SUPPLIER ANALYSIS
# ================================================================
@router.post("/analyze-suppliers", response_model=SupplierAnalysisResponse)
async def trigger_supplier_analysis(
    tenant_id: str,
    request_data: SupplierAnalysisRequest,
    request: Request,
    db: AsyncSession = Depends(get_db)
):
    """
    Trigger supplier performance analysis.

    This endpoint:
    1. Fetches historical purchase order data for specified suppliers
    2. Runs the SupplierInsightsOrchestrator to analyze reliability
    3. Generates insights about supplier performance and risk
    4. Posts insights to AI Insights Service

    Args:
        tenant_id: Tenant UUID
        request_data: Analysis parameters
        request: FastAPI request, used to reach app.state.event_publisher
        db: Database session

    Returns:
        SupplierAnalysisResponse with analysis results

    Raises:
        HTTPException: 500 when the analysis fails as a whole.
    """
    logger.info(
        "ML insights supplier analysis requested",
        tenant_id=tenant_id,
        supplier_ids=request_data.supplier_ids,
        lookback_days=request_data.lookback_days
    )
    try:
        # Local imports keep heavy/optional ML dependencies off the module
        # import path.
        from app.ml.supplier_insights_orchestrator import SupplierInsightsOrchestrator
        from app.models.purchase_order import PurchaseOrder
        from shared.clients.suppliers_client import SuppliersServiceClient
        from app.core.config import settings
        from sqlalchemy import select

        # Event publisher is optional app state; the orchestrator accepts None.
        event_publisher = getattr(request.app.state, 'event_publisher', None)
        orchestrator = SupplierInsightsOrchestrator(event_publisher=event_publisher)
        suppliers_client = SuppliersServiceClient(settings)

        try:
            # Resolve which suppliers to analyze via the suppliers service API.
            if request_data.supplier_ids:
                # Fetch the explicitly requested suppliers, skipping unknown IDs.
                suppliers = []
                for requested_id in request_data.supplier_ids:
                    supplier = await suppliers_client.get_supplier_by_id(
                        tenant_id=tenant_id,
                        supplier_id=requested_id
                    )
                    if supplier:
                        suppliers.append(supplier)
            else:
                # Fetch all active suppliers, capped to keep the request bounded.
                all_suppliers = await suppliers_client.get_all_suppliers(
                    tenant_id=tenant_id,
                    is_active=True
                )
                suppliers = (all_suppliers or [])[:10]  # Limit to prevent timeout

            if not suppliers:
                return SupplierAnalysisResponse(
                    success=False,
                    message="No suppliers found for analysis",
                    tenant_id=tenant_id,
                    suppliers_analyzed=0,
                    total_insights_generated=0,
                    total_insights_posted=0,
                    high_risk_suppliers=0,
                    insights_by_supplier={},
                    errors=["No suppliers found"]
                )

            # Date window for order history. NOTE(review): naive UTC timestamps;
            # assumes PurchaseOrder.order_date is stored naive UTC too — confirm.
            end_date = datetime.utcnow()
            start_date = end_date - timedelta(days=request_data.lookback_days)

            # Aggregate counters across all suppliers.
            total_insights_generated = 0
            total_insights_posted = 0
            high_risk_suppliers = 0
            insights_by_supplier = {}
            errors = []

            for supplier in suppliers:
                # Resolve identifiers defensively BEFORE the try block so the
                # error handler below can always reference them (previously a
                # failure on supplier['id'] raised NameError inside the handler,
                # masking the real error).
                supplier_id = str(supplier.get('id', 'unknown'))
                supplier_name = supplier.get('name', 'Unknown')
                try:
                    logger.info(f"Analyzing supplier {supplier_name} ({supplier_id})")
                    # Purchase orders for this supplier come from the local DB,
                    # scoped to the tenant and the lookback window.
                    po_query = select(PurchaseOrder).where(
                        PurchaseOrder.tenant_id == UUID(tenant_id),
                        PurchaseOrder.supplier_id == UUID(supplier_id),
                        PurchaseOrder.order_date >= start_date,
                        PurchaseOrder.order_date <= end_date
                    )
                    po_result = await db.execute(po_query)
                    purchase_orders = po_result.scalars().all()

                    if len(purchase_orders) < request_data.min_orders:
                        logger.warning(
                            f"Insufficient orders for supplier {supplier_id}: "
                            f"{len(purchase_orders)} < {request_data.min_orders} required"
                        )
                        continue

                    # Build the order-history frame the orchestrator consumes.
                    order_data = []
                    for po in purchase_orders:
                        # Delivery performance: zero or negative lateness counts
                        # as on time.
                        if po.delivery_date and po.expected_delivery_date:
                            days_late = (po.delivery_date - po.expected_delivery_date).days
                            on_time = days_late <= 0
                        else:
                            # Missing dates are treated optimistically as on time.
                            days_late = 0
                            on_time = True
                        # Coarse quality proxy derived solely from order status.
                        quality_score = 100 if po.status == 'completed' else 80
                        order_data.append({
                            'order_date': po.order_date,
                            'expected_delivery_date': po.expected_delivery_date,
                            'delivery_date': po.delivery_date,
                            'days_late': days_late,
                            'on_time': on_time,
                            'quality_score': quality_score,
                            'total_amount': float(po.total_amount) if po.total_amount else 0
                        })
                    order_history = pd.DataFrame(order_data)

                    # Run supplier analysis and post the resulting insights.
                    results = await orchestrator.analyze_and_post_supplier_insights(
                        tenant_id=tenant_id,
                        supplier_id=supplier_id,
                        order_history=order_history,
                        min_orders=request_data.min_orders
                    )

                    # Track results for the aggregate response.
                    total_insights_generated += results['insights_generated']
                    total_insights_posted += results['insights_posted']
                    reliability_score = results.get('reliability_score', 100)
                    if reliability_score < 70:
                        high_risk_suppliers += 1
                    insights_by_supplier[supplier_id] = {
                        'supplier_name': supplier_name,
                        'insights_posted': results['insights_posted'],
                        'reliability_score': reliability_score,
                        'orders_analyzed': results['orders_analyzed']
                    }
                    logger.info(
                        f"Supplier {supplier_id} analysis complete",
                        insights_posted=results['insights_posted'],
                        reliability_score=reliability_score
                    )
                except Exception as e:
                    # Per-supplier failures are collected; the batch continues.
                    error_msg = f"Error analyzing supplier {supplier_id}: {str(e)}"
                    logger.error(error_msg, exc_info=True)
                    errors.append(error_msg)
        finally:
            # Always release orchestrator resources, even when an unexpected
            # error escapes the loop (previously skipped on failure).
            await orchestrator.close()

        # Build response
        response = SupplierAnalysisResponse(
            success=total_insights_posted > 0,
            message=f"Successfully analyzed {len(insights_by_supplier)} suppliers, generated {total_insights_posted} insights",
            tenant_id=tenant_id,
            suppliers_analyzed=len(insights_by_supplier),
            total_insights_generated=total_insights_generated,
            total_insights_posted=total_insights_posted,
            high_risk_suppliers=high_risk_suppliers,
            insights_by_supplier=insights_by_supplier,
            errors=errors
        )
        logger.info(
            "ML insights supplier analysis complete",
            tenant_id=tenant_id,
            total_insights=total_insights_posted,
            high_risk_suppliers=high_risk_suppliers
        )
        return response
    except Exception as e:
        logger.error(
            "ML insights supplier analysis failed",
            tenant_id=tenant_id,
            error=str(e),
            exc_info=True
        )
        raise HTTPException(
            status_code=500,
            detail=f"Supplier analysis failed: {str(e)}"
        )
# ================================================================
# API ENDPOINTS - PRICE FORECASTING
# ================================================================
@router.post("/forecast-prices", response_model=PriceForecastResponse)
async def trigger_price_forecasting(
    tenant_id: str,
    request_data: PriceForecastRequest,
    request: Request,
    db: AsyncSession = Depends(get_db)
):
    """
    Trigger price forecasting for procurement ingredients.

    This endpoint:
    1. Fetches historical price data for specified ingredients
    2. Runs the PriceInsightsOrchestrator to forecast future prices
    3. Generates insights about optimal purchase timing
    4. Posts insights to AI Insights Service

    Args:
        tenant_id: Tenant UUID
        request_data: Forecasting parameters
        request: FastAPI request, used to reach app.state.event_publisher
        db: Database session

    Returns:
        PriceForecastResponse with forecasting results

    Raises:
        HTTPException: 500 when the forecasting fails as a whole.
    """
    logger.info(
        "ML insights price forecasting requested",
        tenant_id=tenant_id,
        ingredient_ids=request_data.ingredient_ids,
        lookback_days=request_data.lookback_days
    )
    try:
        # Local imports keep heavy/optional ML dependencies off the module
        # import path.
        from app.ml.price_insights_orchestrator import PriceInsightsOrchestrator
        from shared.clients.inventory_client import InventoryServiceClient
        from app.models.purchase_order import PurchaseOrder, PurchaseOrderItem
        from app.core.config import settings
        from sqlalchemy import select

        # Event publisher is optional app state; the orchestrator accepts None.
        event_publisher = getattr(request.app.state, 'event_publisher', None)
        orchestrator = PriceInsightsOrchestrator(event_publisher=event_publisher)
        inventory_client = InventoryServiceClient(settings)

        try:
            # Resolve which ingredients to forecast via the inventory service API.
            if request_data.ingredient_ids:
                # Fetch the explicitly requested ingredients, skipping unknown IDs.
                ingredients = []
                for requested_id in request_data.ingredient_ids:
                    ingredient = await inventory_client.get_ingredient_by_id(
                        ingredient_id=requested_id,
                        tenant_id=tenant_id
                    )
                    if ingredient:
                        ingredients.append(ingredient)
            else:
                # Fetch all ingredients, capped to keep the request bounded.
                all_ingredients = await inventory_client.get_all_ingredients(tenant_id=tenant_id)
                ingredients = all_ingredients[:10] if all_ingredients else []  # Limit to prevent timeout

            if not ingredients:
                return PriceForecastResponse(
                    success=False,
                    message="No ingredients found for forecasting",
                    tenant_id=tenant_id,
                    ingredients_forecasted=0,
                    total_insights_generated=0,
                    total_insights_posted=0,
                    buy_now_recommendations=0,
                    bulk_opportunities=0,
                    insights_by_ingredient={},
                    errors=["No ingredients found"]
                )

            # Date window for price history. NOTE(review): naive UTC timestamps;
            # assumes PurchaseOrder.order_date is stored naive UTC too — confirm.
            end_date = datetime.utcnow()
            start_date = end_date - timedelta(days=request_data.lookback_days)

            # Aggregate counters across all ingredients.
            total_insights_generated = 0
            total_insights_posted = 0
            buy_now_recommendations = 0
            bulk_opportunities = 0
            insights_by_ingredient = {}
            errors = []

            for ingredient in ingredients:
                # Resolve identifiers defensively BEFORE the try block so the
                # error handler below can always reference them (previously a
                # failure on ingredient['id'] raised NameError inside the
                # handler, masking the real error).
                ingredient_id = str(ingredient.get('id', 'unknown'))
                ingredient_name = ingredient.get('name', 'Unknown Ingredient')
                try:
                    logger.info(f"Forecasting prices for {ingredient_name} ({ingredient_id})")
                    # Price history comes from purchase order items, restricted
                    # to the tenant AND the requested lookback window (the
                    # window was previously computed but never applied, so
                    # lookback_days had no effect on the data fetched).
                    poi_query = (
                        select(PurchaseOrderItem)
                        .join(PurchaseOrderItem.purchase_order)
                        .where(
                            PurchaseOrderItem.inventory_product_id == UUID(ingredient_id),
                            PurchaseOrder.tenant_id == UUID(tenant_id),
                            PurchaseOrder.order_date >= start_date,
                            PurchaseOrder.order_date <= end_date
                        )
                    )
                    poi_result = await db.execute(poi_query)
                    purchase_items = poi_result.scalars().all()

                    if len(purchase_items) < 30:
                        logger.warning(
                            f"Insufficient price history for ingredient {ingredient_id}: "
                            f"{len(purchase_items)} items"
                        )
                        continue

                    # Build the price-history frame the orchestrator consumes.
                    # NOTE(review): item.purchase_order is touched per row;
                    # assumes the relationship is loadable from an async session
                    # (eager loading or equivalent) — confirm model config.
                    price_data = []
                    for item in purchase_items:
                        # Rows without a price or quantity carry no signal.
                        if item.unit_price and item.quantity:
                            price_data.append({
                                'date': item.purchase_order.order_date,
                                'price': float(item.unit_price),
                                'quantity': float(item.quantity),
                                'supplier_id': str(item.purchase_order.supplier_id)
                            })
                    price_history = pd.DataFrame(price_data)
                    price_history = price_history.sort_values('date')

                    # Run price forecasting and post the resulting insights.
                    results = await orchestrator.forecast_and_post_insights(
                        tenant_id=tenant_id,
                        ingredient_id=ingredient_id,
                        price_history=price_history,
                        forecast_horizon_days=request_data.forecast_horizon_days,
                        min_history_days=request_data.lookback_days
                    )

                    # Track results for the aggregate response.
                    total_insights_generated += results['insights_generated']
                    total_insights_posted += results['insights_posted']
                    recommendation = results.get('recommendation', {})
                    if recommendation.get('action') == 'buy_now':
                        buy_now_recommendations += 1
                    bulk_opp = results.get('bulk_opportunity', {})
                    if bulk_opp.get('has_bulk_opportunity'):
                        bulk_opportunities += 1
                    insights_by_ingredient[ingredient_id] = {
                        'ingredient_name': ingredient_name,
                        'insights_posted': results['insights_posted'],
                        'recommendation': recommendation.get('action'),
                        'has_bulk_opportunity': bulk_opp.get('has_bulk_opportunity', False)
                    }
                    logger.info(
                        f"Ingredient {ingredient_id} forecasting complete",
                        insights_posted=results['insights_posted'],
                        recommendation=recommendation.get('action')
                    )
                except Exception as e:
                    # Per-ingredient failures are collected; the batch continues.
                    error_msg = f"Error forecasting ingredient {ingredient_id}: {str(e)}"
                    logger.error(error_msg, exc_info=True)
                    errors.append(error_msg)
        finally:
            # Always release orchestrator resources, even when an unexpected
            # error escapes the loop (previously skipped on failure).
            await orchestrator.close()

        # Build response
        response = PriceForecastResponse(
            success=total_insights_posted > 0,
            message=f"Successfully forecasted {len(insights_by_ingredient)} ingredients, generated {total_insights_posted} insights",
            tenant_id=tenant_id,
            ingredients_forecasted=len(insights_by_ingredient),
            total_insights_generated=total_insights_generated,
            total_insights_posted=total_insights_posted,
            buy_now_recommendations=buy_now_recommendations,
            bulk_opportunities=bulk_opportunities,
            insights_by_ingredient=insights_by_ingredient,
            errors=errors
        )
        logger.info(
            "ML insights price forecasting complete",
            tenant_id=tenant_id,
            total_insights=total_insights_posted,
            buy_now_recommendations=buy_now_recommendations,
            bulk_opportunities=bulk_opportunities
        )
        return response
    except Exception as e:
        logger.error(
            "ML insights price forecasting failed",
            tenant_id=tenant_id,
            error=str(e),
            exc_info=True
        )
        raise HTTPException(
            status_code=500,
            detail=f"Price forecasting failed: {str(e)}"
        )
@router.get("/health")
async def ml_insights_health():
    """Lightweight liveness probe listing the ML-insights endpoints."""
    exposed_endpoints = [
        "POST /ml/insights/analyze-suppliers",
        "POST /ml/insights/forecast-prices",
        "POST /internal/ml/generate-price-insights",
    ]
    return {
        "status": "healthy",
        "service": "procurement-ml-insights",
        "endpoints": exposed_endpoints,
    }
# ================================================================
# INTERNAL API ENDPOINT - Called by demo session service
# ================================================================
# NOTE(review): redundant — Request is already imported at the top of the file.
from fastapi import Request
# Create a separate router for internal endpoints to avoid the tenant prefix
# (internal routes declare their full path explicitly on each handler).
internal_router = APIRouter(
    tags=["ML Insights - Internal"]
)
@internal_router.post("/api/v1/tenants/{tenant_id}/procurement/internal/ml/generate-price-insights")
async def generate_price_insights_internal(
    tenant_id: str,
    request: Request,
    db: AsyncSession = Depends(get_db)
):
    """
    Internal endpoint to trigger price insights generation for demo sessions.

    This endpoint is called by the demo-session service after cloning data.
    It uses the same ML logic as the public endpoint but with optimized defaults.

    Security: Protected by X-Internal-Service header check. NOTE(review): a
    plain header check is only a soft guard — it assumes the ingress strips or
    controls this header for external traffic; verify at the gateway level.

    Args:
        tenant_id: The tenant UUID
        request: FastAPI request object
        db: Database session

    Returns:
        dict with insights_posted, tenant_id, status, message,
        ingredients_analyzed and buy_now_recommendations.

    Raises:
        HTTPException: 403 for non-internal callers; errors from the
            underlying forecasting endpoint are propagated unchanged;
            anything else becomes a 500.
    """
    # Only trusted internal services may call this endpoint. (The dead
    # "not request" guard was removed: request is a required parameter and
    # always supplied by FastAPI.)
    if request.headers.get("X-Internal-Service") not in ("demo-session", "internal"):
        logger.warning("Unauthorized internal API call", tenant_id=tenant_id)
        raise HTTPException(
            status_code=403,
            detail="This endpoint is for internal service use only"
        )
    logger.info("Internal price insights generation triggered", tenant_id=tenant_id)
    try:
        # Reuse the public forecasting logic with demo-friendly defaults.
        request_data = PriceForecastRequest(
            ingredient_ids=None,  # Analyze all ingredients
            lookback_days=180,  # 6 months of history
            forecast_horizon_days=30  # Forecast 30 days ahead
        )
        result = await trigger_price_forecasting(
            tenant_id=tenant_id,
            request_data=request_data,
            request=request,
            db=db
        )
        # Simplified payload for the demo-session service.
        return {
            "insights_posted": result.total_insights_posted,
            "tenant_id": tenant_id,
            "status": "success" if result.success else "failed",
            "message": result.message,
            "ingredients_analyzed": result.ingredients_forecasted,
            "buy_now_recommendations": result.buy_now_recommendations
        }
    except HTTPException:
        # Preserve status codes/details raised by the underlying endpoint
        # instead of re-wrapping them as a generic 500 (previous behavior).
        raise
    except Exception as e:
        logger.error(
            "Internal price insights generation failed",
            tenant_id=tenant_id,
            error=str(e),
            exc_info=True
        )
        raise HTTPException(
            status_code=500,
            detail=f"Internal price insights generation failed: {str(e)}"
        )