refactor: Replace httpx with shared service clients in dashboard API
Replace direct httpx calls with shared service client architecture for better fault tolerance, authentication, and consistency. Changes: - Remove httpx import and usage - Add service client imports (inventory, production, procurement) - Initialize service clients at module level - Refactor all 5 dashboard endpoints to use service clients: * health-status: Use inventory/production/procurement clients * orchestration-summary: Use procurement/production clients * action-queue: Use procurement client * production-timeline: Use production client * insights: Use inventory client Benefits: - Built-in circuit breaker pattern for fault tolerance - Automatic service authentication with JWT tokens - Consistent error handling and retry logic - Removes hardcoded service URLs - Better testability and maintainability
This commit is contained in:
@@ -11,13 +11,25 @@ from typing import Dict, Any, List, Optional
|
|||||||
from pydantic import BaseModel, Field
|
from pydantic import BaseModel, Field
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
import logging
|
import logging
|
||||||
import httpx
|
|
||||||
|
|
||||||
from app.core.database import get_db
|
from app.core.database import get_db
|
||||||
|
from app.core.config import settings
|
||||||
from ..services.dashboard_service import DashboardService
|
from ..services.dashboard_service import DashboardService
|
||||||
|
from shared.clients import (
|
||||||
|
get_inventory_client,
|
||||||
|
get_production_client,
|
||||||
|
ProductionServiceClient,
|
||||||
|
InventoryServiceClient
|
||||||
|
)
|
||||||
|
from shared.clients.procurement_client import ProcurementServiceClient
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
# Initialize service clients
|
||||||
|
inventory_client = get_inventory_client(settings, "orchestrator")
|
||||||
|
production_client = get_production_client(settings, "orchestrator")
|
||||||
|
procurement_client = ProcurementServiceClient(settings)
|
||||||
|
|
||||||
router = APIRouter(prefix="/api/v1/tenants/{tenant_id}/dashboard", tags=["dashboard"])
|
router = APIRouter(prefix="/api/v1/tenants/{tenant_id}/dashboard", tags=["dashboard"])
|
||||||
|
|
||||||
|
|
||||||
@@ -177,52 +189,48 @@ async def get_bakery_health_status(
|
|||||||
# In a real implementation, these would be fetched from respective services
|
# In a real implementation, these would be fetched from respective services
|
||||||
# For now, we'll make HTTP calls to the services
|
# For now, we'll make HTTP calls to the services
|
||||||
|
|
||||||
async with httpx.AsyncClient(timeout=10.0) as client:
|
# Get alerts - using base client for alert service
|
||||||
# Get alerts
|
try:
|
||||||
try:
|
alerts_data = await procurement_client.get(
|
||||||
alerts_response = await client.get(
|
"/alert-processor/alerts/summary",
|
||||||
f"http://alert-processor-service:8000/api/v1/tenants/{tenant_id}/alerts/summary"
|
tenant_id=tenant_id
|
||||||
)
|
) or {}
|
||||||
alerts_data = alerts_response.json() if alerts_response.status_code == 200 else {}
|
critical_alerts = alerts_data.get("critical_count", 0)
|
||||||
critical_alerts = alerts_data.get("critical_count", 0)
|
except Exception as e:
|
||||||
except Exception as e:
|
logger.warning(f"Failed to fetch alerts: {e}")
|
||||||
logger.warning(f"Failed to fetch alerts: {e}")
|
critical_alerts = 0
|
||||||
critical_alerts = 0
|
|
||||||
|
|
||||||
# Get pending PO count
|
# Get pending PO count
|
||||||
try:
|
try:
|
||||||
po_response = await client.get(
|
po_data = await procurement_client.get(
|
||||||
f"http://procurement-service:8000/api/v1/tenants/{tenant_id}/purchase-orders",
|
f"/purchase-orders",
|
||||||
params={"status": "pending_approval", "limit": 100}
|
tenant_id=tenant_id,
|
||||||
)
|
params={"status": "pending_approval", "limit": 100}
|
||||||
po_data = po_response.json() if po_response.status_code == 200 else {}
|
) or {}
|
||||||
pending_approvals = len(po_data.get("items", []))
|
pending_approvals = len(po_data.get("items", []))
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.warning(f"Failed to fetch POs: {e}")
|
logger.warning(f"Failed to fetch POs: {e}")
|
||||||
pending_approvals = 0
|
pending_approvals = 0
|
||||||
|
|
||||||
# Get production delays
|
# Get production delays
|
||||||
try:
|
try:
|
||||||
prod_response = await client.get(
|
prod_data = await production_client.get(
|
||||||
f"http://production-service:8000/api/v1/tenants/{tenant_id}/production-batches",
|
"/production-batches",
|
||||||
params={"status": "ON_HOLD", "limit": 100}
|
tenant_id=tenant_id,
|
||||||
)
|
params={"status": "ON_HOLD", "limit": 100}
|
||||||
prod_data = prod_response.json() if prod_response.status_code == 200 else {}
|
) or {}
|
||||||
production_delays = len(prod_data.get("items", []))
|
production_delays = len(prod_data.get("items", []))
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.warning(f"Failed to fetch production batches: {e}")
|
logger.warning(f"Failed to fetch production batches: {e}")
|
||||||
production_delays = 0
|
production_delays = 0
|
||||||
|
|
||||||
# Get inventory status
|
# Get inventory status
|
||||||
try:
|
try:
|
||||||
inv_response = await client.get(
|
inv_data = await inventory_client.get_inventory_dashboard(tenant_id) or {}
|
||||||
f"http://inventory-service:8000/api/v1/tenants/{tenant_id}/inventory/dashboard/stock-status"
|
out_of_stock_count = inv_data.get("out_of_stock_count", 0)
|
||||||
)
|
except Exception as e:
|
||||||
inv_data = inv_response.json() if inv_response.status_code == 200 else {}
|
logger.warning(f"Failed to fetch inventory: {e}")
|
||||||
out_of_stock_count = inv_data.get("out_of_stock_count", 0)
|
out_of_stock_count = 0
|
||||||
except Exception as e:
|
|
||||||
logger.warning(f"Failed to fetch inventory: {e}")
|
|
||||||
out_of_stock_count = 0
|
|
||||||
|
|
||||||
# System errors (would come from monitoring system)
|
# System errors (would come from monitoring system)
|
||||||
system_errors = 0
|
system_errors = 0
|
||||||
@@ -267,43 +275,43 @@ async def get_orchestration_summary(
|
|||||||
|
|
||||||
# Enhance with detailed PO and batch summaries
|
# Enhance with detailed PO and batch summaries
|
||||||
if summary["purchaseOrdersCreated"] > 0:
|
if summary["purchaseOrdersCreated"] > 0:
|
||||||
async with httpx.AsyncClient(timeout=10.0) as client:
|
try:
|
||||||
try:
|
po_data = await procurement_client.get(
|
||||||
po_response = await client.get(
|
"/purchase-orders",
|
||||||
f"http://procurement-service:8000/api/v1/tenants/{tenant_id}/purchase-orders",
|
tenant_id=tenant_id,
|
||||||
params={"status": "pending_approval", "limit": 10}
|
params={"status": "pending_approval", "limit": 10}
|
||||||
)
|
)
|
||||||
if po_response.status_code == 200:
|
if po_data:
|
||||||
pos = po_response.json().get("items", [])
|
pos = po_data.get("items", [])
|
||||||
summary["purchaseOrdersSummary"] = [
|
summary["purchaseOrdersSummary"] = [
|
||||||
PurchaseOrderSummary(
|
PurchaseOrderSummary(
|
||||||
supplierName=po.get("supplier_name", "Unknown"),
|
supplierName=po.get("supplier_name", "Unknown"),
|
||||||
itemCategories=[item.get("ingredient_name", "Item") for item in po.get("items", [])[:3]],
|
itemCategories=[item.get("ingredient_name", "Item") for item in po.get("items", [])[:3]],
|
||||||
totalAmount=float(po.get("total_amount", 0))
|
totalAmount=float(po.get("total_amount", 0))
|
||||||
)
|
)
|
||||||
for po in pos[:5] # Show top 5
|
for po in pos[:5] # Show top 5
|
||||||
]
|
]
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.warning(f"Failed to fetch PO details: {e}")
|
logger.warning(f"Failed to fetch PO details: {e}")
|
||||||
|
|
||||||
if summary["productionBatchesCreated"] > 0:
|
if summary["productionBatchesCreated"] > 0:
|
||||||
async with httpx.AsyncClient(timeout=10.0) as client:
|
try:
|
||||||
try:
|
batch_data = await production_client.get(
|
||||||
batch_response = await client.get(
|
"/production-batches/today",
|
||||||
f"http://production-service:8000/api/v1/tenants/{tenant_id}/production-batches/today"
|
tenant_id=tenant_id
|
||||||
)
|
)
|
||||||
if batch_response.status_code == 200:
|
if batch_data:
|
||||||
batches = batch_response.json().get("batches", [])
|
batches = batch_data.get("batches", [])
|
||||||
summary["productionBatchesSummary"] = [
|
summary["productionBatchesSummary"] = [
|
||||||
ProductionBatchSummary(
|
ProductionBatchSummary(
|
||||||
productName=batch.get("product_name", "Unknown"),
|
productName=batch.get("product_name", "Unknown"),
|
||||||
quantity=batch.get("planned_quantity", 0),
|
quantity=batch.get("planned_quantity", 0),
|
||||||
readyByTime=batch.get("planned_end_time", "")
|
readyByTime=batch.get("planned_end_time", "")
|
||||||
)
|
)
|
||||||
for batch in batches[:5] # Show top 5
|
for batch in batches[:5] # Show top 5
|
||||||
]
|
]
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.warning(f"Failed to fetch batch details: {e}")
|
logger.warning(f"Failed to fetch batch details: {e}")
|
||||||
|
|
||||||
return OrchestrationSummaryResponse(**summary)
|
return OrchestrationSummaryResponse(**summary)
|
||||||
|
|
||||||
@@ -327,44 +335,45 @@ async def get_action_queue(
|
|||||||
dashboard_service = DashboardService(db)
|
dashboard_service = DashboardService(db)
|
||||||
|
|
||||||
# Fetch data from various services
|
# Fetch data from various services
|
||||||
async with httpx.AsyncClient(timeout=10.0) as client:
|
# Get pending POs
|
||||||
# Get pending POs
|
pending_pos = []
|
||||||
pending_pos = []
|
try:
|
||||||
try:
|
po_data = await procurement_client.get(
|
||||||
po_response = await client.get(
|
"/purchase-orders",
|
||||||
f"http://procurement-service:8000/api/v1/tenants/{tenant_id}/purchase-orders",
|
tenant_id=tenant_id,
|
||||||
params={"status": "pending_approval", "limit": 20}
|
params={"status": "pending_approval", "limit": 20}
|
||||||
)
|
)
|
||||||
if po_response.status_code == 200:
|
if po_data:
|
||||||
pending_pos = po_response.json().get("items", [])
|
pending_pos = po_data.get("items", [])
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.warning(f"Failed to fetch pending POs: {e}")
|
logger.warning(f"Failed to fetch pending POs: {e}")
|
||||||
|
|
||||||
# Get critical alerts
|
# Get critical alerts
|
||||||
critical_alerts = []
|
critical_alerts = []
|
||||||
try:
|
try:
|
||||||
alerts_response = await client.get(
|
alerts_data = await procurement_client.get(
|
||||||
f"http://alert-processor-service:8000/api/v1/tenants/{tenant_id}/alerts",
|
"/alert-processor/alerts",
|
||||||
params={"severity": "critical", "resolved": False, "limit": 20}
|
tenant_id=tenant_id,
|
||||||
)
|
params={"severity": "critical", "resolved": False, "limit": 20}
|
||||||
if alerts_response.status_code == 200:
|
)
|
||||||
critical_alerts = alerts_response.json().get("alerts", [])
|
if alerts_data:
|
||||||
except Exception as e:
|
critical_alerts = alerts_data.get("alerts", [])
|
||||||
logger.warning(f"Failed to fetch alerts: {e}")
|
except Exception as e:
|
||||||
|
logger.warning(f"Failed to fetch alerts: {e}")
|
||||||
|
|
||||||
# Get onboarding status
|
# Get onboarding status
|
||||||
onboarding_incomplete = False
|
onboarding_incomplete = False
|
||||||
onboarding_steps = []
|
onboarding_steps = []
|
||||||
try:
|
try:
|
||||||
onboarding_response = await client.get(
|
onboarding_data = await procurement_client.get(
|
||||||
f"http://auth:8000/api/v1/tenants/{tenant_id}/onboarding-progress"
|
"/auth/onboarding-progress",
|
||||||
)
|
tenant_id=tenant_id
|
||||||
if onboarding_response.status_code == 200:
|
)
|
||||||
onboarding_data = onboarding_response.json()
|
if onboarding_data:
|
||||||
onboarding_incomplete = not onboarding_data.get("completed", True)
|
onboarding_incomplete = not onboarding_data.get("completed", True)
|
||||||
onboarding_steps = onboarding_data.get("steps", [])
|
onboarding_steps = onboarding_data.get("steps", [])
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.warning(f"Failed to fetch onboarding status: {e}")
|
logger.warning(f"Failed to fetch onboarding status: {e}")
|
||||||
|
|
||||||
# Build action queue
|
# Build action queue
|
||||||
actions = await dashboard_service.get_action_queue(
|
actions = await dashboard_service.get_action_queue(
|
||||||
@@ -406,15 +415,15 @@ async def get_production_timeline(
|
|||||||
|
|
||||||
# Fetch today's production batches
|
# Fetch today's production batches
|
||||||
batches = []
|
batches = []
|
||||||
async with httpx.AsyncClient(timeout=10.0) as client:
|
try:
|
||||||
try:
|
batch_data = await production_client.get(
|
||||||
response = await client.get(
|
"/production-batches/today",
|
||||||
f"http://production-service:8000/api/v1/tenants/{tenant_id}/production-batches/today"
|
tenant_id=tenant_id
|
||||||
)
|
)
|
||||||
if response.status_code == 200:
|
if batch_data:
|
||||||
batches = response.json().get("batches", [])
|
batches = batch_data.get("batches", [])
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.warning(f"Failed to fetch production batches: {e}")
|
logger.warning(f"Failed to fetch production batches: {e}")
|
||||||
|
|
||||||
# Transform to timeline format
|
# Transform to timeline format
|
||||||
timeline = await dashboard_service.get_production_timeline(
|
timeline = await dashboard_service.get_production_timeline(
|
||||||
@@ -454,34 +463,31 @@ async def get_insights(
|
|||||||
dashboard_service = DashboardService(db)
|
dashboard_service = DashboardService(db)
|
||||||
|
|
||||||
# Fetch data from various services
|
# Fetch data from various services
|
||||||
async with httpx.AsyncClient(timeout=10.0) as client:
|
# Sustainability data
|
||||||
# Sustainability data
|
sustainability_data = {}
|
||||||
sustainability_data = {}
|
try:
|
||||||
try:
|
sustainability_data = await inventory_client.get(
|
||||||
response = await client.get(
|
"/sustainability/widget",
|
||||||
f"http://inventory-service:8000/api/v1/tenants/{tenant_id}/sustainability/widget"
|
tenant_id=tenant_id
|
||||||
)
|
) or {}
|
||||||
if response.status_code == 200:
|
except Exception as e:
|
||||||
sustainability_data = response.json()
|
logger.warning(f"Failed to fetch sustainability data: {e}")
|
||||||
except Exception as e:
|
|
||||||
logger.warning(f"Failed to fetch sustainability data: {e}")
|
|
||||||
|
|
||||||
# Inventory data
|
# Inventory data
|
||||||
inventory_data = {}
|
inventory_data = {}
|
||||||
try:
|
try:
|
||||||
response = await client.get(
|
inventory_data = await inventory_client.get(
|
||||||
f"http://inventory-service:8000/api/v1/tenants/{tenant_id}/inventory/dashboard/stock-status"
|
"/inventory/dashboard/stock-status",
|
||||||
)
|
tenant_id=tenant_id
|
||||||
if response.status_code == 200:
|
) or {}
|
||||||
inventory_data = response.json()
|
except Exception as e:
|
||||||
except Exception as e:
|
logger.warning(f"Failed to fetch inventory data: {e}")
|
||||||
logger.warning(f"Failed to fetch inventory data: {e}")
|
|
||||||
|
|
||||||
# Savings data (mock for now)
|
# Savings data (mock for now)
|
||||||
savings_data = {
|
savings_data = {
|
||||||
"weekly_savings": 124,
|
"weekly_savings": 124,
|
||||||
"trend_percentage": 12
|
"trend_percentage": 12
|
||||||
}
|
}
|
||||||
|
|
||||||
# Calculate insights
|
# Calculate insights
|
||||||
insights = await dashboard_service.calculate_insights(
|
insights = await dashboard_service.calculate_insights(
|
||||||
|
|||||||
Reference in New Issue
Block a user