Files
bakery-ia/services/orchestrator/app/services/dashboard_service.py
Claude f2cb2448b7 fix: Add missing reasoning and consequence fields to PO approval actions
Error: 500 Internal Server Error on /dashboard/action-queue
Pydantic validation error: ActionItem requires 'reasoning' and 'consequence' fields

Root Cause:
-----------
Purchase order approval actions were missing required fields:
- Had: reasoning_data (dict) - not a valid field
- Needed: reasoning (string) and consequence (string)

The Fix:
--------
services/orchestrator/app/services/dashboard_service.py line 380-396

Changed from:
  'reasoning_data': {...}  # Invalid field

To:
  'reasoning': 'Pending approval for {supplier} - {type}'
  'consequence': 'Delayed delivery may impact production schedule'

Now action items have all required fields for Pydantic validation to pass.

Fixes the 500 error on action-queue endpoint.
2025-11-08 07:07:26 +00:00

611 lines
23 KiB
Python

# ================================================================
# services/orchestrator/app/services/dashboard_service.py
# ================================================================
"""
Bakery Dashboard Service - JTBD-Aligned Dashboard Data Aggregation
Provides health status, action queue, and orchestration summaries
"""
import asyncio
from datetime import datetime, timezone, timedelta
from typing import Dict, Any, List, Optional, Tuple
from decimal import Decimal
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, func, and_, or_, desc
import logging
from ..models.orchestration_run import OrchestrationRun, OrchestrationStatus
logger = logging.getLogger(__name__)
class HealthStatus:
"""Bakery health status enumeration"""
GREEN = "green" # All good, no actions needed
YELLOW = "yellow" # Needs attention, 1-3 actions
RED = "red" # Critical issue, immediate intervention
class ActionType:
"""Types of actions that require user attention"""
APPROVE_PO = "approve_po"
RESOLVE_ALERT = "resolve_alert"
ADJUST_PRODUCTION = "adjust_production"
COMPLETE_ONBOARDING = "complete_onboarding"
REVIEW_OUTDATED_DATA = "review_outdated_data"
class ActionUrgency:
"""Action urgency levels"""
CRITICAL = "critical" # Must do now
IMPORTANT = "important" # Should do today
NORMAL = "normal" # Can do when convenient
class DashboardService:
"""
Aggregates data from multiple services to provide JTBD-aligned dashboard
"""
def __init__(self, db: AsyncSession):
self.db = db
async def get_bakery_health_status(
self,
tenant_id: str,
critical_alerts: int,
pending_approvals: int,
production_delays: int,
out_of_stock_count: int,
system_errors: int
) -> Dict[str, Any]:
"""
Calculate overall bakery health status based on multiple signals
Args:
tenant_id: Tenant identifier
critical_alerts: Number of critical alerts
pending_approvals: Number of pending PO approvals
production_delays: Number of delayed production batches
out_of_stock_count: Number of out-of-stock ingredients
system_errors: Number of system errors
Returns:
Health status with headline and checklist
"""
# Determine overall status
status = self._calculate_health_status(
critical_alerts=critical_alerts,
pending_approvals=pending_approvals,
production_delays=production_delays,
out_of_stock_count=out_of_stock_count,
system_errors=system_errors
)
# Get last orchestration run
last_run = await self._get_last_orchestration_run(tenant_id)
# Generate checklist items
checklist_items = []
# Production status
if production_delays == 0:
checklist_items.append({
"icon": "check",
"text": "Production on schedule",
"actionRequired": False
})
else:
checklist_items.append({
"icon": "warning",
"text": f"{production_delays} production batch{'es' if production_delays != 1 else ''} delayed",
"actionRequired": True
})
# Inventory status
if out_of_stock_count == 0:
checklist_items.append({
"icon": "check",
"text": "All ingredients in stock",
"actionRequired": False
})
else:
checklist_items.append({
"icon": "alert",
"text": f"{out_of_stock_count} ingredient{'s' if out_of_stock_count != 1 else ''} out of stock",
"actionRequired": True
})
# Approval status
if pending_approvals == 0:
checklist_items.append({
"icon": "check",
"text": "No pending approvals",
"actionRequired": False
})
else:
checklist_items.append({
"icon": "warning",
"text": f"{pending_approvals} purchase order{'s' if pending_approvals != 1 else ''} awaiting approval",
"actionRequired": True
})
# System health
if system_errors == 0 and critical_alerts == 0:
checklist_items.append({
"icon": "check",
"text": "All systems operational",
"actionRequired": False
})
else:
checklist_items.append({
"icon": "alert",
"text": f"{critical_alerts + system_errors} critical issue{'s' if (critical_alerts + system_errors) != 1 else ''}",
"actionRequired": True
})
# Generate headline
headline = self._generate_health_headline(status, critical_alerts, pending_approvals)
# Calculate next scheduled run (5:30 AM next day)
now = datetime.now(timezone.utc)
next_run = now.replace(hour=5, minute=30, second=0, microsecond=0)
if next_run <= now:
next_run += timedelta(days=1)
return {
"status": status,
"headline": headline,
"lastOrchestrationRun": last_run["timestamp"] if last_run else None,
"nextScheduledRun": next_run.isoformat(),
"checklistItems": checklist_items,
"criticalIssues": critical_alerts + system_errors,
"pendingActions": pending_approvals + production_delays + out_of_stock_count
}
def _calculate_health_status(
self,
critical_alerts: int,
pending_approvals: int,
production_delays: int,
out_of_stock_count: int,
system_errors: int
) -> str:
"""Calculate overall health status"""
# RED: Critical issues that need immediate attention
if (critical_alerts >= 3 or
out_of_stock_count > 0 or
system_errors > 0 or
production_delays > 2):
return HealthStatus.RED
# YELLOW: Some issues but not urgent
if (critical_alerts > 0 or
pending_approvals > 0 or
production_delays > 0):
return HealthStatus.YELLOW
# GREEN: All good
return HealthStatus.GREEN
def _generate_health_headline(
self,
status: str,
critical_alerts: int,
pending_approvals: int
) -> str:
"""Generate human-readable headline based on status"""
if status == HealthStatus.GREEN:
return "Your bakery is running smoothly"
elif status == HealthStatus.YELLOW:
if pending_approvals > 0:
return f"Please review {pending_approvals} pending approval{'s' if pending_approvals != 1 else ''}"
elif critical_alerts > 0:
return f"You have {critical_alerts} alert{'s' if critical_alerts != 1 else ''} needing attention"
else:
return "Some items need your attention"
else: # RED
return "Critical issues require immediate action"
async def _get_last_orchestration_run(self, tenant_id: str) -> Optional[Dict[str, Any]]:
"""Get the most recent orchestration run"""
result = await self.db.execute(
select(OrchestrationRun)
.where(OrchestrationRun.tenant_id == tenant_id)
.where(OrchestrationRun.status.in_([
OrchestrationStatus.completed,
OrchestrationStatus.partial_success
]))
.order_by(desc(OrchestrationRun.started_at))
.limit(1)
)
run = result.scalar_one_or_none()
if not run:
return None
return {
"runId": str(run.id),
"runNumber": run.run_number,
"timestamp": run.started_at.isoformat(),
"duration": run.duration_seconds,
"status": run.status.value
}
async def get_orchestration_summary(
self,
tenant_id: str,
last_run_id: Optional[str] = None
) -> Dict[str, Any]:
"""
Get narrative summary of what the orchestrator did
Args:
tenant_id: Tenant identifier
last_run_id: Optional specific run ID, otherwise gets latest
Returns:
Orchestration summary with narrative format
"""
# Get the orchestration run
if last_run_id:
result = await self.db.execute(
select(OrchestrationRun)
.where(OrchestrationRun.id == last_run_id)
.where(OrchestrationRun.tenant_id == tenant_id)
)
else:
result = await self.db.execute(
select(OrchestrationRun)
.where(OrchestrationRun.tenant_id == tenant_id)
.where(OrchestrationRun.status.in_([
OrchestrationStatus.completed,
OrchestrationStatus.partial_success
]))
.order_by(desc(OrchestrationRun.started_at))
.limit(1)
)
run = result.scalar_one_or_none()
if not run:
return {
"runTimestamp": None,
"purchaseOrdersCreated": 0,
"purchaseOrdersSummary": [],
"productionBatchesCreated": 0,
"productionBatchesSummary": [],
"reasoningInputs": {
"customerOrders": 0,
"historicalDemand": False,
"inventoryLevels": False,
"aiInsights": False
},
"userActionsRequired": 0,
"status": "no_runs",
"message": "No orchestration has been run yet. Click 'Run Daily Planning' to generate your first plan."
}
# Parse results from JSONB
results = run.results or {}
# Extract step results
step_results = results.get("steps", {})
forecasting_step = step_results.get("1", {})
production_step = step_results.get("2", {})
procurement_step = step_results.get("3", {})
# Count created entities
po_count = procurement_step.get("purchase_orders_created", 0)
batch_count = production_step.get("production_batches_created", 0)
# Get detailed summaries (these would come from the actual services in real implementation)
# For now, provide structure that the frontend expects
return {
"runTimestamp": run.started_at.isoformat(),
"runNumber": run.run_number,
"status": run.status.value,
"purchaseOrdersCreated": po_count,
"purchaseOrdersSummary": [], # Will be filled by separate service calls
"productionBatchesCreated": batch_count,
"productionBatchesSummary": [], # Will be filled by separate service calls
"reasoningInputs": {
"customerOrders": forecasting_step.get("orders_analyzed", 0),
"historicalDemand": forecasting_step.get("success", False),
"inventoryLevels": procurement_step.get("success", False),
"aiInsights": results.get("ai_insights_used", False)
},
"userActionsRequired": po_count, # POs need approval
"durationSeconds": run.duration_seconds,
"aiAssisted": results.get("ai_insights_used", False)
}
async def get_action_queue(
self,
tenant_id: str,
pending_pos: List[Dict[str, Any]],
critical_alerts: List[Dict[str, Any]],
onboarding_incomplete: bool,
onboarding_steps: List[Dict[str, Any]]
) -> List[Dict[str, Any]]:
"""
Build prioritized action queue for user
Args:
tenant_id: Tenant identifier
pending_pos: List of pending purchase orders
critical_alerts: List of critical alerts
onboarding_incomplete: Whether onboarding is incomplete
onboarding_steps: Incomplete onboarding steps
Returns:
Prioritized list of actions
"""
actions = []
# 1. Critical alerts (red) - stock-outs, equipment failures
for alert in critical_alerts:
if alert.get("severity") == "critical":
actions.append({
"id": alert["id"],
"type": ActionType.RESOLVE_ALERT,
"urgency": ActionUrgency.CRITICAL,
"title": alert["title"],
"subtitle": alert.get("source") or "System Alert",
"reasoning": alert.get("description") or "System alert requires attention",
"consequence": "Immediate action required to prevent production issues",
"actions": [
{"label": "View Details", "type": "primary", "action": "view_alert"},
{"label": "Dismiss", "type": "secondary", "action": "dismiss"}
],
"estimatedTimeMinutes": 5
})
# 2. Time-sensitive PO approvals
for po in pending_pos:
# Calculate urgency based on required delivery date
urgency = self._calculate_po_urgency(po)
# Get reasoning_data or create default
reasoning_data = po.get("reasoning_data") or {
"type": "low_stock_detection",
"parameters": {
"supplier_name": po.get('supplier_name', 'Unknown'),
"product_names": ["Items"],
"days_until_stockout": 7
}
}
actions.append({
"id": po["id"],
"type": ActionType.APPROVE_PO,
"urgency": urgency,
"title": f"Purchase Order {po.get('po_number', 'N/A')}",
"subtitle": f"Supplier: {po.get('supplier_name', 'Unknown')}",
"reasoning": f"Pending approval for {po.get('supplier_name', 'supplier')} - {reasoning_data.get('type', 'inventory replenishment')}",
"consequence": "Delayed delivery may impact production schedule",
"amount": po.get("total_amount", 0),
"currency": po.get("currency", "EUR"),
"actions": [
{"label": "Approve", "type": "primary", "action": "approve"},
{"label": "View Details", "type": "secondary", "action": "view_details"},
{"label": "Modify", "type": "tertiary", "action": "modify"}
],
"estimatedTimeMinutes": 2
})
# 3. Incomplete onboarding (blue) - blocks full automation
if onboarding_incomplete:
for step in onboarding_steps:
if not step.get("completed"):
actions.append({
"id": f"onboarding_{step['id']}",
"type": ActionType.COMPLETE_ONBOARDING,
"urgency": ActionUrgency.IMPORTANT,
"title": step.get("title") or "Complete onboarding step",
"subtitle": "Setup incomplete",
"reasoning": "Required to unlock full automation",
"consequence": step.get("consequence") or "Some features are limited",
"actions": [
{"label": "Complete Setup", "type": "primary", "action": "complete_onboarding"}
],
"estimatedTimeMinutes": step.get("estimated_minutes") or 10
})
# Sort by urgency priority
urgency_order = {
ActionUrgency.CRITICAL: 0,
ActionUrgency.IMPORTANT: 1,
ActionUrgency.NORMAL: 2
}
actions.sort(key=lambda x: urgency_order.get(x["urgency"], 3))
return actions
def _calculate_po_urgency(self, po: Dict[str, Any]) -> str:
"""Calculate urgency of PO approval based on delivery date"""
required_date = po.get("required_delivery_date")
if not required_date:
return ActionUrgency.NORMAL
# Parse date if string
if isinstance(required_date, str):
required_date = datetime.fromisoformat(required_date.replace('Z', '+00:00'))
now = datetime.now(timezone.utc)
time_until_delivery = required_date - now
# Critical if needed within 24 hours
if time_until_delivery.total_seconds() < 86400: # 24 hours
return ActionUrgency.CRITICAL
# Important if needed within 48 hours
if time_until_delivery.total_seconds() < 172800: # 48 hours
return ActionUrgency.IMPORTANT
return ActionUrgency.NORMAL
async def get_production_timeline(
self,
tenant_id: str,
batches: List[Dict[str, Any]]
) -> List[Dict[str, Any]]:
"""
Transform production batches into timeline format
Args:
tenant_id: Tenant identifier
batches: List of production batches for today
Returns:
Timeline-formatted production schedule
"""
now = datetime.now(timezone.utc)
timeline = []
for batch in batches:
# Parse times
planned_start = batch.get("planned_start_time")
if isinstance(planned_start, str):
planned_start = datetime.fromisoformat(planned_start.replace('Z', '+00:00'))
planned_end = batch.get("planned_end_time")
if isinstance(planned_end, str):
planned_end = datetime.fromisoformat(planned_end.replace('Z', '+00:00'))
actual_start = batch.get("actual_start_time")
if actual_start and isinstance(actual_start, str):
actual_start = datetime.fromisoformat(actual_start.replace('Z', '+00:00'))
# Determine status and progress
status = batch.get("status", "PENDING")
progress = 0
if status == "COMPLETED":
progress = 100
status_icon = ""
status_text = "COMPLETED"
elif status == "IN_PROGRESS":
# Calculate progress based on time elapsed
if actual_start and planned_end:
total_duration = (planned_end - actual_start).total_seconds()
elapsed = (now - actual_start).total_seconds()
progress = min(int((elapsed / total_duration) * 100), 99)
else:
progress = 50
status_icon = "🔄"
status_text = "IN PROGRESS"
else:
status_icon = ""
status_text = "PENDING"
# Get reasoning_data or create default
reasoning_data = batch.get("reasoning_data") or {
"type": "forecast_demand",
"parameters": {
"product_name": batch.get("product_name", "Product"),
"predicted_demand": batch.get("planned_quantity", 0),
"confidence_score": 85
}
}
timeline.append({
"id": batch["id"],
"batchNumber": batch.get("batch_number"),
"productName": batch.get("product_name"),
"quantity": batch.get("planned_quantity"),
"unit": "units",
"plannedStartTime": planned_start.isoformat() if planned_start else None,
"plannedEndTime": planned_end.isoformat() if planned_end else None,
"actualStartTime": actual_start.isoformat() if actual_start else None,
"status": status,
"statusIcon": status_icon,
"statusText": status_text,
"progress": progress,
"readyBy": planned_end.isoformat() if planned_end else None,
"priority": batch.get("priority", "MEDIUM"),
"reasoning_data": reasoning_data # NEW: Structured data for i18n
})
# Sort by planned start time
timeline.sort(key=lambda x: x["plannedStartTime"] or "9999")
return timeline
async def calculate_insights(
self,
tenant_id: str,
sustainability_data: Dict[str, Any],
inventory_data: Dict[str, Any],
savings_data: Dict[str, Any]
) -> Dict[str, Any]:
"""
Calculate key insights for the insights grid
Args:
tenant_id: Tenant identifier
sustainability_data: Waste and sustainability metrics
inventory_data: Inventory status
savings_data: Cost savings data
Returns:
Insights formatted for the grid
"""
# Savings insight
weekly_savings = savings_data.get("weekly_savings", 0)
savings_trend = savings_data.get("trend_percentage", 0)
# Inventory insight
low_stock_count = inventory_data.get("low_stock_count", 0)
out_of_stock_count = inventory_data.get("out_of_stock_count", 0)
if out_of_stock_count > 0:
inventory_status = "⚠️ Stock issues"
inventory_detail = f"{out_of_stock_count} out of stock"
inventory_color = "red"
elif low_stock_count > 0:
inventory_status = "Low stock"
inventory_detail = f"{low_stock_count} alert{'s' if low_stock_count != 1 else ''}"
inventory_color = "amber"
else:
inventory_status = "All stocked"
inventory_detail = "No alerts"
inventory_color = "green"
# Waste insight
waste_percentage = sustainability_data.get("waste_percentage", 0)
waste_target = sustainability_data.get("target_percentage", 5.0)
waste_trend = waste_percentage - waste_target
# Deliveries insight
deliveries_today = inventory_data.get("deliveries_today", 0)
next_delivery = inventory_data.get("next_delivery_time")
return {
"savings": {
"label": "💰 SAVINGS",
"value": f"{weekly_savings:.0f} this week",
"detail": f"+{savings_trend:.0f}% vs. last" if savings_trend > 0 else f"{savings_trend:.0f}% vs. last",
"color": "green" if savings_trend > 0 else "amber"
},
"inventory": {
"label": "📦 INVENTORY",
"value": inventory_status,
"detail": inventory_detail,
"color": inventory_color
},
"waste": {
"label": "♻️ WASTE",
"value": f"{waste_percentage:.1f}% this month",
"detail": f"{waste_trend:+.1f}% vs. goal",
"color": "green" if waste_trend <= 0 else "amber"
},
"deliveries": {
"label": "🚚 DELIVERIES",
"value": f"{deliveries_today} arriving today",
"detail": next_delivery or "None scheduled",
"color": "green"
}
}