533 lines
19 KiB
Python
533 lines
19 KiB
Python
"""
ML Insights API Endpoints for Procurement Service

Provides endpoints to trigger ML insight generation for:
- Supplier performance analysis
- Price forecasting and timing recommendations
"""
|
|
|
|
from fastapi import APIRouter, Depends, HTTPException
|
|
from pydantic import BaseModel, Field
|
|
from typing import Optional, List
|
|
from uuid import UUID
|
|
from datetime import datetime, timedelta
|
|
import structlog
|
|
import pandas as pd
|
|
|
|
from app.core.database import get_db
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
# Module-wide structured logger used by every endpoint below.
logger = structlog.get_logger()

# All routes in this module are mounted under the tenant-scoped
# procurement ML insights prefix and grouped under one OpenAPI tag.
router = APIRouter(
    tags=["ML Insights"],
    prefix="/api/v1/tenants/{tenant_id}/procurement/ml/insights",
)
|
|
|
|
|
|
# ================================================================
|
|
# REQUEST/RESPONSE SCHEMAS - SUPPLIER ANALYSIS
|
|
# ================================================================
|
|
|
|
class SupplierAnalysisRequest(BaseModel):
    """Parameters accepted by the supplier performance analysis endpoint.

    All fields are optional; the defaults analyse the last 180 days and
    require at least 10 orders per supplier.
    """

    # Explicit supplier selection; leaving it unset means "all suppliers".
    supplier_ids: Optional[List[str]] = Field(
        default=None,
        description="Specific supplier IDs to analyze. If None, analyzes all suppliers",
    )

    # Size of the historical window, bounded to 30..730 days.
    lookback_days: int = Field(
        default=180,
        ge=30,
        le=730,
        description="Days of historical orders to analyze",
    )

    # Suppliers with fewer orders in the window are skipped; bounded to 5..100.
    min_orders: int = Field(
        default=10,
        ge=5,
        le=100,
        description="Minimum orders required for analysis",
    )
|
|
|
|
|
|
class SupplierAnalysisResponse(BaseModel):
    """Summary returned by the supplier performance analysis endpoint."""

    # Overall outcome and a human-readable summary line.
    success: bool
    message: str

    # Tenant the analysis was run for (echoed from the request path).
    tenant_id: str

    # Aggregate counters across all suppliers that were analysed.
    suppliers_analyzed: int
    total_insights_generated: int
    total_insights_posted: int
    high_risk_suppliers: int

    # Per-supplier details, keyed by supplier ID.
    insights_by_supplier: dict

    # Messages for suppliers whose analysis failed; empty when none failed.
    errors: List[str] = []
|
|
|
|
|
|
# ================================================================
|
|
# REQUEST/RESPONSE SCHEMAS - PRICE FORECASTING
|
|
# ================================================================
|
|
|
|
class PriceForecastRequest(BaseModel):
    """Parameters accepted by the price forecasting endpoint.

    All fields are optional; the defaults use 180 days of history and
    forecast 30 days ahead.
    """

    # Explicit ingredient selection; leaving it unset means "all ingredients".
    ingredient_ids: Optional[List[str]] = Field(
        default=None,
        description="Specific ingredient IDs to forecast. If None, forecasts all ingredients",
    )

    # Size of the historical window, bounded to 90..730 days.
    lookback_days: int = Field(
        default=180,
        ge=90,
        le=730,
        description="Days of historical price data to analyze",
    )

    # How far ahead to forecast, bounded to 7..90 days.
    forecast_horizon_days: int = Field(
        default=30,
        ge=7,
        le=90,
        description="Days to forecast ahead",
    )
|
|
|
|
|
|
class PriceForecastResponse(BaseModel):
    """Summary returned by the price forecasting endpoint."""

    # Overall outcome and a human-readable summary line.
    success: bool
    message: str

    # Tenant the forecast was run for (echoed from the request path).
    tenant_id: str

    # Aggregate counters across all ingredients that were forecast.
    ingredients_forecasted: int
    total_insights_generated: int
    total_insights_posted: int
    buy_now_recommendations: int
    bulk_opportunities: int

    # Per-ingredient details, keyed by ingredient ID.
    insights_by_ingredient: dict

    # Messages for ingredients whose forecast failed; empty when none failed.
    errors: List[str] = []
|
|
|
|
|
|
# ================================================================
|
|
# API ENDPOINTS - SUPPLIER ANALYSIS
|
|
# ================================================================
|
|
|
|
# Cap on the number of suppliers analysed when the caller does not name any;
# keeps a single request from timing out.
_MAX_SUPPLIERS_PER_RUN = 10


def _build_order_history(purchase_orders) -> pd.DataFrame:
    """Turn purchase orders into the per-order frame handed to the orchestrator."""
    rows = []
    for po in purchase_orders:
        if po.delivery_date and po.expected_delivery_date:
            days_late = (po.delivery_date - po.expected_delivery_date).days
            on_time = days_late <= 0
        else:
            # NOTE(review): orders missing either date are counted as on time
            # with zero delay; confirm this is the intended treatment of
            # undelivered orders.
            days_late = 0
            on_time = True

        # Quality is approximated from the order status only.
        quality_score = 100 if po.status == 'completed' else 80

        rows.append({
            'order_date': po.order_date,
            'expected_delivery_date': po.expected_delivery_date,
            'delivery_date': po.delivery_date,
            'days_late': days_late,
            'on_time': on_time,
            'quality_score': quality_score,
            'total_amount': float(po.total_amount) if po.total_amount else 0
        })
    return pd.DataFrame(rows)


@router.post("/analyze-suppliers", response_model=SupplierAnalysisResponse)
async def trigger_supplier_analysis(
    tenant_id: str,
    request_data: SupplierAnalysisRequest,
    db: AsyncSession = Depends(get_db)
):
    """
    Trigger supplier performance analysis.

    This endpoint:
    1. Resolves the suppliers to analyze via the suppliers service (the
       requested IDs, or the first active suppliers up to an internal cap)
    2. Loads each supplier's purchase orders inside the lookback window
    3. Runs the SupplierInsightsOrchestrator on the resulting order history
    4. Aggregates the per-supplier results into the response

    A failure for one supplier is recorded in ``errors`` and does not stop
    the remaining suppliers from being processed. Suppliers with fewer than
    ``min_orders`` orders in the window are skipped.

    Args:
        tenant_id: Tenant UUID
        request_data: Analysis parameters
        db: Database session

    Returns:
        SupplierAnalysisResponse with analysis results

    Raises:
        HTTPException: 500 if the analysis fails outside the per-supplier
            error handling.
    """
    logger.info(
        "ML insights supplier analysis requested",
        tenant_id=tenant_id,
        supplier_ids=request_data.supplier_ids,
        lookback_days=request_data.lookback_days
    )

    try:
        # Import ML orchestrator and clients
        from app.ml.supplier_insights_orchestrator import SupplierInsightsOrchestrator
        from app.models.purchase_order import PurchaseOrder
        from shared.clients.suppliers_client import SuppliersServiceClient
        from app.core.config import settings
        from sqlalchemy import select

        orchestrator = SupplierInsightsOrchestrator()
        try:
            suppliers_client = SuppliersServiceClient(settings)

            # Get suppliers to analyze from suppliers service via API
            if request_data.supplier_ids:
                suppliers = []
                for requested_id in request_data.supplier_ids:
                    supplier = await suppliers_client.get_supplier_by_id(
                        tenant_id=tenant_id,
                        supplier_id=requested_id
                    )
                    if supplier:
                        suppliers.append(supplier)
            else:
                all_suppliers = await suppliers_client.get_all_suppliers(
                    tenant_id=tenant_id,
                    is_active=True
                )
                suppliers = (all_suppliers or [])[:_MAX_SUPPLIERS_PER_RUN]

            if not suppliers:
                return SupplierAnalysisResponse(
                    success=False,
                    message="No suppliers found for analysis",
                    tenant_id=tenant_id,
                    suppliers_analyzed=0,
                    total_insights_generated=0,
                    total_insights_posted=0,
                    high_risk_suppliers=0,
                    insights_by_supplier={},
                    errors=["No suppliers found"]
                )

            # Calculate date range for order history
            end_date = datetime.utcnow()
            start_date = end_date - timedelta(days=request_data.lookback_days)

            total_insights_generated = 0
            total_insights_posted = 0
            high_risk_suppliers = 0
            insights_by_supplier = {}
            errors = []

            for supplier in suppliers:
                # Bound before the try so the error handler can always name
                # the supplier, even when reading its ID is what failed.
                supplier_id = "unknown"
                try:
                    supplier_id = str(supplier['id'])
                    supplier_name = supplier.get('name', 'Unknown')
                    logger.info(f"Analyzing supplier {supplier_name} ({supplier_id})")

                    # Get purchase orders for this supplier from local database
                    po_query = select(PurchaseOrder).where(
                        PurchaseOrder.tenant_id == UUID(tenant_id),
                        PurchaseOrder.supplier_id == UUID(supplier_id),
                        PurchaseOrder.order_date >= start_date,
                        PurchaseOrder.order_date <= end_date
                    )
                    po_result = await db.execute(po_query)
                    purchase_orders = po_result.scalars().all()

                    if len(purchase_orders) < request_data.min_orders:
                        logger.warning(
                            f"Insufficient orders for supplier {supplier_id}: "
                            f"{len(purchase_orders)} < {request_data.min_orders} required"
                        )
                        continue

                    order_history = _build_order_history(purchase_orders)

                    # Run supplier analysis
                    results = await orchestrator.analyze_and_post_supplier_insights(
                        tenant_id=tenant_id,
                        supplier_id=supplier_id,
                        order_history=order_history,
                        min_orders=request_data.min_orders
                    )

                    # Track results
                    total_insights_generated += results['insights_generated']
                    total_insights_posted += results['insights_posted']

                    reliability_score = results.get('reliability_score', 100)
                    if reliability_score < 70:
                        high_risk_suppliers += 1

                    insights_by_supplier[supplier_id] = {
                        'supplier_name': supplier_name,
                        'insights_posted': results['insights_posted'],
                        'reliability_score': reliability_score,
                        'orders_analyzed': results['orders_analyzed']
                    }

                    logger.info(
                        f"Supplier {supplier_id} analysis complete",
                        insights_posted=results['insights_posted'],
                        reliability_score=reliability_score
                    )

                except Exception as e:
                    error_msg = f"Error analyzing supplier {supplier_id}: {str(e)}"
                    logger.error(error_msg, exc_info=True)
                    errors.append(error_msg)

            # Build response
            response = SupplierAnalysisResponse(
                success=total_insights_posted > 0,
                message=f"Successfully analyzed {len(insights_by_supplier)} suppliers, generated {total_insights_posted} insights",
                tenant_id=tenant_id,
                suppliers_analyzed=len(insights_by_supplier),
                total_insights_generated=total_insights_generated,
                total_insights_posted=total_insights_posted,
                high_risk_suppliers=high_risk_suppliers,
                insights_by_supplier=insights_by_supplier,
                errors=errors
            )

            logger.info(
                "ML insights supplier analysis complete",
                tenant_id=tenant_id,
                total_insights=total_insights_posted,
                high_risk_suppliers=high_risk_suppliers
            )

            return response
        finally:
            # Always release the orchestrator, including on the early
            # "no suppliers" return and on unexpected failures.
            await orchestrator.close()

    except Exception as e:
        logger.error(
            "ML insights supplier analysis failed",
            tenant_id=tenant_id,
            error=str(e),
            exc_info=True
        )
        raise HTTPException(
            status_code=500,
            detail=f"Supplier analysis failed: {str(e)}"
        ) from e
|
|
|
|
|
|
# ================================================================
|
|
# API ENDPOINTS - PRICE FORECASTING
|
|
# ================================================================
|
|
|
|
# Cap on the number of ingredients forecast when the caller does not name
# any; keeps a single request from timing out.
_MAX_INGREDIENTS_PER_RUN = 10

# Ingredients with fewer purchase order items than this are skipped.
_MIN_PRICE_HISTORY_ITEMS = 30

# Column layout of the price history frame handed to the orchestrator.
_PRICE_HISTORY_COLUMNS = ['date', 'price', 'quantity', 'supplier_id']


def _build_price_history(purchase_items) -> pd.DataFrame:
    """Build a date-sorted price frame from items that have both price and quantity."""
    rows = [
        {
            'date': item.purchase_order.order_date,
            'price': float(item.unit_price),
            'quantity': float(item.quantity),
            'supplier_id': str(item.purchase_order.supplier_id)
        }
        for item in purchase_items
        if item.unit_price and item.quantity
    ]
    # Fixed columns keep sort_values('date') valid even when no row qualifies.
    return pd.DataFrame(rows, columns=_PRICE_HISTORY_COLUMNS).sort_values('date')


@router.post("/forecast-prices", response_model=PriceForecastResponse)
async def trigger_price_forecasting(
    tenant_id: str,
    request_data: PriceForecastRequest,
    db: AsyncSession = Depends(get_db)
):
    """
    Trigger price forecasting for procurement ingredients.

    This endpoint:
    1. Resolves the ingredients to forecast via the inventory service (the
       requested IDs, or the first ingredients up to an internal cap)
    2. Loads each ingredient's purchase order items whose order date falls
       inside the lookback window
    3. Runs the PriceInsightsOrchestrator on the resulting price history
    4. Aggregates the per-ingredient results into the response

    A failure for one ingredient is recorded in ``errors`` and does not stop
    the remaining ingredients from being processed. Ingredients with too few
    purchase order items, or with no item carrying both a unit price and a
    quantity, are skipped.

    Args:
        tenant_id: Tenant UUID
        request_data: Forecasting parameters
        db: Database session

    Returns:
        PriceForecastResponse with forecasting results

    Raises:
        HTTPException: 500 if forecasting fails outside the per-ingredient
            error handling.
    """
    logger.info(
        "ML insights price forecasting requested",
        tenant_id=tenant_id,
        ingredient_ids=request_data.ingredient_ids,
        lookback_days=request_data.lookback_days
    )

    try:
        # Import ML orchestrator and clients
        from app.ml.price_insights_orchestrator import PriceInsightsOrchestrator
        from shared.clients.inventory_client import InventoryServiceClient
        from app.models.purchase_order import PurchaseOrder, PurchaseOrderItem
        from app.core.config import settings
        from sqlalchemy import select
        from sqlalchemy.orm import contains_eager

        orchestrator = PriceInsightsOrchestrator()
        try:
            inventory_client = InventoryServiceClient(settings)

            # Get ingredients to forecast from inventory service via API
            if request_data.ingredient_ids:
                ingredients = []
                for requested_id in request_data.ingredient_ids:
                    ingredient = await inventory_client.get_ingredient_by_id(
                        ingredient_id=requested_id,
                        tenant_id=tenant_id
                    )
                    if ingredient:
                        ingredients.append(ingredient)
            else:
                all_ingredients = await inventory_client.get_all_ingredients(tenant_id=tenant_id)
                ingredients = (all_ingredients or [])[:_MAX_INGREDIENTS_PER_RUN]

            if not ingredients:
                return PriceForecastResponse(
                    success=False,
                    message="No ingredients found for forecasting",
                    tenant_id=tenant_id,
                    ingredients_forecasted=0,
                    total_insights_generated=0,
                    total_insights_posted=0,
                    buy_now_recommendations=0,
                    bulk_opportunities=0,
                    insights_by_ingredient={},
                    errors=["No ingredients found"]
                )

            # Calculate date range for price history
            end_date = datetime.utcnow()
            start_date = end_date - timedelta(days=request_data.lookback_days)

            total_insights_generated = 0
            total_insights_posted = 0
            buy_now_recommendations = 0
            bulk_opportunities = 0
            insights_by_ingredient = {}
            errors = []

            for ingredient in ingredients:
                # Bound before the try so the error handler can always name
                # the ingredient, even when reading its ID is what failed.
                ingredient_id = "unknown"
                try:
                    ingredient_id = str(ingredient['id'])
                    ingredient_name = ingredient.get('name', 'Unknown Ingredient')
                    logger.info(f"Forecasting prices for {ingredient_name} ({ingredient_id})")

                    # Get price history from purchase order items. The parent
                    # order is joined once and reused both for filtering
                    # (tenant, lookback window) and, via contains_eager, to
                    # populate item.purchase_order so that reading it later
                    # does not trigger a lazy load on the async session.
                    poi_query = (
                        select(PurchaseOrderItem)
                        .join(PurchaseOrderItem.purchase_order)
                        .options(contains_eager(PurchaseOrderItem.purchase_order))
                        .where(
                            PurchaseOrderItem.ingredient_id == UUID(ingredient_id),
                            PurchaseOrder.tenant_id == UUID(tenant_id),
                            PurchaseOrder.order_date >= start_date,
                            PurchaseOrder.order_date <= end_date
                        )
                    )
                    poi_result = await db.execute(poi_query)
                    purchase_items = poi_result.scalars().all()

                    if len(purchase_items) < _MIN_PRICE_HISTORY_ITEMS:
                        logger.warning(
                            f"Insufficient price history for ingredient {ingredient_id}: "
                            f"{len(purchase_items)} items"
                        )
                        continue

                    price_history = _build_price_history(purchase_items)
                    if price_history.empty:
                        logger.warning(
                            f"No usable price points for ingredient {ingredient_id}"
                        )
                        continue

                    # Run price forecasting
                    # NOTE(review): the lookback window is also passed as the
                    # orchestrator's min_history_days; confirm that is intended.
                    results = await orchestrator.forecast_and_post_insights(
                        tenant_id=tenant_id,
                        ingredient_id=ingredient_id,
                        price_history=price_history,
                        forecast_horizon_days=request_data.forecast_horizon_days,
                        min_history_days=request_data.lookback_days
                    )

                    # Track results
                    total_insights_generated += results['insights_generated']
                    total_insights_posted += results['insights_posted']

                    # `or {}` also covers keys that are present but set to None.
                    recommendation = results.get('recommendation') or {}
                    if recommendation.get('action') == 'buy_now':
                        buy_now_recommendations += 1

                    bulk_opp = results.get('bulk_opportunity') or {}
                    if bulk_opp.get('has_bulk_opportunity'):
                        bulk_opportunities += 1

                    insights_by_ingredient[ingredient_id] = {
                        'ingredient_name': ingredient_name,
                        'insights_posted': results['insights_posted'],
                        'recommendation': recommendation.get('action'),
                        'has_bulk_opportunity': bulk_opp.get('has_bulk_opportunity', False)
                    }

                    logger.info(
                        f"Ingredient {ingredient_id} forecasting complete",
                        insights_posted=results['insights_posted'],
                        recommendation=recommendation.get('action')
                    )

                except Exception as e:
                    error_msg = f"Error forecasting ingredient {ingredient_id}: {str(e)}"
                    logger.error(error_msg, exc_info=True)
                    errors.append(error_msg)

            # Build response
            response = PriceForecastResponse(
                success=total_insights_posted > 0,
                message=f"Successfully forecasted {len(insights_by_ingredient)} ingredients, generated {total_insights_posted} insights",
                tenant_id=tenant_id,
                ingredients_forecasted=len(insights_by_ingredient),
                total_insights_generated=total_insights_generated,
                total_insights_posted=total_insights_posted,
                buy_now_recommendations=buy_now_recommendations,
                bulk_opportunities=bulk_opportunities,
                insights_by_ingredient=insights_by_ingredient,
                errors=errors
            )

            logger.info(
                "ML insights price forecasting complete",
                tenant_id=tenant_id,
                total_insights=total_insights_posted,
                buy_now_recommendations=buy_now_recommendations,
                bulk_opportunities=bulk_opportunities
            )

            return response
        finally:
            # Always release the orchestrator, including on the early
            # "no ingredients" return and on unexpected failures.
            await orchestrator.close()

    except Exception as e:
        logger.error(
            "ML insights price forecasting failed",
            tenant_id=tenant_id,
            error=str(e),
            exc_info=True
        )
        raise HTTPException(
            status_code=500,
            detail=f"Price forecasting failed: {str(e)}"
        ) from e
|
|
|
|
|
|
@router.get("/health")
async def ml_insights_health():
    """Return a static health payload listing the ML insights endpoints."""
    exposed_endpoints = [
        "POST /ml/insights/analyze-suppliers",
        "POST /ml/insights/forecast-prices",
    ]
    payload = {
        "status": "healthy",
        "service": "procurement-ml-insights",
        "endpoints": exposed_endpoints,
    }
    return payload
|