Files
bakery-ia/services/orchestrator/app/services/dashboard_service.py
Claude 298be127d7 Fix template variable interpolation by creating params copy
Root cause: params = reasoning_data.get('parameters', {}) created a reference
to the dictionary instead of a copy. When modifying params to add
product_names_joined, the change didn't persist because the database object
was immutable/read-only.

Changes:
- dashboard_service.py:408 - Create dict copy for PO params
- dashboard_service.py:632 - Create dict copy for batch params
- Added clean_old_dashboard_data.py utility script to remove old POs/batches
  with malformed reasoning_data

The fix ensures template variables like {{supplier_name}}, {{product_names_joined}},
{{days_until_stockout}}, etc. are properly interpolated in the dashboard.
2025-11-20 19:30:12 +00:00

770 lines
30 KiB
Python

# ================================================================
# services/orchestrator/app/services/dashboard_service.py
# ================================================================
"""
Bakery Dashboard Service - JTBD-Aligned Dashboard Data Aggregation
Provides health status, action queue, and orchestration summaries
"""
import asyncio
from datetime import datetime, timezone, timedelta
from typing import Dict, Any, List, Optional, Tuple
from decimal import Decimal
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, func, and_, or_, desc
import logging
from ..models.orchestration_run import OrchestrationRun, OrchestrationStatus
logger = logging.getLogger(__name__)
class HealthStatus:
    """String constants naming the overall bakery health level.

    The level is selected by ``DashboardService._calculate_health_status``
    from the alert, approval, delay, stock-out and error counts.
    """

    # No signal requires the user's attention.
    GREEN = "green"
    # At least one non-critical signal (alert, pending approval, delay).
    YELLOW = "yellow"
    # Critical situation that needs immediate intervention.
    RED = "red"
class ActionType:
    """String constants identifying the kind of action shown to the user."""

    # Approve a pending purchase order.
    APPROVE_PO = "approve_po"
    # Look at and resolve an alert.
    RESOLVE_ALERT = "resolve_alert"
    # Adjust planned production.
    ADJUST_PRODUCTION = "adjust_production"
    # Finish an incomplete onboarding step.
    COMPLETE_ONBOARDING = "complete_onboarding"
    # Review data flagged as outdated.
    REVIEW_OUTDATED_DATA = "review_outdated_data"
class ActionUrgency:
    """String constants ranking how soon an action should be handled."""

    # Must be handled right away.
    CRITICAL = "critical"
    # Should be handled today.
    IMPORTANT = "important"
    # Can wait until it is convenient.
    NORMAL = "normal"
class DashboardService:
"""
Aggregates data from multiple services to provide JTBD-aligned dashboard
"""
def __init__(self, db: AsyncSession):
    """
    Store the database session used by the query methods
    Args:
        db: Async SQLAlchemy session, kept as ``self.db``
    """
    self.db = db
async def get_bakery_health_status(
    self,
    tenant_id: str,
    critical_alerts: int,
    pending_approvals: int,
    production_delays: int,
    out_of_stock_count: int,
    system_errors: int
) -> Dict[str, Any]:
    """
    Assemble the health card: overall status, headline and checklist
    Args:
        tenant_id: Tenant identifier
        critical_alerts: Number of critical alerts
        pending_approvals: Number of pending PO approvals
        production_delays: Number of delayed production batches
        out_of_stock_count: Number of out-of-stock ingredients
        system_errors: Number of system errors
    Returns:
        Dict with status, i18n headline, last/next orchestration run
        timestamps, checklist items and issue/action counters
    """
    def checklist_entry(is_ok, ok_key, problem_icon, problem_key, count):
        # One checklist row: a green check when fine, otherwise an
        # actionable row carrying the offending count.
        if is_ok:
            return {
                "icon": "check",
                "textKey": ok_key,
                "actionRequired": False
            }
        return {
            "icon": problem_icon,
            "textKey": problem_key,
            "textParams": {"count": count},
            "actionRequired": True
        }

    overall_status = self._calculate_health_status(
        critical_alerts=critical_alerts,
        pending_approvals=pending_approvals,
        production_delays=production_delays,
        out_of_stock_count=out_of_stock_count,
        system_errors=system_errors
    )
    latest_run = await self._get_last_orchestration_run(tenant_id)

    # Rows appear in a fixed order: production, inventory, approvals, system.
    checklist = [
        checklist_entry(
            production_delays == 0,
            "dashboard.health.production_on_schedule",
            "warning",
            "dashboard.health.production_delayed",
            production_delays,
        ),
        checklist_entry(
            out_of_stock_count == 0,
            "dashboard.health.all_ingredients_in_stock",
            "alert",
            "dashboard.health.ingredients_out_of_stock",
            out_of_stock_count,
        ),
        checklist_entry(
            pending_approvals == 0,
            "dashboard.health.no_pending_approvals",
            "warning",
            "dashboard.health.approvals_awaiting",
            pending_approvals,
        ),
        checklist_entry(
            system_errors == 0 and critical_alerts == 0,
            "dashboard.health.all_systems_operational",
            "alert",
            "dashboard.health.critical_issues",
            critical_alerts + system_errors,
        ),
    ]

    headline = self._generate_health_headline(overall_status, critical_alerts, pending_approvals)

    # Next run is the upcoming 05:30 UTC: today's if still ahead, else tomorrow's.
    current_time = datetime.now(timezone.utc)
    upcoming_run = current_time.replace(hour=5, minute=30, second=0, microsecond=0)
    if upcoming_run <= current_time:
        upcoming_run = upcoming_run + timedelta(days=1)

    return {
        "status": overall_status,
        "headline": headline,
        "lastOrchestrationRun": latest_run["timestamp"] if latest_run else None,
        "nextScheduledRun": upcoming_run.isoformat(),
        "checklistItems": checklist,
        "criticalIssues": critical_alerts + system_errors,
        "pendingActions": pending_approvals + production_delays + out_of_stock_count
    }
def _calculate_health_status(
    self,
    critical_alerts: int,
    pending_approvals: int,
    production_delays: int,
    out_of_stock_count: int,
    system_errors: int
) -> str:
    """
    Reduce the individual counters to a single HealthStatus value
    Returns:
        HealthStatus.RED, HealthStatus.YELLOW or HealthStatus.GREEN
    """
    # RED: three or more critical alerts, any stock-out, any system
    # error, or more than two delayed batches.
    needs_immediate_action = (
        critical_alerts >= 3
        or out_of_stock_count > 0
        or system_errors > 0
        or production_delays > 2
    )
    if needs_immediate_action:
        return HealthStatus.RED

    # YELLOW: something is open but below the RED thresholds.
    needs_attention = (
        critical_alerts > 0
        or pending_approvals > 0
        or production_delays > 0
    )
    return HealthStatus.YELLOW if needs_attention else HealthStatus.GREEN
def _generate_health_headline(
    self,
    status: str,
    critical_alerts: int,
    pending_approvals: int
) -> Dict[str, Any]:
    """
    Pick the i18n headline (key + params) matching the health status
    Args:
        status: One of the HealthStatus values
        critical_alerts: Number of critical alerts
        pending_approvals: Number of pending PO approvals
    Returns:
        Dict with the translation "key" and its "params"
    """
    if status == HealthStatus.GREEN:
        return {"key": "health.headline_green", "params": {}}

    # Anything that is neither GREEN nor YELLOW gets the red headline.
    if status != HealthStatus.YELLOW:
        return {"key": "health.headline_red", "params": {}}

    # YELLOW: pending approvals take precedence over critical alerts.
    if pending_approvals > 0:
        return {
            "key": "health.headline_yellow_approvals",
            "params": {"count": pending_approvals}
        }
    if critical_alerts > 0:
        return {
            "key": "health.headline_yellow_alerts",
            "params": {"count": critical_alerts}
        }
    return {"key": "health.headline_yellow_general", "params": {}}
async def _get_last_orchestration_run(self, tenant_id: str) -> Optional[Dict[str, Any]]:
    """
    Fetch the tenant's latest completed or partially successful run
    Args:
        tenant_id: Tenant identifier
    Returns:
        Dict with runId, runNumber, timestamp, duration and status,
        or None when the tenant has no such run
    """
    finished_states = [
        OrchestrationStatus.completed,
        OrchestrationStatus.partial_success
    ]
    # Newest run first; only one row is needed.
    latest_run_query = (
        select(OrchestrationRun)
        .where(OrchestrationRun.tenant_id == tenant_id)
        .where(OrchestrationRun.status.in_(finished_states))
        .order_by(desc(OrchestrationRun.started_at))
        .limit(1)
    )
    query_result = await self.db.execute(latest_run_query)
    latest_run = query_result.scalar_one_or_none()
    if not latest_run:
        return None

    return {
        "runId": str(latest_run.id),
        "runNumber": latest_run.run_number,
        "timestamp": latest_run.started_at.isoformat(),
        "duration": latest_run.duration_seconds,
        "status": latest_run.status.value
    }
async def get_orchestration_summary(
    self,
    tenant_id: str,
    last_run_id: Optional[str] = None
) -> Dict[str, Any]:
    """
    Get narrative summary of what the orchestrator did
    Args:
        tenant_id: Tenant identifier
        last_run_id: Optional specific run ID; when omitted the latest
            completed or partially successful run is used
    Returns:
        Orchestration summary with narrative format. When no run is
        found, a placeholder payload with status "no_runs" is returned.
    """
    # Both lookups are scoped to the tenant; they differ only in how the
    # run is selected.
    query = select(OrchestrationRun).where(OrchestrationRun.tenant_id == tenant_id)
    if last_run_id:
        query = query.where(OrchestrationRun.id == last_run_id)
    else:
        query = (
            query
            .where(OrchestrationRun.status.in_([
                OrchestrationStatus.completed,
                OrchestrationStatus.partial_success
            ]))
            .order_by(desc(OrchestrationRun.started_at))
            .limit(1)
        )

    result = await self.db.execute(query)
    run = result.scalar_one_or_none()

    if not run:
        # NOTE(review): this payload has "message_i18n" but no "runNumber",
        # "durationSeconds" or "aiAssisted" keys, unlike the payload below.
        return {
            "runTimestamp": None,
            "purchaseOrdersCreated": 0,
            "purchaseOrdersSummary": [],
            "productionBatchesCreated": 0,
            "productionBatchesSummary": [],
            "reasoningInputs": {
                "customerOrders": 0,
                "historicalDemand": False,
                "inventoryLevels": False,
                "aiInsights": False
            },
            "userActionsRequired": 0,
            "status": "no_runs",
            "message_i18n": {
                "key": "orchestration.no_runs_message",
                "params": {}
            }
        }

    # Counters come straight from the run's columns; NULL counts as 0.
    po_count = run.purchase_orders_created or 0
    batch_count = run.production_batches_created or 0
    forecasts_count = run.forecasts_generated or 0
    has_ai_insights = (run.ai_insights_generated or 0) > 0

    # The two *Summary lists are returned empty here; they are meant to be
    # filled by separate service calls.
    return {
        "runTimestamp": run.started_at.isoformat(),
        "runNumber": run.run_number,
        "status": run.status.value,
        "purchaseOrdersCreated": po_count,
        "purchaseOrdersSummary": [],
        "productionBatchesCreated": batch_count,
        "productionBatchesSummary": [],
        "reasoningInputs": {
            "customerOrders": forecasts_count,
            "historicalDemand": run.forecasting_status == "success",
            "inventoryLevels": run.procurement_status == "success",
            "aiInsights": has_ai_insights
        },
        "userActionsRequired": po_count,  # POs need approval
        "durationSeconds": run.duration_seconds,
        "aiAssisted": has_ai_insights
    }
async def get_action_queue(
    self,
    tenant_id: str,
    pending_pos: List[Dict[str, Any]],
    critical_alerts: List[Dict[str, Any]],
    onboarding_incomplete: bool,
    onboarding_steps: List[Dict[str, Any]]
) -> List[Dict[str, Any]]:
    """
    Build prioritized action queue for user
    Args:
        tenant_id: Tenant identifier (not used by the queue logic itself)
        pending_pos: List of pending purchase orders
        critical_alerts: List of alerts; only entries whose "severity"
            is "critical" are queued
        onboarding_incomplete: Whether onboarding is incomplete
        onboarding_steps: Onboarding steps; completed ones are skipped
    Returns:
        Actions sorted by urgency (critical, important, normal); entries
        with equal urgency keep their insertion order
    """
    actions = []

    # 1. Critical alerts
    for alert in critical_alerts:
        if alert.get("severity") != "critical":
            continue
        actions.append({
            "id": alert["id"],
            "type": ActionType.RESOLVE_ALERT,
            "urgency": ActionUrgency.CRITICAL,
            "title": alert["title"],
            "subtitle": alert.get("source") or "System Alert",
            "reasoning": alert.get("description") or "System alert requires attention",
            "consequence_i18n": {
                "key": "action_queue.consequences.immediate_action",
                "params": {}
            },
            "actions": [
                {"label_i18n": {"key": "action_queue.buttons.view_details", "params": {}}, "type": "primary", "action": "view_alert"},
                {"label_i18n": {"key": "action_queue.buttons.dismiss", "params": {}}, "type": "secondary", "action": "dismiss"}
            ],
            "estimatedTimeMinutes": 5
        })

    # 2. Purchase orders awaiting approval
    for po in pending_pos:
        # Urgency is derived from the required delivery date.
        urgency = self._calculate_po_urgency(po)

        # Fall back to a generic low-stock explanation when the PO
        # carries no reasoning data.
        reasoning_data = po.get("reasoning_data") or {
            "type": "low_stock_detection",
            "parameters": {
                "supplier_name": po.get('supplier_name', 'Unknown'),
                "product_names": ["Items"],
                "days_until_stockout": 7
            }
        }
        reasoning_type = reasoning_data.get('type', 'inventory_replenishment')
        reasoning_type_i18n_key = self._get_reasoning_type_i18n_key(reasoning_type, context="purchaseOrder")

        # Work on a copy so the caller's data is never mutated. A stored
        # "parameters": null must not abort the whole queue.
        params = dict(reasoning_data.get('parameters') or {})

        # Templates expect product_names_joined as one string. Entries are
        # stringified (None skipped) so one odd value cannot raise in join().
        product_names = params.get('product_names')
        if isinstance(product_names, list):
            params['product_names_joined'] = ', '.join(
                str(name) for name in product_names if name is not None
            )

        actions.append({
            "id": po["id"],
            "type": ActionType.APPROVE_PO,
            "urgency": urgency,
            "title_i18n": {
                "key": "action_queue.titles.purchase_order",
                "params": {"po_number": po.get('po_number', 'N/A')}
            },
            "subtitle_i18n": {
                "key": "action_queue.titles.supplier",
                "params": {"supplier_name": po.get('supplier_name', 'Unknown')}
            },
            "reasoning_i18n": {
                "key": reasoning_type_i18n_key,
                "params": params
            },
            "consequence_i18n": {
                "key": "action_queue.consequences.delayed_delivery",
                "params": {}
            },
            "amount": po.get("total_amount", 0),
            "currency": po.get("currency", "EUR"),
            "actions": [
                {"label_i18n": {"key": "action_queue.buttons.approve", "params": {}}, "type": "primary", "action": "approve"},
                {"label_i18n": {"key": "action_queue.buttons.view_details", "params": {}}, "type": "secondary", "action": "view_details"},
                {"label_i18n": {"key": "action_queue.buttons.modify", "params": {}}, "type": "tertiary", "action": "modify"}
            ],
            "estimatedTimeMinutes": 2
        })

    # 3. Incomplete onboarding steps
    if onboarding_incomplete:
        for step in onboarding_steps:
            if step.get("completed"):
                continue
            actions.append({
                "id": f"onboarding_{step['id']}",
                "type": ActionType.COMPLETE_ONBOARDING,
                "urgency": ActionUrgency.IMPORTANT,
                "title": step.get("title") or "Complete onboarding step",
                "subtitle": "Setup incomplete",
                "reasoning": "Required to unlock full automation",
                "consequence_i18n": step.get("consequence_i18n") or {
                    "key": "action_queue.consequences.limited_features",
                    "params": {}
                },
                "actions": [
                    {"label_i18n": {"key": "action_queue.buttons.complete_setup", "params": {}}, "type": "primary", "action": "complete_onboarding"}
                ],
                "estimatedTimeMinutes": step.get("estimated_minutes") or 10
            })

    # Sort by urgency; unknown urgency values go last.
    urgency_order = {
        ActionUrgency.CRITICAL: 0,
        ActionUrgency.IMPORTANT: 1,
        ActionUrgency.NORMAL: 2
    }
    actions.sort(key=lambda action: urgency_order.get(action["urgency"], 3))
    return actions
def _get_reasoning_type_i18n_key(self, reasoning_type: str, context: str = "purchaseOrder") -> str:
"""Map reasoning type identifiers to i18n keys
Args:
reasoning_type: The type of reasoning (e.g., "low_stock_detection")
context: The context (either "purchaseOrder" or "productionBatch")
Returns:
Full i18n key with namespace and context prefix
"""
if context == "productionBatch":
reasoning_type_map = {
"forecast_demand": "reasoning.productionBatch.forecast_demand",
"customer_order": "reasoning.productionBatch.customer_order",
"stock_replenishment": "reasoning.productionBatch.stock_replenishment",
"seasonal_preparation": "reasoning.productionBatch.seasonal_preparation",
"promotion_event": "reasoning.productionBatch.promotion_event",
"urgent_order": "reasoning.productionBatch.urgent_order",
"regular_schedule": "reasoning.productionBatch.regular_schedule",
}
else: # purchaseOrder context
reasoning_type_map = {
"low_stock_detection": "reasoning.purchaseOrder.low_stock_detection",
"stockout_prevention": "reasoning.purchaseOrder.low_stock_detection",
"forecast_demand": "reasoning.purchaseOrder.forecast_demand",
"customer_orders": "reasoning.purchaseOrder.forecast_demand",
"seasonal_demand": "reasoning.purchaseOrder.seasonal_demand",
"inventory_replenishment": "reasoning.purchaseOrder.safety_stock_replenishment",
"production_schedule": "reasoning.purchaseOrder.production_requirement",
"supplier_contract": "reasoning.purchaseOrder.supplier_contract",
"safety_stock_replenishment": "reasoning.purchaseOrder.safety_stock_replenishment",
"production_requirement": "reasoning.purchaseOrder.production_requirement",
"manual_request": "reasoning.purchaseOrder.manual_request",
}
return reasoning_type_map.get(reasoning_type, f"reasoning.{context}.forecast_demand")
def _calculate_po_urgency(self, po: Dict[str, Any]) -> str:
    """
    Calculate urgency of PO approval based on delivery date
    Args:
        po: Purchase order dict; "required_delivery_date" may be an
            ISO-8601 string, a datetime, a date, or missing/empty
    Returns:
        ActionUrgency.CRITICAL when delivery is required in under 24
        hours (including dates already in the past),
        ActionUrgency.IMPORTANT when under 48 hours, otherwise
        ActionUrgency.NORMAL (also when no date is given)
    """
    required_date = po.get("required_delivery_date")
    if not required_date:
        return ActionUrgency.NORMAL

    if isinstance(required_date, str):
        # A trailing 'Z' is rewritten as an explicit offset because older
        # datetime.fromisoformat versions reject it.
        required_date = datetime.fromisoformat(required_date.replace('Z', '+00:00'))
    elif not isinstance(required_date, datetime):
        # A plain date is treated as midnight at the start of that day.
        required_date = datetime.combine(required_date, datetime.min.time())

    # Date-only or offset-less values parse as naive datetimes. Subtracting
    # the aware "now" from them would raise TypeError, so treat them as UTC.
    if required_date.tzinfo is None:
        required_date = required_date.replace(tzinfo=timezone.utc)

    seconds_until_delivery = (required_date - datetime.now(timezone.utc)).total_seconds()

    # Critical if needed within 24 hours
    if seconds_until_delivery < 86400:
        return ActionUrgency.CRITICAL
    # Important if needed within 48 hours
    if seconds_until_delivery < 172800:
        return ActionUrgency.IMPORTANT
    return ActionUrgency.NORMAL
async def get_production_timeline(
self,
tenant_id: str,
batches: List[Dict[str, Any]]
) -> List[Dict[str, Any]]:
"""
Transform production batches into timeline format
Args:
tenant_id: Tenant identifier
batches: List of production batches for today
Returns:
Timeline-formatted production schedule
"""
now = datetime.now(timezone.utc)
timeline = []
for batch in batches:
# Parse times
planned_start = batch.get("planned_start_time")
if isinstance(planned_start, str):
planned_start = datetime.fromisoformat(planned_start.replace('Z', '+00:00'))
planned_end = batch.get("planned_end_time")
if isinstance(planned_end, str):
planned_end = datetime.fromisoformat(planned_end.replace('Z', '+00:00'))
actual_start = batch.get("actual_start_time")
if actual_start and isinstance(actual_start, str):
actual_start = datetime.fromisoformat(actual_start.replace('Z', '+00:00'))
# Determine status and progress
status = batch.get("status", "PENDING")
progress = 0
if status == "COMPLETED":
progress = 100
status_icon = ""
status_text = "COMPLETED"
status_i18n = {
"key": "production.status.completed",
"params": {}
}
elif status == "IN_PROGRESS":
# Calculate progress based on time elapsed
if actual_start and planned_end:
total_duration = (planned_end - actual_start).total_seconds()
elapsed = (now - actual_start).total_seconds()
progress = min(int((elapsed / total_duration) * 100), 99)
else:
progress = 50
status_icon = "🔄"
status_text = "IN PROGRESS"
status_i18n = {
"key": "production.status.in_progress",
"params": {}
}
else:
status_icon = ""
status_text = "PENDING"
status_i18n = {
"key": "production.status.pending",
"params": {}
}
# Get reasoning_data or create default
reasoning_data = batch.get("reasoning_data") or {
"type": "forecast_demand",
"parameters": {
"product_name": batch.get("product_name", "Product"),
"predicted_demand": batch.get("planned_quantity", 0),
"current_stock": 0, # Default to 0 if not available
"confidence_score": 85
}
}
# Get reasoning type and convert to i18n key
reasoning_type = reasoning_data.get('type', 'forecast_demand')
reasoning_type_i18n_key = self._get_reasoning_type_i18n_key(reasoning_type, context="productionBatch")
timeline.append({
"id": batch["id"],
"batchNumber": batch.get("batch_number"),
"productName": batch.get("product_name"),
"quantity": batch.get("planned_quantity"),
"unit": "units",
"plannedStartTime": planned_start.isoformat() if planned_start else None,
"plannedEndTime": planned_end.isoformat() if planned_end else None,
"actualStartTime": actual_start.isoformat() if actual_start else None,
"status": status,
"statusIcon": status_icon,
"statusText": status_text,
"progress": progress,
"readyBy": planned_end.isoformat() if planned_end else None,
"priority": batch.get("priority", "MEDIUM"),
"reasoning_data": reasoning_data, # Structured data for i18n
"reasoning_i18n": {
"key": reasoning_type_i18n_key,
"params": dict(reasoning_data.get('parameters', {})) # Create a copy to avoid immutable object issues
},
"status_i18n": status_i18n # i18n for status text
})
# Sort by planned start time
timeline.sort(key=lambda x: x["plannedStartTime"] or "9999")
return timeline
async def calculate_insights(
    self,
    tenant_id: str,
    sustainability_data: Dict[str, Any],
    inventory_data: Dict[str, Any],
    savings_data: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Calculate key insights for the insights grid
    Args:
        tenant_id: Tenant identifier (not used by the calculation)
        sustainability_data: Waste metrics; reads "waste_percentage"
            and "target_percentage" (default 5.0)
        inventory_data: Inventory status; reads "low_stock_count",
            "out_of_stock_count", "deliveries_today" and
            "next_delivery_time"
        savings_data: Cost savings; reads "weekly_savings" and
            "trend_percentage"
    Returns:
        Dict with "savings", "inventory", "waste" and "deliveries"
        cards, each holding a color and i18n label/value/detail entries
    """
    def metric(data, key, default=0):
        # A key that is missing or explicitly None falls back to the
        # default, so the formatting and comparisons below cannot fail
        # on a null value.
        value = data.get(key)
        return default if value is None else value

    # Savings insight
    weekly_savings = metric(savings_data, "weekly_savings")
    savings_trend = metric(savings_data, "trend_percentage")
    savings_improved = savings_trend > 0

    # Inventory insight: stock-outs outrank low stock, which outranks "all fine".
    low_stock_count = metric(inventory_data, "low_stock_count")
    out_of_stock_count = metric(inventory_data, "out_of_stock_count")
    if out_of_stock_count > 0:
        inventory_color = "red"
        inventory_value_key = "insights.inventory.stock_issues"
        inventory_detail_key = "insights.inventory.out_of_stock"
        inventory_issue_count = out_of_stock_count
    elif low_stock_count > 0:
        inventory_color = "amber"
        inventory_value_key = "insights.inventory.low_stock"
        inventory_detail_key = "insights.inventory.alerts"
        inventory_issue_count = low_stock_count
    else:
        inventory_color = "green"
        inventory_value_key = "insights.inventory.all_stocked"
        inventory_detail_key = "insights.inventory.no_alerts"
        inventory_issue_count = None
    has_inventory_issue = inventory_issue_count is not None

    # Waste insight: the trend is the distance from the target percentage.
    waste_percentage = metric(sustainability_data, "waste_percentage")
    waste_target = metric(sustainability_data, "target_percentage", 5.0)
    waste_trend = waste_percentage - waste_target

    # Deliveries insight
    deliveries_today = metric(inventory_data, "deliveries_today")
    next_delivery = inventory_data.get("next_delivery_time")
    if next_delivery:
        # NOTE(review): no i18n key is sent when a next delivery is known;
        # confirm the frontend handles a null key.
        deliveries_detail = {"key": None, "params": {"next_delivery": next_delivery}}
    else:
        deliveries_detail = {"key": "insights.deliveries.none_scheduled", "params": {}}

    return {
        "savings": {
            "color": "green" if savings_improved else "amber",
            "i18n": {
                "label": {
                    "key": "insights.savings.label",
                    "params": {}
                },
                "value": {
                    "key": "insights.savings.value_this_week",
                    "params": {"amount": f"{weekly_savings:.0f}"}
                },
                "detail": {
                    "key": "insights.savings.detail_vs_last_positive" if savings_improved else "insights.savings.detail_vs_last_negative",
                    "params": {"percentage": f"{abs(savings_trend):.0f}"}
                }
            }
        },
        "inventory": {
            "color": inventory_color,
            "i18n": {
                "label": {
                    "key": "insights.inventory.label",
                    "params": {}
                },
                "value": {
                    "key": inventory_value_key,
                    "params": {"count": inventory_issue_count} if has_inventory_issue else {}
                },
                "detail": {
                    "key": inventory_detail_key,
                    "params": {"count": inventory_issue_count} if has_inventory_issue else {}
                }
            }
        },
        "waste": {
            "color": "green" if waste_trend <= 0 else "amber",
            "i18n": {
                "label": {
                    "key": "insights.waste.label",
                    "params": {}
                },
                "value": {
                    "key": "insights.waste.value_this_month",
                    "params": {"percentage": f"{waste_percentage:.1f}"}
                },
                "detail": {
                    "key": "insights.waste.detail_vs_goal",
                    "params": {"change": f"{waste_trend:+.1f}"}
                }
            }
        },
        "deliveries": {
            "color": "green",
            "i18n": {
                "label": {
                    "key": "insights.deliveries.label",
                    "params": {}
                },
                "value": {
                    "key": "insights.deliveries.arriving_today",
                    "params": {"count": deliveries_today}
                },
                "detail": deliveries_detail
            }
        }
    }