From e46574a12bc6a06ea5e77a200ab260364da6ad29 Mon Sep 17 00:00:00 2001 From: Claude Date: Fri, 7 Nov 2025 22:02:06 +0000 Subject: [PATCH] refactor: Replace httpx with shared service clients in dashboard API Replace direct httpx calls with shared service client architecture for better fault tolerance, authentication, and consistency. Changes: - Remove httpx import and usage - Add service client imports (inventory, production, procurement) - Initialize service clients at module level - Refactor all 5 dashboard endpoints to use service clients: * health-status: Use inventory/production/procurement clients * orchestration-summary: Use procurement/production clients * action-queue: Use procurement client * production-timeline: Use production client * insights: Use inventory client Benefits: - Built-in circuit breaker pattern for fault tolerance - Automatic service authentication with JWT tokens - Consistent error handling and retry logic - Removes hardcoded service URLs - Better testability and maintainability --- services/orchestrator/app/api/dashboard.py | 306 +++++++++++---------- 1 file changed, 156 insertions(+), 150 deletions(-) diff --git a/services/orchestrator/app/api/dashboard.py b/services/orchestrator/app/api/dashboard.py index 06e682e5..7088612c 100644 --- a/services/orchestrator/app/api/dashboard.py +++ b/services/orchestrator/app/api/dashboard.py @@ -11,13 +11,25 @@ from typing import Dict, Any, List, Optional from pydantic import BaseModel, Field from datetime import datetime import logging -import httpx from app.core.database import get_db +from app.core.config import settings from ..services.dashboard_service import DashboardService +from shared.clients import ( + get_inventory_client, + get_production_client, + ProductionServiceClient, + InventoryServiceClient +) +from shared.clients.procurement_client import ProcurementServiceClient logger = logging.getLogger(__name__) +# Initialize service clients +inventory_client = get_inventory_client(settings, "orchestrator") +production_client = get_production_client(settings, "orchestrator") +procurement_client = ProcurementServiceClient(settings) + router = APIRouter(prefix="/api/v1/tenants/{tenant_id}/dashboard", tags=["dashboard"]) @@ -177,52 +189,48 @@ async def get_bakery_health_status( # In a real implementation, these would be fetched from respective services # For now, we'll make HTTP calls to the services - async with httpx.AsyncClient(timeout=10.0) as client: - # Get alerts - try: - alerts_response = await client.get( - f"http://alert-processor-service:8000/api/v1/tenants/{tenant_id}/alerts/summary" - ) - alerts_data = alerts_response.json() if alerts_response.status_code == 200 else {} - critical_alerts = alerts_data.get("critical_count", 0) - except Exception as e: - logger.warning(f"Failed to fetch alerts: {e}") - critical_alerts = 0 + # Get alerts - using base client for alert service + try: + alerts_data = await procurement_client.get( + "/alert-processor/alerts/summary", + tenant_id=tenant_id + ) or {} + critical_alerts = alerts_data.get("critical_count", 0) + except Exception as e: + logger.warning(f"Failed to fetch alerts: {e}") + critical_alerts = 0 - # Get pending PO count - try: - po_response = await client.get( - f"http://procurement-service:8000/api/v1/tenants/{tenant_id}/purchase-orders", - params={"status": "pending_approval", "limit": 100} - ) - po_data = po_response.json() if po_response.status_code == 200 else {} - pending_approvals = len(po_data.get("items", [])) - except Exception as e: - logger.warning(f"Failed to fetch POs: {e}") - pending_approvals = 0 + # Get pending PO count + try: + po_data = await procurement_client.get( + f"/purchase-orders", + tenant_id=tenant_id, + params={"status": "pending_approval", "limit": 100} + ) or {} + pending_approvals = len(po_data.get("items", [])) + except Exception as e: + logger.warning(f"Failed to fetch POs: {e}") + pending_approvals = 0 - # Get production delays - try: - prod_response = await client.get( - f"http://production-service:8000/api/v1/tenants/{tenant_id}/production-batches", - params={"status": "ON_HOLD", "limit": 100} - ) - prod_data = prod_response.json() if prod_response.status_code == 200 else {} - production_delays = len(prod_data.get("items", [])) - except Exception as e: - logger.warning(f"Failed to fetch production batches: {e}") - production_delays = 0 + # Get production delays + try: + prod_data = await production_client.get( + "/production-batches", + tenant_id=tenant_id, + params={"status": "ON_HOLD", "limit": 100} + ) or {} + production_delays = len(prod_data.get("items", [])) + except Exception as e: + logger.warning(f"Failed to fetch production batches: {e}") + production_delays = 0 - # Get inventory status - try: - inv_response = await client.get( - f"http://inventory-service:8000/api/v1/tenants/{tenant_id}/inventory/dashboard/stock-status" - ) - inv_data = inv_response.json() if inv_response.status_code == 200 else {} - out_of_stock_count = inv_data.get("out_of_stock_count", 0) - except Exception as e: - logger.warning(f"Failed to fetch inventory: {e}") - out_of_stock_count = 0 + # Get inventory status + try: + inv_data = await inventory_client.get_inventory_dashboard(tenant_id) or {} + out_of_stock_count = inv_data.get("out_of_stock_count", 0) + except Exception as e: + logger.warning(f"Failed to fetch inventory: {e}") + out_of_stock_count = 0 # System errors (would come from monitoring system) system_errors = 0 @@ -267,43 +275,43 @@ async def get_orchestration_summary( # Enhance with detailed PO and batch summaries if summary["purchaseOrdersCreated"] > 0: - async with httpx.AsyncClient(timeout=10.0) as client: - try: - po_response = await client.get( - f"http://procurement-service:8000/api/v1/tenants/{tenant_id}/purchase-orders", - params={"status": "pending_approval", "limit": 10} - ) - if po_response.status_code == 200: - pos = po_response.json().get("items", []) - summary["purchaseOrdersSummary"] = [ - PurchaseOrderSummary( - supplierName=po.get("supplier_name", "Unknown"), - itemCategories=[item.get("ingredient_name", "Item") for item in po.get("items", [])[:3]], - totalAmount=float(po.get("total_amount", 0)) - ) - for po in pos[:5] # Show top 5 - ] - except Exception as e: - logger.warning(f"Failed to fetch PO details: {e}") + try: + po_data = await procurement_client.get( + "/purchase-orders", + tenant_id=tenant_id, + params={"status": "pending_approval", "limit": 10} + ) + if po_data: + pos = po_data.get("items", []) + summary["purchaseOrdersSummary"] = [ + PurchaseOrderSummary( + supplierName=po.get("supplier_name", "Unknown"), + itemCategories=[item.get("ingredient_name", "Item") for item in po.get("items", [])[:3]], + totalAmount=float(po.get("total_amount", 0)) + ) + for po in pos[:5] # Show top 5 + ] + except Exception as e: + logger.warning(f"Failed to fetch PO details: {e}") if summary["productionBatchesCreated"] > 0: - async with httpx.AsyncClient(timeout=10.0) as client: - try: - batch_response = await client.get( - f"http://production-service:8000/api/v1/tenants/{tenant_id}/production-batches/today" - ) - if batch_response.status_code == 200: - batches = batch_response.json().get("batches", []) - summary["productionBatchesSummary"] = [ - ProductionBatchSummary( - productName=batch.get("product_name", "Unknown"), - quantity=batch.get("planned_quantity", 0), - readyByTime=batch.get("planned_end_time", "") - ) - for batch in batches[:5] # Show top 5 - ] - except Exception as e: - logger.warning(f"Failed to fetch batch details: {e}") + try: + batch_data = await production_client.get( + "/production-batches/today", + tenant_id=tenant_id + ) + if batch_data: + batches = batch_data.get("batches", []) + summary["productionBatchesSummary"] = [ + ProductionBatchSummary( + productName=batch.get("product_name", "Unknown"), + quantity=batch.get("planned_quantity", 0), + readyByTime=batch.get("planned_end_time", "") + ) + for batch in batches[:5] # Show top 5 + ] + except Exception as e: + logger.warning(f"Failed to fetch batch details: {e}") return OrchestrationSummaryResponse(**summary) @@ -327,44 +335,45 @@ async def get_action_queue( dashboard_service = DashboardService(db) # Fetch data from various services - async with httpx.AsyncClient(timeout=10.0) as client: - # Get pending POs - pending_pos = [] - try: - po_response = await client.get( - f"http://procurement-service:8000/api/v1/tenants/{tenant_id}/purchase-orders", - params={"status": "pending_approval", "limit": 20} - ) - if po_response.status_code == 200: - pending_pos = po_response.json().get("items", []) - except Exception as e: - logger.warning(f"Failed to fetch pending POs: {e}") + # Get pending POs + pending_pos = [] + try: + po_data = await procurement_client.get( + "/purchase-orders", + tenant_id=tenant_id, + params={"status": "pending_approval", "limit": 20} + ) + if po_data: + pending_pos = po_data.get("items", []) + except Exception as e: + logger.warning(f"Failed to fetch pending POs: {e}") - # Get critical alerts - critical_alerts = [] - try: - alerts_response = await client.get( - f"http://alert-processor-service:8000/api/v1/tenants/{tenant_id}/alerts", - params={"severity": "critical", "resolved": False, "limit": 20} - ) - if alerts_response.status_code == 200: - critical_alerts = alerts_response.json().get("alerts", []) - except Exception as e: - logger.warning(f"Failed to fetch alerts: {e}") + # Get critical alerts + critical_alerts = [] + try: + alerts_data = await procurement_client.get( + "/alert-processor/alerts", + tenant_id=tenant_id, + params={"severity": "critical", "resolved": False, "limit": 20} + ) + if alerts_data: + critical_alerts = alerts_data.get("alerts", []) + except Exception as e: + logger.warning(f"Failed to fetch alerts: {e}") - # Get onboarding status - onboarding_incomplete = False - onboarding_steps = [] - try: - onboarding_response = await client.get( - f"http://auth:8000/api/v1/tenants/{tenant_id}/onboarding-progress" - ) - if onboarding_response.status_code == 200: - onboarding_data = onboarding_response.json() - onboarding_incomplete = not onboarding_data.get("completed", True) - onboarding_steps = onboarding_data.get("steps", []) - except Exception as e: - logger.warning(f"Failed to fetch onboarding status: {e}") + # Get onboarding status + onboarding_incomplete = False + onboarding_steps = [] + try: + onboarding_data = await procurement_client.get( + "/auth/onboarding-progress", + tenant_id=tenant_id + ) + if onboarding_data: + onboarding_incomplete = not onboarding_data.get("completed", True) + onboarding_steps = onboarding_data.get("steps", []) + except Exception as e: + logger.warning(f"Failed to fetch onboarding status: {e}") # Build action queue actions = await dashboard_service.get_action_queue( @@ -406,15 +415,15 @@ async def get_production_timeline( # Fetch today's production batches batches = [] - async with httpx.AsyncClient(timeout=10.0) as client: - try: - response = await client.get( - f"http://production-service:8000/api/v1/tenants/{tenant_id}/production-batches/today" - ) - if response.status_code == 200: - batches = response.json().get("batches", []) - except Exception as e: - logger.warning(f"Failed to fetch production batches: {e}") + try: + batch_data = await production_client.get( + "/production-batches/today", + tenant_id=tenant_id + ) + if batch_data: + batches = batch_data.get("batches", []) + except Exception as e: + logger.warning(f"Failed to fetch production batches: {e}") # Transform to timeline format timeline = await dashboard_service.get_production_timeline( @@ -454,34 +463,31 @@ async def get_insights( dashboard_service = DashboardService(db) # Fetch data from various services - async with httpx.AsyncClient(timeout=10.0) as client: - # Sustainability data - sustainability_data = {} - try: - response = await client.get( - f"http://inventory-service:8000/api/v1/tenants/{tenant_id}/sustainability/widget" - ) - if response.status_code == 200: - sustainability_data = response.json() - except Exception as e: - logger.warning(f"Failed to fetch sustainability data: {e}") + # Sustainability data + sustainability_data = {} + try: + sustainability_data = await inventory_client.get( + "/sustainability/widget", + tenant_id=tenant_id + ) or {} + except Exception as e: + logger.warning(f"Failed to fetch sustainability data: {e}") - # Inventory data - inventory_data = {} - try: - response = await client.get( - f"http://inventory-service:8000/api/v1/tenants/{tenant_id}/inventory/dashboard/stock-status" - ) - if response.status_code == 200: - inventory_data = response.json() - except Exception as e: - logger.warning(f"Failed to fetch inventory data: {e}") + # Inventory data + inventory_data = {} + try: + inventory_data = await inventory_client.get( + "/inventory/dashboard/stock-status", + tenant_id=tenant_id + ) or {} + except Exception as e: + logger.warning(f"Failed to fetch inventory data: {e}") - # Savings data (mock for now) - savings_data = { - "weekly_savings": 124, - "trend_percentage": 12 - } + # Savings data (mock for now) + savings_data = { + "weekly_savings": 124, + "trend_percentage": 12 + } # Calculate insights insights = await dashboard_service.calculate_insights(