# bakery-ia/services/inventory/app/api/ml_insights.py
"""
ML Insights API Endpoints for Inventory Service
Provides endpoints to trigger ML insight generation for:
- Safety stock optimization
- Inventory level recommendations
- Demand pattern analysis
"""
from fastapi import APIRouter, Depends, HTTPException, Request
from pydantic import BaseModel, Field
from typing import Optional, List
from uuid import UUID
from datetime import datetime, timedelta
import structlog
import pandas as pd
from app.core.database import get_db
from sqlalchemy.ext.asyncio import AsyncSession
logger = structlog.get_logger()
router = APIRouter(
prefix="/api/v1/tenants/{tenant_id}/inventory/ml/insights",
tags=["ML Insights"]
)
# ================================================================
# REQUEST/RESPONSE SCHEMAS
# ================================================================
class SafetyStockOptimizationRequest(BaseModel):
"""Request schema for safety stock optimization"""
product_ids: Optional[List[str]] = Field(
None,
description="Specific product IDs to optimize. If None, optimizes all products"
)
lookback_days: int = Field(
90,
description="Days of historical demand to analyze",
ge=30,
le=365
)
min_history_days: int = Field(
30,
description="Minimum days of history required",
ge=7,
le=180
)
class SafetyStockOptimizationResponse(BaseModel):
"""Response schema for safety stock optimization"""
success: bool
message: str
tenant_id: str
products_optimized: int
total_insights_generated: int
total_insights_posted: int
total_cost_savings: float
insights_by_product: dict
errors: List[str] = []
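# Illustrative payloads for the optimization endpoint below (example values only,
# not taken from real data; the product UUID and name are placeholders):
#
#   Request:
#     {"product_ids": null, "lookback_days": 90, "min_history_days": 30}
#   Response:
#     {"success": true, "message": "...", "tenant_id": "...",
#      "products_optimized": 2, "total_insights_generated": 4,
#      "total_insights_posted": 4, "total_cost_savings": 118.75,
#      "insights_by_product": {"<product-uuid>": {"product_name": "Flour T55",
#        "insights_posted": 2, "optimal_safety_stock": 42.0, "cost_savings": 60.0}},
#      "errors": []}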
# ================================================================
# API ENDPOINTS
# ================================================================
@router.post("/optimize-safety-stock", response_model=SafetyStockOptimizationResponse)
async def trigger_safety_stock_optimization(
tenant_id: str,
request_data: SafetyStockOptimizationRequest,
db: AsyncSession = Depends(get_db)
):
"""
Trigger safety stock optimization for inventory products.
This endpoint:
1. Fetches historical demand data for specified products
2. Runs the SafetyStockInsightsOrchestrator to optimize levels
3. Generates insights about safety stock recommendations
4. Posts insights to AI Insights Service
Args:
tenant_id: Tenant UUID
request_data: Optimization parameters
db: Database session
Returns:
SafetyStockOptimizationResponse with optimization results
"""
logger.info(
"ML insights safety stock optimization requested",
tenant_id=tenant_id,
product_ids=request_data.product_ids,
lookback_days=request_data.lookback_days
)
try:
# Import ML orchestrator
from app.ml.safety_stock_insights_orchestrator import SafetyStockInsightsOrchestrator
from app.models.inventory import Ingredient
from sqlalchemy import select
# Initialize orchestrator
orchestrator = SafetyStockInsightsOrchestrator()
# Get products to optimize
if request_data.product_ids:
query = select(Ingredient).where(
Ingredient.tenant_id == UUID(tenant_id),
Ingredient.id.in_([UUID(pid) for pid in request_data.product_ids])
)
else:
query = select(Ingredient).where(
Ingredient.tenant_id == UUID(tenant_id)
).limit(10) # Limit to prevent timeout
result = await db.execute(query)
products = result.scalars().all()
if not products:
return SafetyStockOptimizationResponse(
success=False,
message="No products found for optimization",
tenant_id=tenant_id,
products_optimized=0,
total_insights_generated=0,
total_insights_posted=0,
total_cost_savings=0.0,
insights_by_product={},
errors=["No products found"]
)
# Calculate date range for demand history
end_date = datetime.utcnow()
start_date = end_date - timedelta(days=request_data.lookback_days)
# Process each product
total_insights_generated = 0
total_insights_posted = 0
total_cost_savings = 0.0
insights_by_product = {}
errors = []
for product in products:
try:
product_id = str(product.id)
logger.info(f"Optimizing safety stock for {product.name} ({product_id})")
# Fetch real sales/demand history from sales service
from shared.clients.sales_client import SalesServiceClient
from app.core.config import settings
sales_client = SalesServiceClient(settings)
try:
# Fetch sales data for this product
sales_data = await sales_client.get_sales_data(
tenant_id=tenant_id,
product_id=product_id,
start_date=start_date.strftime('%Y-%m-%d'),
end_date=end_date.strftime('%Y-%m-%d')
)
if not sales_data:
logger.warning(
f"No sales history for product {product_id}, skipping"
)
continue
demand_data = []
for sale in sales_data:
demand_data.append({
'date': pd.to_datetime(sale.get('date') or sale.get('sale_date')),
'quantity': float(sale.get('quantity', 0))
})
if not demand_data:
logger.warning(
f"No valid demand data for product {product_id}, skipping"
)
continue
demand_history = pd.DataFrame(demand_data)
# Aggregate by date if there are multiple sales per day
demand_history = demand_history.groupby('date').agg({
'quantity': 'sum'
}).reset_index()
if len(demand_history) < request_data.min_history_days:
logger.warning(
f"Insufficient demand history for product {product_id}: "
f"{len(demand_history)} days < {request_data.min_history_days} required"
)
continue
except Exception as e:
logger.error(
f"Error fetching sales data for product {product_id}: {e}",
exc_info=True
)
continue
# Get lead time from supplier if available
lead_time_days = 7 # Default fallback
if product.supplier_id:
try:
from shared.clients.suppliers_client import SuppliersClient
suppliers_client = SuppliersClient()
supplier_data = await suppliers_client.get_supplier_by_id(
tenant_id=str(tenant_id),
supplier_id=str(product.supplier_id)
)
if supplier_data and 'standard_lead_time' in supplier_data:
lead_time_days = supplier_data['standard_lead_time']
logger.debug(
f"Using supplier lead time for product {product_id}",
lead_time=lead_time_days,
supplier_id=str(product.supplier_id)
)
except Exception as e:
logger.warning(
f"Failed to fetch supplier lead time for product {product_id}, using default",
error=str(e),
supplier_id=str(product.supplier_id)
)
# Product characteristics
product_characteristics = {
'lead_time_days': lead_time_days,
'shelf_life_days': 30 if product.is_perishable else 365,
'perishable': product.is_perishable
}
# Run optimization
results = await orchestrator.optimize_and_post_insights(
tenant_id=tenant_id,
inventory_product_id=product_id,
demand_history=demand_history,
product_characteristics=product_characteristics,
min_history_days=request_data.min_history_days
)
# Track results
total_insights_generated += results['insights_generated']
total_insights_posted += results['insights_posted']
if results.get('cost_savings'):
total_cost_savings += results['cost_savings']
insights_by_product[product_id] = {
'product_name': product.name,
'insights_posted': results['insights_posted'],
'optimal_safety_stock': results.get('optimal_safety_stock'),
'cost_savings': results.get('cost_savings', 0.0)
}
logger.info(
f"Product {product_id} optimization complete",
insights_posted=results['insights_posted'],
cost_savings=results.get('cost_savings', 0)
)
except Exception as e:
error_msg = f"Error optimizing product {product_id}: {str(e)}"
logger.error(error_msg, exc_info=True)
errors.append(error_msg)
# Close orchestrator
await orchestrator.close()
# Build response
response = SafetyStockOptimizationResponse(
success=total_insights_posted > 0,
message=f"Successfully optimized {len(products)} products, generated {total_insights_posted} insights",
tenant_id=tenant_id,
products_optimized=len(products),
total_insights_generated=total_insights_generated,
total_insights_posted=total_insights_posted,
total_cost_savings=round(total_cost_savings, 2),
insights_by_product=insights_by_product,
errors=errors
)
logger.info(
"ML insights safety stock optimization complete",
tenant_id=tenant_id,
total_insights=total_insights_posted,
total_savings=total_cost_savings
)
return response
except Exception as e:
logger.error(
"ML insights safety stock optimization failed",
tenant_id=tenant_id,
error=str(e),
exc_info=True
)
raise HTTPException(
status_code=500,
detail=f"Safety stock optimization failed: {str(e)}"
)
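# Illustrative invocation of the endpoint above (host, port, and tenant id are
# assumptions; the route itself follows from the router prefix defined in this file):
#
#   import httpx
#
#   resp = httpx.post(
#       "http://localhost:8000/api/v1/tenants/<tenant-uuid>/inventory/ml/insights/optimize-safety-stock",
#       json={"lookback_days": 90, "min_history_days": 30},
#   )
#   print(resp.json()["total_insights_posted"])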
@router.get("/health")
async def ml_insights_health():
"""Health check for ML insights endpoints"""
return {
"status": "healthy",
"service": "inventory-ml-insights",
"endpoints": [
"POST /ml/insights/optimize-safety-stock"
]
}
# ================================================================
# INTERNAL ENDPOINTS (for demo-session service)
# ================================================================
# Create a separate router for internal endpoints to avoid the tenant prefix
internal_router = APIRouter(
tags=["ML Insights - Internal"]
)
@internal_router.post("/api/v1/tenants/{tenant_id}/inventory/internal/ml/generate-safety-stock-insights")
async def generate_safety_stock_insights_internal(
tenant_id: str,
request: Request,
db: AsyncSession = Depends(get_db)
):
"""
Internal endpoint to trigger safety stock insights generation for demo sessions.
This endpoint is called by the demo-session service after cloning data.
It uses the same ML logic as the public endpoint but with optimized defaults.
Security: Protected by X-Internal-Service header check.
Args:
tenant_id: The tenant UUID
request: FastAPI request object
db: Database session
Returns:
{
"insights_posted": int,
"tenant_id": str,
"status": str
}
"""
# Verify internal service header
    if request.headers.get("X-Internal-Service") not in ("demo-session", "internal"):
logger.warning("Unauthorized internal API call", tenant_id=tenant_id)
raise HTTPException(
status_code=403,
detail="This endpoint is for internal service use only"
)
logger.info("Internal safety stock insights generation triggered", tenant_id=tenant_id)
try:
# Use the existing safety stock optimization logic with sensible defaults
request_data = SafetyStockOptimizationRequest(
            product_ids=None,  # All products (subject to the 10-product cap)
lookback_days=90, # 3 months of history
min_history_days=30 # Minimum 30 days required
)
# Call the existing safety stock optimization endpoint logic
result = await trigger_safety_stock_optimization(
tenant_id=tenant_id,
request_data=request_data,
db=db
)
# Return simplified response for internal use
return {
"insights_posted": result.total_insights_posted,
"tenant_id": tenant_id,
"status": "success" if result.success else "failed",
"message": result.message,
"products_optimized": result.products_optimized,
"total_cost_savings": result.total_cost_savings
}
except Exception as e:
logger.error(
"Internal safety stock insights generation failed",
tenant_id=tenant_id,
error=str(e),
exc_info=True
)
raise HTTPException(
status_code=500,
detail=f"Internal safety stock insights generation failed: {str(e)}"
)
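# Wiring sketch (assumption: actual router registration lives in the service's
# application factory, not in this file):
#
#   from fastapi import FastAPI
#   from app.api.ml_insights import router, internal_router
#
#   app = FastAPI()
#   app.include_router(router)           # public tenant-prefixed ML insight routes
#   app.include_router(internal_router)  # internal route defined with its full path (no router prefix)
#
# The internal endpoint expects the X-Internal-Service header, for example
# (service hostname and port are assumptions):
#
#   httpx.post(
#       "http://inventory:8000/api/v1/tenants/<tenant-uuid>/inventory/internal/ml/generate-safety-stock-insights",
#       headers={"X-Internal-Service": "demo-session"},
#   )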