Files
bakery-ia/services/orchestrator/app/api/dashboard.py
2025-11-13 16:01:08 +01:00

555 lines
22 KiB
Python

# ================================================================
# services/orchestrator/app/api/dashboard.py
# ================================================================
"""
Dashboard API endpoints for JTBD-aligned bakery dashboard
"""
from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy.ext.asyncio import AsyncSession
from typing import Dict, Any, List, Optional
from pydantic import BaseModel, Field
from datetime import datetime
import logging
from app.core.database import get_db
from app.core.config import settings
from ..services.dashboard_service import DashboardService
from shared.clients import (
get_inventory_client,
get_production_client,
get_alerts_client,
ProductionServiceClient,
InventoryServiceClient,
AlertsServiceClient
)
from shared.clients.procurement_client import ProcurementServiceClient
logger = logging.getLogger(__name__)
# Initialize service clients
inventory_client = get_inventory_client(settings, "orchestrator")
production_client = get_production_client(settings, "orchestrator")
procurement_client = ProcurementServiceClient(settings)
alerts_client = get_alerts_client(settings, "orchestrator")
router = APIRouter(prefix="/api/v1/tenants/{tenant_id}/dashboard", tags=["dashboard"])
# ============================================================
# Response Models
# ============================================================
class HeadlineData(BaseModel):
"""i18n-ready headline data"""
key: str = Field(..., description="i18n translation key")
params: Dict[str, Any] = Field(default_factory=dict, description="Parameters for translation")
class HealthChecklistItem(BaseModel):
"""Individual item in health checklist"""
icon: str = Field(..., description="Icon name: check, warning, alert")
text: Optional[str] = Field(None, description="Deprecated: Use textKey instead")
textKey: Optional[str] = Field(None, description="i18n translation key")
textParams: Optional[Dict[str, Any]] = Field(None, description="Parameters for i18n translation")
actionRequired: bool = Field(..., description="Whether action is required")
class BakeryHealthStatusResponse(BaseModel):
"""Overall bakery health status"""
status: str = Field(..., description="Health status: green, yellow, red")
headline: HeadlineData = Field(..., description="i18n-ready status headline")
lastOrchestrationRun: Optional[str] = Field(None, description="ISO timestamp of last orchestration")
nextScheduledRun: str = Field(..., description="ISO timestamp of next scheduled run")
checklistItems: List[HealthChecklistItem] = Field(..., description="Status checklist")
criticalIssues: int = Field(..., description="Count of critical issues")
pendingActions: int = Field(..., description="Count of pending actions")
class ReasoningInputs(BaseModel):
"""Inputs used by orchestrator for decision making"""
customerOrders: int = Field(..., description="Number of customer orders analyzed")
historicalDemand: bool = Field(..., description="Whether historical data was used")
inventoryLevels: bool = Field(..., description="Whether inventory levels were considered")
aiInsights: bool = Field(..., description="Whether AI insights were used")
class PurchaseOrderSummary(BaseModel):
"""Summary of a purchase order for dashboard"""
supplierName: str
itemCategories: List[str]
totalAmount: float
class ProductionBatchSummary(BaseModel):
"""Summary of a production batch for dashboard"""
productName: str
quantity: float
readyByTime: str
class OrchestrationSummaryResponse(BaseModel):
"""What the orchestrator did for the user"""
runTimestamp: Optional[str] = Field(None, description="When the orchestration ran")
runNumber: Optional[str] = Field(None, description="Run number identifier")
status: str = Field(..., description="Run status")
purchaseOrdersCreated: int = Field(..., description="Number of POs created")
purchaseOrdersSummary: List[PurchaseOrderSummary] = Field(default_factory=list)
productionBatchesCreated: int = Field(..., description="Number of batches created")
productionBatchesSummary: List[ProductionBatchSummary] = Field(default_factory=list)
reasoningInputs: ReasoningInputs
userActionsRequired: int = Field(..., description="Number of actions needing approval")
durationSeconds: Optional[int] = Field(None, description="How long orchestration took")
aiAssisted: bool = Field(False, description="Whether AI insights were used")
message: Optional[str] = Field(None, description="User-friendly message")
class ActionButton(BaseModel):
"""Action button configuration"""
label: str
type: str = Field(..., description="Button type: primary, secondary, tertiary")
action: str = Field(..., description="Action identifier")
class ActionItem(BaseModel):
"""Individual action requiring user attention"""
id: str
type: str = Field(..., description="Action type")
urgency: str = Field(..., description="Urgency: critical, important, normal")
title: str
subtitle: str
reasoning: str = Field(..., description="Why this action is needed")
consequence: str = Field(..., description="What happens if not done")
amount: Optional[float] = Field(None, description="Amount for financial actions")
currency: Optional[str] = Field(None, description="Currency code")
actions: List[ActionButton]
estimatedTimeMinutes: int
class ActionQueueResponse(BaseModel):
"""Prioritized queue of actions"""
actions: List[ActionItem]
totalActions: int
criticalCount: int
importantCount: int
class ProductionTimelineItem(BaseModel):
"""Individual production batch in timeline"""
id: str
batchNumber: str
productName: str
quantity: float
unit: str
plannedStartTime: Optional[str]
plannedEndTime: Optional[str]
actualStartTime: Optional[str]
status: str
statusIcon: str
statusText: str
progress: int = Field(..., ge=0, le=100, description="Progress percentage")
readyBy: Optional[str]
priority: str
reasoning: str
class ProductionTimelineResponse(BaseModel):
"""Today's production timeline"""
timeline: List[ProductionTimelineItem]
totalBatches: int
completedBatches: int
inProgressBatches: int
pendingBatches: int
class InsightCard(BaseModel):
"""Individual insight card"""
label: str
value: str
detail: str
color: str = Field(..., description="Color: green, amber, red")
class InsightsResponse(BaseModel):
"""Key insights grid"""
savings: InsightCard
inventory: InsightCard
waste: InsightCard
deliveries: InsightCard
# ============================================================
# API Endpoints
# ============================================================
@router.get("/health-status", response_model=BakeryHealthStatusResponse)
async def get_bakery_health_status(
tenant_id: str,
db: AsyncSession = Depends(get_db)
) -> BakeryHealthStatusResponse:
"""
Get overall bakery health status
This is the top-level indicator showing if the bakery is running smoothly
or if there are issues requiring attention.
"""
try:
dashboard_service = DashboardService(db)
# Gather metrics from various services
# In a real implementation, these would be fetched from respective services
# For now, we'll make HTTP calls to the services
# Get alerts summary
try:
alerts_data = await alerts_client.get_alerts_summary(tenant_id) or {}
critical_alerts = alerts_data.get("critical_count", 0)
except Exception as e:
logger.warning(f"Failed to fetch alerts: {e}")
critical_alerts = 0
# Get pending PO count
try:
po_data = await procurement_client.get_pending_purchase_orders(tenant_id, limit=100) or []
pending_approvals = len(po_data) if isinstance(po_data, list) else 0
except Exception as e:
logger.warning(f"Failed to fetch POs: {e}")
pending_approvals = 0
# Get production delays
try:
prod_data = await production_client.get_production_batches_by_status(
tenant_id, status="ON_HOLD", limit=100
) or {}
production_delays = len(prod_data.get("batches", []))
except Exception as e:
logger.warning(f"Failed to fetch production batches: {e}")
production_delays = 0
# Get inventory status
try:
inv_data = await inventory_client.get_inventory_dashboard(tenant_id) or {}
out_of_stock_count = inv_data.get("out_of_stock_count", 0)
except Exception as e:
logger.warning(f"Failed to fetch inventory: {e}")
out_of_stock_count = 0
# System errors (would come from monitoring system)
system_errors = 0
# Calculate health status
health_status = await dashboard_service.get_bakery_health_status(
tenant_id=tenant_id,
critical_alerts=critical_alerts,
pending_approvals=pending_approvals,
production_delays=production_delays,
out_of_stock_count=out_of_stock_count,
system_errors=system_errors
)
return BakeryHealthStatusResponse(**health_status)
except Exception as e:
logger.error(f"Error getting health status: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=str(e))
@router.get("/orchestration-summary", response_model=OrchestrationSummaryResponse)
async def get_orchestration_summary(
tenant_id: str,
run_id: Optional[str] = Query(None, description="Specific run ID, or latest if not provided"),
db: AsyncSession = Depends(get_db)
) -> OrchestrationSummaryResponse:
"""
Get narrative summary of what the orchestrator did
This provides transparency into the automation, showing what was planned
and why, helping build user trust in the system.
"""
try:
dashboard_service = DashboardService(db)
# Get orchestration summary
summary = await dashboard_service.get_orchestration_summary(
tenant_id=tenant_id,
last_run_id=run_id
)
# Enhance with detailed PO and batch summaries
if summary["purchaseOrdersCreated"] > 0:
try:
po_data = await procurement_client.get_pending_purchase_orders(tenant_id, limit=10)
if po_data and isinstance(po_data, list):
summary["purchaseOrdersSummary"] = [
PurchaseOrderSummary(
supplierName=po.get("supplier_name", "Unknown"),
itemCategories=[item.get("ingredient_name", "Item") for item in po.get("items", [])[:3]],
totalAmount=float(po.get("total_amount", 0))
)
for po in po_data[:5] # Show top 5
]
except Exception as e:
logger.warning(f"Failed to fetch PO details: {e}")
if summary["productionBatchesCreated"] > 0:
try:
batch_data = await production_client.get_todays_batches(tenant_id)
if batch_data:
batches = batch_data.get("batches", [])
summary["productionBatchesSummary"] = [
ProductionBatchSummary(
productName=batch.get("product_name", "Unknown"),
quantity=batch.get("planned_quantity", 0),
readyByTime=batch.get("planned_end_time", "")
)
for batch in batches[:5] # Show top 5
]
except Exception as e:
logger.warning(f"Failed to fetch batch details: {e}")
return OrchestrationSummaryResponse(**summary)
except Exception as e:
logger.error(f"Error getting orchestration summary: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=str(e))
@router.get("/action-queue", response_model=ActionQueueResponse)
async def get_action_queue(
tenant_id: str,
db: AsyncSession = Depends(get_db)
) -> ActionQueueResponse:
"""
Get prioritized queue of actions requiring user attention
This is the core of the JTBD dashboard - showing exactly what the user
needs to do right now, prioritized by urgency and impact.
"""
try:
dashboard_service = DashboardService(db)
# Fetch data from various services
# Get pending POs
pending_pos = []
try:
po_data = await procurement_client.get_pending_purchase_orders(tenant_id, limit=20)
if po_data and isinstance(po_data, list):
pending_pos = po_data
except Exception as e:
logger.warning(f"Failed to fetch pending POs: {e}")
# Get critical alerts
critical_alerts = []
try:
alerts_data = await alerts_client.get_critical_alerts(tenant_id, limit=20)
if alerts_data:
critical_alerts = alerts_data.get("alerts", [])
except Exception as e:
logger.warning(f"Failed to fetch alerts: {e}")
# Get onboarding status
onboarding_incomplete = False
onboarding_steps = []
try:
onboarding_data = await procurement_client.get(
"/procurement/auth/onboarding-progress",
tenant_id=tenant_id
)
if onboarding_data:
onboarding_incomplete = not onboarding_data.get("completed", True)
onboarding_steps = onboarding_data.get("steps", [])
except Exception as e:
logger.warning(f"Failed to fetch onboarding status: {e}")
# Build action queue
actions = await dashboard_service.get_action_queue(
tenant_id=tenant_id,
pending_pos=pending_pos,
critical_alerts=critical_alerts,
onboarding_incomplete=onboarding_incomplete,
onboarding_steps=onboarding_steps
)
# Count by urgency
critical_count = sum(1 for a in actions if a["urgency"] == "critical")
important_count = sum(1 for a in actions if a["urgency"] == "important")
return ActionQueueResponse(
actions=[ActionItem(**action) for action in actions[:10]], # Show top 10
totalActions=len(actions),
criticalCount=critical_count,
importantCount=important_count
)
except Exception as e:
logger.error(f"Error getting action queue: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=str(e))
@router.get("/production-timeline", response_model=ProductionTimelineResponse)
async def get_production_timeline(
tenant_id: str,
db: AsyncSession = Depends(get_db)
) -> ProductionTimelineResponse:
"""
Get today's production timeline
Shows what's being made today in chronological order with status and progress.
"""
try:
dashboard_service = DashboardService(db)
# Fetch today's production batches
batches = []
try:
batch_data = await production_client.get_todays_batches(tenant_id)
if batch_data:
batches = batch_data.get("batches", [])
except Exception as e:
logger.warning(f"Failed to fetch production batches: {e}")
# Transform to timeline format
timeline = await dashboard_service.get_production_timeline(
tenant_id=tenant_id,
batches=batches
)
# Count by status
completed = sum(1 for item in timeline if item["status"] == "COMPLETED")
in_progress = sum(1 for item in timeline if item["status"] == "IN_PROGRESS")
pending = sum(1 for item in timeline if item["status"] == "PENDING")
return ProductionTimelineResponse(
timeline=[ProductionTimelineItem(**item) for item in timeline],
totalBatches=len(timeline),
completedBatches=completed,
inProgressBatches=in_progress,
pendingBatches=pending
)
except Exception as e:
logger.error(f"Error getting production timeline: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=str(e))
@router.get("/insights", response_model=InsightsResponse)
async def get_insights(
tenant_id: str,
db: AsyncSession = Depends(get_db)
) -> InsightsResponse:
"""
Get key insights for dashboard grid
Provides glanceable metrics on savings, inventory, waste, and deliveries.
"""
try:
dashboard_service = DashboardService(db)
# Fetch data from various services
# Sustainability data
sustainability_data = {}
try:
sustainability_data = await inventory_client.get_sustainability_widget(tenant_id) or {}
except Exception as e:
logger.warning(f"Failed to fetch sustainability data: {e}")
# Inventory data
inventory_data = {}
try:
raw_inventory_data = await inventory_client.get_stock_status(tenant_id)
# Handle case where API returns a list instead of dict
if isinstance(raw_inventory_data, dict):
inventory_data = raw_inventory_data
elif isinstance(raw_inventory_data, list):
# If it's a list, aggregate the data
inventory_data = {
"low_stock_count": sum(1 for item in raw_inventory_data if item.get("status") == "low_stock"),
"out_of_stock_count": sum(1 for item in raw_inventory_data if item.get("status") == "out_of_stock"),
"total_items": len(raw_inventory_data)
}
else:
inventory_data = {}
except Exception as e:
logger.warning(f"Failed to fetch inventory data: {e}")
# Deliveries data from procurement
delivery_data = {}
try:
# Get recent POs with pending deliveries
pos_result = await procurement_client.get_pending_purchase_orders(tenant_id, limit=100)
if pos_result and isinstance(pos_result, list):
# Count deliveries expected today
from datetime import datetime, timezone
today_start = datetime.now(timezone.utc).replace(hour=0, minute=0, second=0, microsecond=0)
today_end = today_start.replace(hour=23, minute=59, second=59)
deliveries_today = 0
for po in pos_result:
expected_date = po.get("expected_delivery_date")
if expected_date:
if isinstance(expected_date, str):
expected_date = datetime.fromisoformat(expected_date.replace('Z', '+00:00'))
if today_start <= expected_date <= today_end:
deliveries_today += 1
delivery_data = {"deliveries_today": deliveries_today}
except Exception as e:
logger.warning(f"Failed to fetch delivery data: {e}")
# Savings data - Calculate from recent PO price optimizations
savings_data = {}
try:
# Get recent POs (last 7 days) and sum up optimization savings
from datetime import datetime, timedelta, timezone
seven_days_ago = datetime.now(timezone.utc) - timedelta(days=7)
pos_result = await procurement_client.get_pending_purchase_orders(tenant_id, limit=200)
if pos_result and isinstance(pos_result, list):
weekly_savings = 0
# Calculate savings from price optimization
for po in pos_result:
# Check if PO was created in last 7 days
created_at = po.get("created_at")
if created_at:
if isinstance(created_at, str):
created_at = datetime.fromisoformat(created_at.replace('Z', '+00:00'))
if created_at >= seven_days_ago:
# Sum up savings from optimization
optimization_data = po.get("optimization_data", {})
if isinstance(optimization_data, dict):
savings = optimization_data.get("savings", 0) or 0
weekly_savings += float(savings)
# Default trend percentage (would need historical data for real trend)
savings_data = {
"weekly_savings": round(weekly_savings, 2),
"trend_percentage": 12 if weekly_savings > 0 else 0
}
else:
savings_data = {"weekly_savings": 0, "trend_percentage": 0}
except Exception as e:
logger.warning(f"Failed to calculate savings data: {e}")
savings_data = {"weekly_savings": 0, "trend_percentage": 0}
# Merge delivery data into inventory data
inventory_data.update(delivery_data)
# Calculate insights
insights = await dashboard_service.calculate_insights(
tenant_id=tenant_id,
sustainability_data=sustainability_data,
inventory_data=inventory_data,
savings_data=savings_data
)
return InsightsResponse(
savings=InsightCard(**insights["savings"]),
inventory=InsightCard(**insights["inventory"]),
waste=InsightCard(**insights["waste"]),
deliveries=InsightCard(**insights["deliveries"])
)
except Exception as e:
logger.error(f"Error getting insights: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=str(e))