Files
bakery-ia/services/inventory/app/api/ml_insights.py

414 lines
15 KiB
Python
Raw Permalink Normal View History

2025-11-05 13:34:56 +01:00
"""
ML Insights API Endpoints for Inventory Service
Provides endpoints to trigger ML insight generation for:
- Safety stock optimization
- Inventory level recommendations
- Demand pattern analysis
"""
2025-12-15 21:14:22 +01:00
from fastapi import APIRouter, Depends, HTTPException, Request
2025-11-05 13:34:56 +01:00
from pydantic import BaseModel, Field
from typing import Optional, List
from uuid import UUID
from datetime import datetime, timedelta
import structlog
import pandas as pd
from app.core.database import get_db
from sqlalchemy.ext.asyncio import AsyncSession
# Module-wide structured logger shared by every endpoint in this file.
logger = structlog.get_logger()

# Public router: every route registered on it is mounted under the
# tenant-scoped ML-insights prefix below.
router = APIRouter(
    tags=["ML Insights"],
    prefix="/api/v1/tenants/{tenant_id}/inventory/ml/insights",
)
# ================================================================
# REQUEST/RESPONSE SCHEMAS
# ================================================================
class SafetyStockOptimizationRequest(BaseModel):
    """
    Parameters accepted by the safety stock optimization endpoint.

    All fields are optional for the caller; the defaults analyse 90 days of
    demand and require at least 30 days of history per product.
    """

    # Product IDs are carried as strings here; the endpoint in this module
    # converts them with UUID(...) when it builds its query.
    product_ids: Optional[List[str]] = Field(
        default=None,
        description="Specific product IDs to optimize. If None, optimizes all products",
    )

    # Size of the demand-history window, bounded to 30..365 days.
    lookback_days: int = Field(
        default=90,
        ge=30,
        le=365,
        description="Days of historical demand to analyze",
    )

    # Minimum amount of history a product needs, bounded to 7..180 days.
    min_history_days: int = Field(
        default=30,
        ge=7,
        le=180,
        description="Minimum days of history required",
    )
class SafetyStockOptimizationResponse(BaseModel):
    """
    Response schema for safety stock optimization.

    Summarises one optimization run for a tenant: aggregate counters plus a
    per-product breakdown and any per-product error messages.
    """
    # Whether the run is considered successful by the endpoint that built it.
    success: bool
    # Human-readable summary of the run.
    message: str
    # Tenant the run was executed for (string form of the tenant UUID).
    tenant_id: str
    # Number of products the endpoint reports as optimized.
    products_optimized: int
    # Sum of the per-product 'insights_generated' values.
    total_insights_generated: int
    # Sum of the per-product 'insights_posted' values.
    total_insights_posted: int
    # Accumulated per-product cost savings.
    total_cost_savings: float
    # Per-product details keyed by product ID.
    insights_by_product: dict
    # Error messages collected for products that failed; empty when none did.
    errors: List[str] = []
# ================================================================
# API ENDPOINTS
# ================================================================
@router.post("/optimize-safety-stock", response_model=SafetyStockOptimizationResponse)
async def trigger_safety_stock_optimization(
    tenant_id: str,
    request_data: SafetyStockOptimizationRequest,
    request: Request,
    db: AsyncSession = Depends(get_db)
):
    """
    Trigger safety stock optimization for inventory products.

    This endpoint:
    1. Loads the tenant's ingredients: the requested ``product_ids``, or at
       most 10 ingredients when none are given (to keep the request short)
    2. Fetches each product's sales history from the sales service for the
       last ``lookback_days`` days and aggregates it per date
    3. Looks up the supplier lead time (falls back to 7 days)
    4. Runs ``SafetyStockInsightsOrchestrator.optimize_and_post_insights``
       and accumulates the counters it returns

    Products with no sales data, or with fewer aggregated dates than
    ``min_history_days``, are skipped with a log entry and are not counted as
    optimized. Products whose optimization raises are recorded in ``errors``
    and do not abort the run.

    Args:
        tenant_id: Tenant UUID (string form)
        request_data: Optimization parameters
        request: FastAPI request (for app state access; an ``event_publisher``
            found on app state is handed to the orchestrator)
        db: Database session

    Returns:
        SafetyStockOptimizationResponse with optimization results

    Raises:
        HTTPException: 500 when anything outside the per-product handling
            fails (for example a malformed UUID or a database error).
    """
    logger.info(
        "ML insights safety stock optimization requested",
        tenant_id=tenant_id,
        product_ids=request_data.product_ids,
        lookback_days=request_data.lookback_days
    )

    try:
        # Deferred imports, following the existing style of this endpoint.
        from datetime import timezone
        from app.ml.safety_stock_insights_orchestrator import SafetyStockInsightsOrchestrator
        from app.models.inventory import Ingredient
        from sqlalchemy import select

        # Get event publisher from app state (if available)
        event_publisher = getattr(request.app.state, 'event_publisher', None) if hasattr(request, 'app') else None

        # Initialize orchestrator
        orchestrator = SafetyStockInsightsOrchestrator(
            event_publisher=event_publisher
        )

        # Run-wide accumulators
        total_insights_generated = 0
        total_insights_posted = 0
        total_cost_savings = 0.0
        products_optimized = 0
        insights_by_product = {}
        errors = []

        # From here on the orchestrator must be closed on every exit path,
        # including the early "no products" return and query failures.
        try:
            tenant_uuid = UUID(tenant_id)

            # Get products to optimize
            if request_data.product_ids:
                query = select(Ingredient).where(
                    Ingredient.tenant_id == tenant_uuid,
                    Ingredient.id.in_([UUID(pid) for pid in request_data.product_ids])
                )
            else:
                query = select(Ingredient).where(
                    Ingredient.tenant_id == tenant_uuid
                ).limit(10)  # Limit to prevent timeout

            result = await db.execute(query)
            products = result.scalars().all()

            if not products:
                return SafetyStockOptimizationResponse(
                    success=False,
                    message="No products found for optimization",
                    tenant_id=tenant_id,
                    products_optimized=0,
                    total_insights_generated=0,
                    total_insights_posted=0,
                    total_cost_savings=0.0,
                    insights_by_product={},
                    errors=["No products found"]
                )

            # Calculate date range for demand history. An aware UTC "now"
            # replaces the deprecated datetime.utcnow(); only the calendar
            # date is sent to the sales service, so the formatted value is
            # the same.
            end_date = datetime.now(timezone.utc)
            start_date = end_date - timedelta(days=request_data.lookback_days)
            start_date_str = start_date.strftime('%Y-%m-%d')
            end_date_str = end_date.strftime('%Y-%m-%d')

            # The sales client does not depend on the product, so build it
            # once instead of once per loop iteration.
            from shared.clients.sales_client import SalesServiceClient
            from app.core.config import settings
            sales_client = SalesServiceClient(settings)

            # Process each product
            for product in products:
                # Pre-bound so the error handler below can always format it.
                product_id = "unknown"
                try:
                    product_id = str(product.id)
                    logger.info(f"Optimizing safety stock for {product.name} ({product_id})")

                    # Fetch real sales/demand history from sales service
                    try:
                        sales_data = await sales_client.get_sales_data(
                            tenant_id=tenant_id,
                            product_id=product_id,
                            start_date=start_date_str,
                            end_date=end_date_str
                        )

                        if not sales_data:
                            logger.warning(
                                f"No sales history for product {product_id}, skipping"
                            )
                            continue

                        # Each sale record carries its date under either
                        # 'date' or 'sale_date'.
                        demand_data = [
                            {
                                'date': pd.to_datetime(sale.get('date') or sale.get('sale_date')),
                                'quantity': float(sale.get('quantity', 0))
                            }
                            for sale in sales_data
                        ]

                        if not demand_data:
                            logger.warning(
                                f"No valid demand data for product {product_id}, skipping"
                            )
                            continue

                        demand_history = pd.DataFrame(demand_data)

                        # Aggregate by date if there are multiple sales per day
                        demand_history = demand_history.groupby('date').agg({
                            'quantity': 'sum'
                        }).reset_index()

                        if len(demand_history) < request_data.min_history_days:
                            logger.warning(
                                f"Insufficient demand history for product {product_id}: "
                                f"{len(demand_history)} days < {request_data.min_history_days} required"
                            )
                            continue

                    except Exception as e:
                        # Sales-data problems skip the product without
                        # adding to the response's error list.
                        logger.error(
                            f"Error fetching sales data for product {product_id}: {e}",
                            exc_info=True
                        )
                        continue

                    # Get lead time from supplier if available
                    lead_time_days = 7  # Default fallback
                    if product.supplier_id:
                        try:
                            from shared.clients.suppliers_client import SuppliersClient
                            suppliers_client = SuppliersClient()
                            supplier_data = await suppliers_client.get_supplier_by_id(
                                tenant_id=str(tenant_id),
                                supplier_id=str(product.supplier_id)
                            )
                            if supplier_data and 'standard_lead_time' in supplier_data:
                                supplier_lead_time = supplier_data['standard_lead_time']
                                # A present-but-null lead time must not
                                # override the default.
                                if supplier_lead_time is not None:
                                    lead_time_days = supplier_lead_time
                                    logger.debug(
                                        f"Using supplier lead time for product {product_id}",
                                        lead_time=lead_time_days,
                                        supplier_id=str(product.supplier_id)
                                    )
                        except Exception as e:
                            # Best effort: keep the default lead time.
                            logger.warning(
                                f"Failed to fetch supplier lead time for product {product_id}, using default",
                                error=str(e),
                                supplier_id=str(product.supplier_id)
                            )

                    # Product characteristics
                    product_characteristics = {
                        'lead_time_days': lead_time_days,
                        'shelf_life_days': 30 if product.is_perishable else 365,
                        'perishable': product.is_perishable
                    }

                    # Run optimization
                    results = await orchestrator.optimize_and_post_insights(
                        tenant_id=tenant_id,
                        inventory_product_id=product_id,
                        demand_history=demand_history,
                        product_characteristics=product_characteristics,
                        min_history_days=request_data.min_history_days
                    )

                    # Track results
                    total_insights_generated += results['insights_generated']
                    total_insights_posted += results['insights_posted']
                    if results.get('cost_savings'):
                        total_cost_savings += results['cost_savings']

                    insights_by_product[product_id] = {
                        'product_name': product.name,
                        'insights_posted': results['insights_posted'],
                        'optimal_safety_stock': results.get('optimal_safety_stock'),
                        'cost_savings': results.get('cost_savings', 0.0)
                    }

                    # Only products that made it through the orchestrator
                    # count as optimized.
                    products_optimized += 1

                    logger.info(
                        f"Product {product_id} optimization complete",
                        insights_posted=results['insights_posted'],
                        cost_savings=results.get('cost_savings', 0)
                    )

                except Exception as e:
                    error_msg = f"Error optimizing product {product_id}: {str(e)}"
                    logger.error(error_msg, exc_info=True)
                    errors.append(error_msg)
        finally:
            # Close orchestrator
            await orchestrator.close()

        # Build response
        response = SafetyStockOptimizationResponse(
            success=total_insights_posted > 0,
            message=f"Successfully optimized {products_optimized} products, generated {total_insights_posted} insights",
            tenant_id=tenant_id,
            products_optimized=products_optimized,
            total_insights_generated=total_insights_generated,
            total_insights_posted=total_insights_posted,
            total_cost_savings=round(total_cost_savings, 2),
            insights_by_product=insights_by_product,
            errors=errors
        )

        logger.info(
            "ML insights safety stock optimization complete",
            tenant_id=tenant_id,
            total_insights=total_insights_posted,
            total_savings=total_cost_savings
        )

        return response

    except Exception as e:
        logger.error(
            "ML insights safety stock optimization failed",
            tenant_id=tenant_id,
            error=str(e),
            exc_info=True
        )
        raise HTTPException(
            status_code=500,
            detail=f"Safety stock optimization failed: {str(e)}"
        ) from e
@router.get("/health")
async def ml_insights_health():
    """
    Report a static health payload for the ML insights endpoints.

    Returns:
        A dict with a fixed status, the service name and the list of
        endpoints this router advertises.
    """
    advertised_endpoints = ["POST /ml/insights/optimize-safety-stock"]
    return dict(
        status="healthy",
        service="inventory-ml-insights",
        endpoints=advertised_endpoints,
    )
2025-12-13 23:57:54 +01:00
# ================================================================
# INTERNAL ENDPOINTS (for demo-session service)
# ================================================================
from fastapi import Request
# Separate router for internal endpoints: it is created without a router-level
# prefix, so each internal route declares its full path itself instead of
# inheriting the public ".../inventory/ml/insights" prefix.
internal_router = APIRouter(
    tags=["ML Insights - Internal"]
)
@internal_router.post("/api/v1/tenants/{tenant_id}/inventory/internal/ml/generate-safety-stock-insights")
async def generate_safety_stock_insights_internal(
    tenant_id: str,
    request: Request,
    db: AsyncSession = Depends(get_db)
):
    """
    Internal endpoint to trigger safety stock insights generation for demo sessions.

    This endpoint is called by the demo-session service after cloning data.
    It delegates to the public safety stock optimization endpoint logic with
    fixed defaults (all products, 90 days of history, 30 days minimum).

    Security: Protected by x-internal-service header check; the header must
    be "demo-session" or "internal".

    Args:
        tenant_id: The tenant UUID
        request: FastAPI request object
        db: Database session

    Returns:
        {
            "insights_posted": int,
            "tenant_id": str,
            "status": "success" | "failed",
            "message": str,
            "products_optimized": int,
            "total_cost_savings": float
        }

    Raises:
        HTTPException: 403 when the internal-service header is missing or not
            accepted; otherwise whatever HTTP error the delegated
            optimization raised, or 500 for any other failure.
    """
    # Verify internal service header
    if not request or request.headers.get("x-internal-service") not in ["demo-session", "internal"]:
        logger.warning("Unauthorized internal API call", tenant_id=tenant_id)
        raise HTTPException(
            status_code=403,
            detail="This endpoint is for internal service use only"
        )

    logger.info("Internal safety stock insights generation triggered", tenant_id=tenant_id)

    try:
        # Use the existing safety stock optimization logic with sensible defaults
        request_data = SafetyStockOptimizationRequest(
            product_ids=None,  # Analyze all products
            lookback_days=90,  # 3 months of history
            min_history_days=30  # Minimum 30 days required
        )

        # Call the existing safety stock optimization endpoint logic
        result = await trigger_safety_stock_optimization(
            tenant_id=tenant_id,
            request_data=request_data,
            request=request,
            db=db
        )

        # Return simplified response for internal use
        return {
            "insights_posted": result.total_insights_posted,
            "tenant_id": tenant_id,
            "status": "success" if result.success else "failed",
            "message": result.message,
            "products_optimized": result.products_optimized,
            "total_cost_savings": result.total_cost_savings
        }

    except HTTPException:
        # The delegated endpoint has already logged the failure and chosen a
        # status code and detail; re-wrapping it would log it twice and bury
        # the original detail inside a second message.
        raise
    except Exception as e:
        logger.error(
            "Internal safety stock insights generation failed",
            tenant_id=tenant_id,
            error=str(e),
            exc_info=True
        )
        raise HTTPException(
            status_code=500,
            detail=f"Internal safety stock insights generation failed: {str(e)}"
        ) from e