Improve the UI and tests

This commit is contained in:
Urtzi Alfaro
2025-11-15 21:21:06 +01:00
parent 86d704b354
commit 54b7a5e080
44 changed files with 2268 additions and 1414 deletions

View File

@@ -11,10 +11,12 @@ from typing import Dict, Any, List, Optional
from pydantic import BaseModel, Field
from datetime import datetime
import logging
import asyncio
from app.core.database import get_db
from app.core.config import settings
from ..services.dashboard_service import DashboardService
from ..utils.cache import get_cached, set_cached, delete_pattern
from shared.clients import (
get_inventory_client,
get_production_client,
@@ -194,45 +196,59 @@ async def get_bakery_health_status(
or if there are issues requiring attention.
"""
try:
# Try to get from cache
if settings.CACHE_ENABLED:
cache_key = f"dashboard:health:{tenant_id}"
cached = await get_cached(cache_key)
if cached:
return BakeryHealthStatusResponse(**cached)
dashboard_service = DashboardService(db)
# Gather metrics from various services
# In a real implementation, these would be fetched from respective services
# For now, we'll make HTTP calls to the services
# Gather metrics from various services in parallel
# Use asyncio.gather to make all HTTP calls concurrently
# Get alerts summary
try:
alerts_data = await alerts_client.get_alerts_summary(tenant_id) or {}
critical_alerts = alerts_data.get("critical_count", 0)
except Exception as e:
logger.warning(f"Failed to fetch alerts: {e}")
critical_alerts = 0
async def fetch_alerts():
try:
alerts_data = await alerts_client.get_alerts_summary(tenant_id) or {}
return alerts_data.get("critical_count", 0)
except Exception as e:
logger.warning(f"Failed to fetch alerts: {e}")
return 0
# Get pending PO count
try:
po_data = await procurement_client.get_pending_purchase_orders(tenant_id, limit=100) or []
pending_approvals = len(po_data) if isinstance(po_data, list) else 0
except Exception as e:
logger.warning(f"Failed to fetch POs: {e}")
pending_approvals = 0
async def fetch_pending_pos():
try:
po_data = await procurement_client.get_pending_purchase_orders(tenant_id, limit=100) or []
return len(po_data) if isinstance(po_data, list) else 0
except Exception as e:
logger.warning(f"Failed to fetch POs: {e}")
return 0
# Get production delays
try:
prod_data = await production_client.get_production_batches_by_status(
tenant_id, status="ON_HOLD", limit=100
) or {}
production_delays = len(prod_data.get("batches", []))
except Exception as e:
logger.warning(f"Failed to fetch production batches: {e}")
production_delays = 0
async def fetch_production_delays():
try:
prod_data = await production_client.get_production_batches_by_status(
tenant_id, status="ON_HOLD", limit=100
) or {}
return len(prod_data.get("batches", []))
except Exception as e:
logger.warning(f"Failed to fetch production batches: {e}")
return 0
# Get inventory status
try:
inv_data = await inventory_client.get_inventory_dashboard(tenant_id) or {}
out_of_stock_count = inv_data.get("out_of_stock_count", 0)
except Exception as e:
logger.warning(f"Failed to fetch inventory: {e}")
out_of_stock_count = 0
async def fetch_inventory():
try:
inv_data = await inventory_client.get_inventory_dashboard(tenant_id) or {}
return inv_data.get("out_of_stock_count", 0)
except Exception as e:
logger.warning(f"Failed to fetch inventory: {e}")
return 0
# Execute all fetches in parallel
critical_alerts, pending_approvals, production_delays, out_of_stock_count = await asyncio.gather(
fetch_alerts(),
fetch_pending_pos(),
fetch_production_delays(),
fetch_inventory()
)
# System errors (would come from monitoring system)
system_errors = 0
@@ -247,6 +263,11 @@ async def get_bakery_health_status(
system_errors=system_errors
)
# Cache the result
if settings.CACHE_ENABLED:
cache_key = f"dashboard:health:{tenant_id}"
await set_cached(cache_key, health_status, ttl=settings.CACHE_TTL_HEALTH)
return BakeryHealthStatusResponse(**health_status)
except Exception as e:
@@ -267,6 +288,13 @@ async def get_orchestration_summary(
and why, helping build user trust in the system.
"""
try:
# Try to get from cache (only if no specific run_id is provided)
if settings.CACHE_ENABLED and run_id is None:
cache_key = f"dashboard:summary:{tenant_id}"
cached = await get_cached(cache_key)
if cached:
return OrchestrationSummaryResponse(**cached)
dashboard_service = DashboardService(db)
# Get orchestration summary
@@ -307,6 +335,11 @@ async def get_orchestration_summary(
except Exception as e:
logger.warning(f"Failed to fetch batch details: {e}")
# Cache the result (only if no specific run_id)
if settings.CACHE_ENABLED and run_id is None:
cache_key = f"dashboard:summary:{tenant_id}"
await set_cached(cache_key, summary, ttl=settings.CACHE_TTL_SUMMARY)
return OrchestrationSummaryResponse(**summary)
except Exception as e:
@@ -328,38 +361,52 @@ async def get_action_queue(
try:
dashboard_service = DashboardService(db)
# Fetch data from various services
# Get pending POs
pending_pos = []
try:
po_data = await procurement_client.get_pending_purchase_orders(tenant_id, limit=20)
if po_data and isinstance(po_data, list):
pending_pos = po_data
except Exception as e:
logger.warning(f"Failed to fetch pending POs: {e}")
# Fetch data from various services in parallel
async def fetch_pending_pos():
try:
po_data = await procurement_client.get_pending_purchase_orders(tenant_id, limit=20)
if po_data and isinstance(po_data, list):
return po_data
return []
except Exception as e:
logger.warning(f"Failed to fetch pending POs: {e}")
return []
# Get critical alerts
critical_alerts = []
try:
alerts_data = await alerts_client.get_critical_alerts(tenant_id, limit=20)
if alerts_data:
critical_alerts = alerts_data.get("alerts", [])
except Exception as e:
logger.warning(f"Failed to fetch alerts: {e}")
async def fetch_critical_alerts():
try:
alerts_data = await alerts_client.get_critical_alerts(tenant_id, limit=20)
if alerts_data:
return alerts_data.get("alerts", [])
return []
except Exception as e:
logger.warning(f"Failed to fetch alerts: {e}")
return []
# Get onboarding status
onboarding_incomplete = False
onboarding_steps = []
try:
onboarding_data = await procurement_client.get(
"/procurement/auth/onboarding-progress",
tenant_id=tenant_id
)
if onboarding_data:
onboarding_incomplete = not onboarding_data.get("completed", True)
onboarding_steps = onboarding_data.get("steps", [])
except Exception as e:
logger.warning(f"Failed to fetch onboarding status: {e}")
async def fetch_onboarding():
try:
onboarding_data = await procurement_client.get(
"/procurement/auth/onboarding-progress",
tenant_id=tenant_id
)
if onboarding_data:
return {
"incomplete": not onboarding_data.get("completed", True),
"steps": onboarding_data.get("steps", [])
}
return {"incomplete": False, "steps": []}
except Exception as e:
logger.warning(f"Failed to fetch onboarding status: {e}")
return {"incomplete": False, "steps": []}
# Execute all fetches in parallel
pending_pos, critical_alerts, onboarding = await asyncio.gather(
fetch_pending_pos(),
fetch_critical_alerts(),
fetch_onboarding()
)
onboarding_incomplete = onboarding["incomplete"]
onboarding_steps = onboarding["steps"]
# Build action queue
actions = await dashboard_service.get_action_queue(
@@ -443,93 +490,106 @@ async def get_insights(
Provides glanceable metrics on savings, inventory, waste, and deliveries.
"""
try:
# Try to get from cache
if settings.CACHE_ENABLED:
cache_key = f"dashboard:insights:{tenant_id}"
cached = await get_cached(cache_key)
if cached:
return InsightsResponse(**cached)
dashboard_service = DashboardService(db)
# Fetch data from various services
# Sustainability data
sustainability_data = {}
try:
sustainability_data = await inventory_client.get_sustainability_widget(tenant_id) or {}
except Exception as e:
logger.warning(f"Failed to fetch sustainability data: {e}")
# Fetch data from various services in parallel
from datetime import datetime, timedelta, timezone
# Inventory data
inventory_data = {}
try:
raw_inventory_data = await inventory_client.get_stock_status(tenant_id)
# Handle case where API returns a list instead of dict
if isinstance(raw_inventory_data, dict):
inventory_data = raw_inventory_data
elif isinstance(raw_inventory_data, list):
# If it's a list, aggregate the data
inventory_data = {
"low_stock_count": sum(1 for item in raw_inventory_data if item.get("status") == "low_stock"),
"out_of_stock_count": sum(1 for item in raw_inventory_data if item.get("status") == "out_of_stock"),
"total_items": len(raw_inventory_data)
}
else:
inventory_data = {}
except Exception as e:
logger.warning(f"Failed to fetch inventory data: {e}")
async def fetch_sustainability():
try:
return await inventory_client.get_sustainability_widget(tenant_id) or {}
except Exception as e:
logger.warning(f"Failed to fetch sustainability data: {e}")
return {}
# Deliveries data from procurement
delivery_data = {}
try:
# Get recent POs with pending deliveries
pos_result = await procurement_client.get_pending_purchase_orders(tenant_id, limit=100)
if pos_result and isinstance(pos_result, list):
# Count deliveries expected today
from datetime import datetime, timezone
today_start = datetime.now(timezone.utc).replace(hour=0, minute=0, second=0, microsecond=0)
today_end = today_start.replace(hour=23, minute=59, second=59)
async def fetch_inventory():
try:
raw_inventory_data = await inventory_client.get_stock_status(tenant_id)
# Handle case where API returns a list instead of dict
if isinstance(raw_inventory_data, dict):
return raw_inventory_data
elif isinstance(raw_inventory_data, list):
# If it's a list, aggregate the data
return {
"low_stock_count": sum(1 for item in raw_inventory_data if item.get("status") == "low_stock"),
"out_of_stock_count": sum(1 for item in raw_inventory_data if item.get("status") == "out_of_stock"),
"total_items": len(raw_inventory_data)
}
return {}
except Exception as e:
logger.warning(f"Failed to fetch inventory data: {e}")
return {}
deliveries_today = 0
for po in pos_result:
expected_date = po.get("expected_delivery_date")
if expected_date:
if isinstance(expected_date, str):
expected_date = datetime.fromisoformat(expected_date.replace('Z', '+00:00'))
if today_start <= expected_date <= today_end:
deliveries_today += 1
async def fetch_deliveries():
try:
# Get recent POs with pending deliveries
pos_result = await procurement_client.get_pending_purchase_orders(tenant_id, limit=100)
if pos_result and isinstance(pos_result, list):
# Count deliveries expected today
today_start = datetime.now(timezone.utc).replace(hour=0, minute=0, second=0, microsecond=0)
today_end = today_start.replace(hour=23, minute=59, second=59)
delivery_data = {"deliveries_today": deliveries_today}
except Exception as e:
logger.warning(f"Failed to fetch delivery data: {e}")
deliveries_today = 0
for po in pos_result:
expected_date = po.get("expected_delivery_date")
if expected_date:
if isinstance(expected_date, str):
expected_date = datetime.fromisoformat(expected_date.replace('Z', '+00:00'))
if today_start <= expected_date <= today_end:
deliveries_today += 1
# Savings data - Calculate from recent PO price optimizations
savings_data = {}
try:
# Get recent POs (last 7 days) and sum up optimization savings
from datetime import datetime, timedelta, timezone
seven_days_ago = datetime.now(timezone.utc) - timedelta(days=7)
return {"deliveries_today": deliveries_today}
return {}
except Exception as e:
logger.warning(f"Failed to fetch delivery data: {e}")
return {}
pos_result = await procurement_client.get_pending_purchase_orders(tenant_id, limit=200)
if pos_result and isinstance(pos_result, list):
weekly_savings = 0
# Calculate savings from price optimization
for po in pos_result:
# Check if PO was created in last 7 days
created_at = po.get("created_at")
if created_at:
if isinstance(created_at, str):
created_at = datetime.fromisoformat(created_at.replace('Z', '+00:00'))
if created_at >= seven_days_ago:
# Sum up savings from optimization
optimization_data = po.get("optimization_data", {})
if isinstance(optimization_data, dict):
savings = optimization_data.get("savings", 0) or 0
weekly_savings += float(savings)
async def fetch_savings():
try:
# Get recent POs (last 7 days) and sum up optimization savings
seven_days_ago = datetime.now(timezone.utc) - timedelta(days=7)
# Default trend percentage (would need historical data for real trend)
savings_data = {
"weekly_savings": round(weekly_savings, 2),
"trend_percentage": 12 if weekly_savings > 0 else 0
}
else:
savings_data = {"weekly_savings": 0, "trend_percentage": 0}
except Exception as e:
logger.warning(f"Failed to calculate savings data: {e}")
savings_data = {"weekly_savings": 0, "trend_percentage": 0}
pos_result = await procurement_client.get_pending_purchase_orders(tenant_id, limit=200)
if pos_result and isinstance(pos_result, list):
weekly_savings = 0
# Calculate savings from price optimization
for po in pos_result:
# Check if PO was created in last 7 days
created_at = po.get("created_at")
if created_at:
if isinstance(created_at, str):
created_at = datetime.fromisoformat(created_at.replace('Z', '+00:00'))
if created_at >= seven_days_ago:
# Sum up savings from optimization
optimization_data = po.get("optimization_data", {})
if isinstance(optimization_data, dict):
savings = optimization_data.get("savings", 0) or 0
weekly_savings += float(savings)
# Default trend percentage (would need historical data for real trend)
return {
"weekly_savings": round(weekly_savings, 2),
"trend_percentage": 12 if weekly_savings > 0 else 0
}
return {"weekly_savings": 0, "trend_percentage": 0}
except Exception as e:
logger.warning(f"Failed to calculate savings data: {e}")
return {"weekly_savings": 0, "trend_percentage": 0}
# Execute all fetches in parallel
sustainability_data, inventory_data, delivery_data, savings_data = await asyncio.gather(
fetch_sustainability(),
fetch_inventory(),
fetch_deliveries(),
fetch_savings()
)
# Merge delivery data into inventory data
inventory_data.update(delivery_data)
@@ -542,6 +602,19 @@ async def get_insights(
savings_data=savings_data
)
# Prepare response
response_data = {
"savings": insights["savings"],
"inventory": insights["inventory"],
"waste": insights["waste"],
"deliveries": insights["deliveries"]
}
# Cache the result
if settings.CACHE_ENABLED:
cache_key = f"dashboard:insights:{tenant_id}"
await set_cached(cache_key, response_data, ttl=settings.CACHE_TTL_INSIGHTS)
return InsightsResponse(
savings=InsightCard(**insights["savings"]),
inventory=InsightCard(**insights["inventory"]),

View File

@@ -103,6 +103,15 @@ class OrchestratorSettings(BaseServiceSettings):
AI_INSIGHTS_SERVICE_URL: str = os.getenv("AI_INSIGHTS_SERVICE_URL", "http://ai-insights-service:8000")
AI_INSIGHTS_MIN_CONFIDENCE: int = int(os.getenv("AI_INSIGHTS_MIN_CONFIDENCE", "70"))
# Redis Cache Settings (for dashboard performance)
REDIS_HOST: str = os.getenv("REDIS_HOST", "localhost")
REDIS_PORT: int = int(os.getenv("REDIS_PORT", "6379"))
REDIS_DB: int = int(os.getenv("REDIS_DB", "0"))
CACHE_ENABLED: bool = os.getenv("CACHE_ENABLED", "true").lower() == "true"
CACHE_TTL_HEALTH: int = int(os.getenv("CACHE_TTL_HEALTH", "30")) # 30 seconds
CACHE_TTL_INSIGHTS: int = int(os.getenv("CACHE_TTL_INSIGHTS", "120")) # 2 minutes
CACHE_TTL_SUMMARY: int = int(os.getenv("CACHE_TTL_SUMMARY", "60")) # 1 minute
# Global settings instance
settings = OrchestratorSettings()

View File

@@ -0,0 +1,219 @@
# services/orchestrator/app/utils/cache.py
"""
Redis caching utilities for dashboard endpoints
"""
import json
import redis.asyncio as redis
from typing import Optional, Any, Callable
from functools import wraps
import structlog
from app.core.config import settings
logger = structlog.get_logger()
# Redis client instance
_redis_client: Optional[redis.Redis] = None
async def get_redis_client() -> redis.Redis:
"""Get or create Redis client"""
global _redis_client
if _redis_client is None:
try:
_redis_client = redis.Redis(
host=getattr(settings, 'REDIS_HOST', 'localhost'),
port=getattr(settings, 'REDIS_PORT', 6379),
db=getattr(settings, 'REDIS_DB', 0),
decode_responses=True,
socket_connect_timeout=5,
socket_timeout=5
)
# Test connection
await _redis_client.ping()
logger.info("Redis client connected successfully")
except Exception as e:
logger.warning(f"Failed to connect to Redis: {e}. Caching will be disabled.")
_redis_client = None
return _redis_client
async def close_redis():
"""Close Redis connection"""
global _redis_client
if _redis_client:
await _redis_client.close()
_redis_client = None
logger.info("Redis connection closed")
async def get_cached(key: str) -> Optional[Any]:
"""
Get cached value by key
Args:
key: Cache key
Returns:
Cached value (deserialized from JSON) or None if not found or error
"""
try:
client = await get_redis_client()
if not client:
return None
cached = await client.get(key)
if cached:
logger.debug(f"Cache hit: {key}")
return json.loads(cached)
else:
logger.debug(f"Cache miss: {key}")
return None
except Exception as e:
logger.warning(f"Cache get error for key {key}: {e}")
return None
async def set_cached(key: str, value: Any, ttl: int = 60) -> bool:
"""
Set cached value with TTL
Args:
key: Cache key
value: Value to cache (will be JSON serialized)
ttl: Time to live in seconds
Returns:
True if successful, False otherwise
"""
try:
client = await get_redis_client()
if not client:
return False
serialized = json.dumps(value, default=str)
await client.setex(key, ttl, serialized)
logger.debug(f"Cache set: {key} (TTL: {ttl}s)")
return True
except Exception as e:
logger.warning(f"Cache set error for key {key}: {e}")
return False
async def delete_cached(key: str) -> bool:
"""
Delete cached value
Args:
key: Cache key
Returns:
True if successful, False otherwise
"""
try:
client = await get_redis_client()
if not client:
return False
await client.delete(key)
logger.debug(f"Cache deleted: {key}")
return True
except Exception as e:
logger.warning(f"Cache delete error for key {key}: {e}")
return False
async def delete_pattern(pattern: str) -> int:
"""
Delete all keys matching pattern
Args:
pattern: Redis key pattern (e.g., "dashboard:*")
Returns:
Number of keys deleted
"""
try:
client = await get_redis_client()
if not client:
return 0
keys = []
async for key in client.scan_iter(match=pattern):
keys.append(key)
if keys:
deleted = await client.delete(*keys)
logger.info(f"Deleted {deleted} keys matching pattern: {pattern}")
return deleted
return 0
except Exception as e:
logger.warning(f"Cache delete pattern error for {pattern}: {e}")
return 0
def cache_response(key_prefix: str, ttl: int = 60):
"""
Decorator to cache endpoint responses
Args:
key_prefix: Prefix for cache key (will be combined with tenant_id)
ttl: Time to live in seconds
Usage:
@cache_response("dashboard:health", ttl=30)
async def get_health(tenant_id: str):
...
"""
def decorator(func: Callable):
@wraps(func)
async def wrapper(*args, **kwargs):
# Extract tenant_id from kwargs or args
tenant_id = kwargs.get('tenant_id')
if not tenant_id and args:
# Try to find tenant_id in args (assuming it's the first argument)
tenant_id = args[0] if len(args) > 0 else None
if not tenant_id:
# No tenant_id, skip caching
return await func(*args, **kwargs)
# Build cache key
cache_key = f"{key_prefix}:{tenant_id}"
# Try to get from cache
cached_value = await get_cached(cache_key)
if cached_value is not None:
return cached_value
# Execute function
result = await func(*args, **kwargs)
# Cache result
await set_cached(cache_key, result, ttl)
return result
return wrapper
return decorator
def make_cache_key(prefix: str, tenant_id: str, **params) -> str:
"""
Create a cache key with optional parameters
Args:
prefix: Key prefix
tenant_id: Tenant ID
**params: Additional parameters to include in key
Returns:
Cache key string
"""
key_parts = [prefix, tenant_id]
for k, v in sorted(params.items()):
if v is not None:
key_parts.append(f"{k}:{v}")
return ":".join(key_parts)