Created typed, domain-specific methods in service clients instead of using generic .get() calls with paths. This improves type safety, discoverability, and maintainability.

Service Client Changes:
- ProcurementServiceClient:
  * get_pending_purchase_orders() - POs awaiting approval
  * get_critical_alerts() - Critical severity alerts
  * get_alerts_summary() - Alert counts by severity
- ProductionServiceClient:
  * get_todays_batches() - Today's production timeline
  * get_production_batches_by_status() - Filter by status
- InventoryServiceClient:
  * get_stock_status() - Dashboard stock metrics
  * get_sustainability_widget() - Sustainability data

Dashboard API Changes:
- Updated all endpoints to use new dedicated methods
- Cleaner, more maintainable code
- Better error handling and logging
- Fixed inventory data type handling (list vs dict)

Note: The alert endpoints currently return 404 - the alert_processor service still needs to expose these endpoints: /alerts/summary and /alerts (filtered by severity).
520 lines
21 KiB
Python
520 lines
21 KiB
Python
# shared/clients/production_client.py
|
|
"""
|
|
Production Service Client for Inter-Service Communication
|
|
Provides access to production planning and batch management from other services
|
|
"""
|
|
|
|
import structlog
|
|
from typing import Dict, Any, Optional, List
|
|
from uuid import UUID
|
|
from shared.clients.base_service_client import BaseServiceClient
|
|
from shared.config.base import BaseServiceSettings
|
|
|
|
# Module-level structlog logger used by the client methods in this module.
logger = structlog.get_logger()
|
|
|
|
|
|
class ProductionServiceClient(BaseServiceClient):
    """Client for communicating with the Production Service.

    Each public coroutine wraps a single call made through the ``get`` /
    ``post`` / ``put`` helpers inherited from ``BaseServiceClient``. Any
    exception raised by such a call is logged and converted into a neutral
    return value (``None``, ``[]`` or ``False``) instead of propagating.
    """

    def __init__(self, config: BaseServiceSettings, calling_service_name: str = "unknown"):
        """Initialise the client.

        Args:
            config: Service settings forwarded to the base client.
            calling_service_name: Name of the service using this client.
        """
        super().__init__(calling_service_name, config)

    def get_service_base_path(self) -> str:
        """Return the service base path (``/api/v1``)."""
        return "/api/v1"

    @staticmethod
    def _extract_list(payload: Any, key: str) -> List[Dict[str, Any]]:
        """Return the list carried by ``payload`` under ``key``.

        A bare list is returned as-is, a dict yields ``payload[key]`` (``[]``
        when the key is missing or null), and anything else yields ``[]``.
        This keeps list endpoints from failing on a null value or on a
        response that is already a list.
        """
        if isinstance(payload, list):
            return payload
        if isinstance(payload, dict):
            return payload.get(key) or []
        return []

    # ================================================================
    # PRODUCTION PLANNING
    # ================================================================

    async def generate_schedule(
        self,
        tenant_id: str,
        forecast_data: Dict[str, Any],
        inventory_data: Optional[Dict[str, Any]] = None,
        recipes_data: Optional[Dict[str, Any]] = None,
        target_date: Optional[str] = None,
        planning_horizon_days: int = 1
    ) -> Optional[Dict[str, Any]]:
        """
        Generate production schedule (called by Orchestrator).

        Args:
            tenant_id: Tenant ID
            forecast_data: Forecast data from forecasting service
            inventory_data: Optional inventory snapshot (to avoid duplicate fetching)
            recipes_data: Optional recipes snapshot (to avoid duplicate fetching)
            target_date: Optional target date
            planning_horizon_days: Number of days to plan

        Returns:
            Dict with schedule_id, batches_created, etc., or None if the
            request raised an exception.
        """
        try:
            request_data: Dict[str, Any] = {
                "forecast_data": forecast_data,
                "target_date": target_date,
                "planning_horizon_days": planning_horizon_days
            }

            # Cached snapshots are forwarded only when they are non-empty;
            # an empty dict is treated the same as "not provided".
            if inventory_data:
                request_data["inventory_data"] = inventory_data
            if recipes_data:
                request_data["recipes_data"] = recipes_data

            result = await self.post(
                "production/operations/generate-schedule",
                data=request_data,
                tenant_id=tenant_id
            )

            if result:
                logger.info(
                    "Generated production schedule",
                    schedule_id=result.get('schedule_id'),
                    batches_created=result.get('batches_created', 0),
                    tenant_id=tenant_id
                )

            return result

        except Exception as e:
            logger.error(
                "Error generating production schedule",
                error=str(e),
                tenant_id=tenant_id
            )
            return None

    async def get_production_requirements(self, tenant_id: str, date: Optional[str] = None) -> Optional[Dict[str, Any]]:
        """Get production requirements for procurement planning.

        Args:
            tenant_id: Tenant ID
            date: Optional date filter; sent as the ``date`` query parameter
                only when a non-empty value is given.

        Returns:
            The response payload, or None if the request raised an exception.
        """
        try:
            params = {}
            if date:
                params["date"] = date

            result = await self.get("production/requirements", tenant_id=tenant_id, params=params)
            if result:
                logger.info("Retrieved production requirements from production service",
                            date=date, tenant_id=tenant_id)
            return result
        except Exception as e:
            logger.error("Error getting production requirements",
                         error=str(e), tenant_id=tenant_id)
            return None

    async def get_daily_requirements(self, tenant_id: str, date: Optional[str] = None) -> Optional[Dict[str, Any]]:
        """Get daily production requirements.

        Args:
            tenant_id: Tenant ID
            date: Optional date filter; sent as the ``date`` query parameter
                only when a non-empty value is given.

        Returns:
            The response payload, or None if the request raised an exception.
        """
        try:
            params = {}
            if date:
                params["date"] = date

            result = await self.get("production/daily-requirements", tenant_id=tenant_id, params=params)
            if result:
                logger.info("Retrieved daily production requirements from production service",
                            date=date, tenant_id=tenant_id)
            return result
        except Exception as e:
            logger.error("Error getting daily production requirements",
                         error=str(e), tenant_id=tenant_id)
            return None

    async def get_production_schedule(self, tenant_id: str, start_date: Optional[str] = None, end_date: Optional[str] = None) -> Optional[Dict[str, Any]]:
        """Get production schedule for a date range.

        Args:
            tenant_id: Tenant ID
            start_date: Optional range start; omitted from the query when empty.
            end_date: Optional range end; omitted from the query when empty.

        Returns:
            The response payload, or None if the request raised an exception.
        """
        try:
            params = {}
            if start_date:
                params["start_date"] = start_date
            if end_date:
                params["end_date"] = end_date

            result = await self.get("production/schedules", tenant_id=tenant_id, params=params)
            if result:
                logger.info("Retrieved production schedule from production service",
                            start_date=start_date, end_date=end_date, tenant_id=tenant_id)
            return result
        except Exception as e:
            logger.error("Error getting production schedule",
                         error=str(e), tenant_id=tenant_id)
            return None

    # ================================================================
    # BATCH MANAGEMENT
    # ================================================================

    async def get_active_batches(self, tenant_id: str) -> Optional[List[Dict[str, Any]]]:
        """Get currently active production batches.

        Args:
            tenant_id: Tenant ID

        Returns:
            The list found under the ``batches`` key of the response (or the
            response itself when it is already a list). An empty list is
            returned when there is no data or the request raised.
        """
        try:
            result = await self.get("production/batches/active", tenant_id=tenant_id)
            batches = self._extract_list(result, 'batches')
            logger.info("Retrieved active production batches from production service",
                        batches_count=len(batches), tenant_id=tenant_id)
            return batches
        except Exception as e:
            logger.error("Error getting active production batches",
                         error=str(e), tenant_id=tenant_id)
            return []

    async def create_production_batch(self, tenant_id: str, batch_data: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Create a new production batch.

        Args:
            tenant_id: Tenant ID
            batch_data: Request body describing the batch; its ``product_id``
                entry (if any) is included in the success log.

        Returns:
            The response payload, or None if the request raised an exception.
        """
        try:
            result = await self.post("production/batches", data=batch_data, tenant_id=tenant_id)
            if result:
                logger.info("Created production batch",
                            batch_id=result.get('id'),
                            product_id=batch_data.get('product_id'),
                            tenant_id=tenant_id)
            return result
        except Exception as e:
            logger.error("Error creating production batch",
                         error=str(e), tenant_id=tenant_id)
            return None

    async def update_batch_status(self, tenant_id: str, batch_id: str, status: str, actual_quantity: Optional[float] = None) -> Optional[Dict[str, Any]]:
        """Update production batch status.

        Args:
            tenant_id: Tenant ID
            batch_id: Identifier of the batch to update.
            status: New status value.
            actual_quantity: Optional quantity; included in the body whenever
                it is not None (so ``0`` is sent).

        Returns:
            The response payload, or None if the request raised an exception.
        """
        try:
            # Annotated explicitly: the body mixes a str status with a float quantity.
            data: Dict[str, Any] = {"status": status}
            if actual_quantity is not None:
                data["actual_quantity"] = actual_quantity

            result = await self.put(f"production/batches/{batch_id}/status", data=data, tenant_id=tenant_id)
            if result:
                logger.info("Updated production batch status",
                            batch_id=batch_id, status=status, tenant_id=tenant_id)
            return result
        except Exception as e:
            logger.error("Error updating production batch status",
                         error=str(e), batch_id=batch_id, tenant_id=tenant_id)
            return None

    async def get_batch_details(self, tenant_id: str, batch_id: str) -> Optional[Dict[str, Any]]:
        """Get detailed information about a production batch.

        Args:
            tenant_id: Tenant ID
            batch_id: Identifier of the batch to fetch.

        Returns:
            The response payload, or None if the request raised an exception.
        """
        try:
            result = await self.get(f"production/batches/{batch_id}", tenant_id=tenant_id)
            if result:
                logger.info("Retrieved production batch details",
                            batch_id=batch_id, tenant_id=tenant_id)
            return result
        except Exception as e:
            logger.error("Error getting production batch details",
                         error=str(e), batch_id=batch_id, tenant_id=tenant_id)
            return None

    # ================================================================
    # CAPACITY MANAGEMENT
    # ================================================================

    async def get_capacity_status(self, tenant_id: str, date: Optional[str] = None) -> Optional[Dict[str, Any]]:
        """Get production capacity status for a specific date.

        Args:
            tenant_id: Tenant ID
            date: Optional date filter; sent as the ``date`` query parameter
                only when a non-empty value is given.

        Returns:
            The response payload, or None if the request raised an exception.
        """
        try:
            params = {}
            if date:
                params["date"] = date

            result = await self.get("production/capacity/status", tenant_id=tenant_id, params=params)
            if result:
                logger.info("Retrieved production capacity status",
                            date=date, tenant_id=tenant_id)
            return result
        except Exception as e:
            logger.error("Error getting production capacity status",
                         error=str(e), tenant_id=tenant_id)
            return None

    async def check_capacity_availability(self, tenant_id: str, requirements: List[Dict[str, Any]]) -> Optional[Dict[str, Any]]:
        """Check if production capacity is available for requirements.

        Args:
            tenant_id: Tenant ID
            requirements: Requirement entries sent under the ``requirements``
                key of the request body.

        Returns:
            The response payload, or None if the request raised an exception.
        """
        try:
            # Body is passed by keyword, like every other POST in this client.
            result = await self.post("production/capacity/check-availability",
                                     data={"requirements": requirements},
                                     tenant_id=tenant_id)
            if result:
                logger.info("Checked production capacity availability",
                            requirements_count=len(requirements), tenant_id=tenant_id)
            return result
        except Exception as e:
            logger.error("Error checking production capacity availability",
                         error=str(e), tenant_id=tenant_id)
            return None

    # ================================================================
    # QUALITY CONTROL
    # ================================================================

    async def record_quality_check(self, tenant_id: str, batch_id: str, quality_data: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Record quality control results for a batch.

        Args:
            tenant_id: Tenant ID
            batch_id: Identifier of the batch the check belongs to.
            quality_data: Request body with the quality check results.

        Returns:
            The response payload, or None if the request raised an exception.
        """
        try:
            result = await self.post(f"production/batches/{batch_id}/quality-check",
                                     data=quality_data,
                                     tenant_id=tenant_id)
            if result:
                logger.info("Recorded quality check for production batch",
                            batch_id=batch_id, tenant_id=tenant_id)
            return result
        except Exception as e:
            logger.error("Error recording quality check",
                         error=str(e), batch_id=batch_id, tenant_id=tenant_id)
            return None

    async def get_yield_metrics(self, tenant_id: str, start_date: str, end_date: str) -> Optional[Dict[str, Any]]:
        """Get production yield metrics for analysis.

        Args:
            tenant_id: Tenant ID
            start_date: Range start, sent as the ``start_date`` query parameter.
            end_date: Range end, sent as the ``end_date`` query parameter.

        Returns:
            The response payload, or None if the request raised an exception.
        """
        try:
            params = {
                "start_date": start_date,
                "end_date": end_date
            }
            result = await self.get("production/analytics/yield-metrics", tenant_id=tenant_id, params=params)
            if result:
                logger.info("Retrieved production yield metrics",
                            start_date=start_date, end_date=end_date, tenant_id=tenant_id)
            return result
        except Exception as e:
            logger.error("Error getting production yield metrics",
                         error=str(e), tenant_id=tenant_id)
            return None

    # ================================================================
    # DASHBOARD AND ANALYTICS
    # ================================================================

    async def get_dashboard_summary(self, tenant_id: str) -> Optional[Dict[str, Any]]:
        """Get production dashboard summary data.

        Args:
            tenant_id: Tenant ID

        Returns:
            The response payload, or None if the request raised an exception.
        """
        try:
            result = await self.get("production/dashboard/summary", tenant_id=tenant_id)
            if result:
                logger.info("Retrieved production dashboard summary",
                            tenant_id=tenant_id)
            return result
        except Exception as e:
            logger.error("Error getting production dashboard summary",
                         error=str(e), tenant_id=tenant_id)
            return None

    async def get_efficiency_metrics(self, tenant_id: str, period: str = "last_30_days") -> Optional[Dict[str, Any]]:
        """Get production efficiency metrics.

        Args:
            tenant_id: Tenant ID
            period: Value sent as the ``period`` query parameter
                (default ``"last_30_days"``).

        Returns:
            The response payload, or None if the request raised an exception.
        """
        try:
            params = {"period": period}
            result = await self.get("production/analytics/efficiency", tenant_id=tenant_id, params=params)
            if result:
                logger.info("Retrieved production efficiency metrics",
                            period=period, tenant_id=tenant_id)
            return result
        except Exception as e:
            logger.error("Error getting production efficiency metrics",
                         error=str(e), tenant_id=tenant_id)
            return None

    # ================================================================
    # ALERTS AND NOTIFICATIONS
    # ================================================================

    async def get_production_alerts(self, tenant_id: str) -> Optional[List[Dict[str, Any]]]:
        """Get production-related alerts.

        Args:
            tenant_id: Tenant ID

        Returns:
            The list found under the ``alerts`` key of the response (or the
            response itself when it is already a list). An empty list is
            returned when there is no data or the request raised.
        """
        try:
            result = await self.get("production/alerts", tenant_id=tenant_id)
            alerts = self._extract_list(result, 'alerts')
            logger.info("Retrieved production alerts",
                        alerts_count=len(alerts), tenant_id=tenant_id)
            return alerts
        except Exception as e:
            logger.error("Error getting production alerts",
                         error=str(e), tenant_id=tenant_id)
            return []

    async def acknowledge_alert(self, tenant_id: str, alert_id: str) -> Optional[Dict[str, Any]]:
        """Acknowledge a production-related alert.

        Args:
            tenant_id: Tenant ID
            alert_id: Identifier of the alert to acknowledge.

        Returns:
            The response payload, or None if the request raised an exception.
        """
        try:
            result = await self.post(f"production/alerts/{alert_id}/acknowledge", data={}, tenant_id=tenant_id)
            if result:
                logger.info("Acknowledged production alert",
                            alert_id=alert_id, tenant_id=tenant_id)
            return result
        except Exception as e:
            logger.error("Error acknowledging production alert",
                         error=str(e), alert_id=alert_id, tenant_id=tenant_id)
            return None

    # ================================================================
    # WASTE AND SUSTAINABILITY ANALYTICS
    # ================================================================

    async def get_waste_analytics(
        self,
        tenant_id: str,
        start_date: str,
        end_date: str
    ) -> Optional[Dict[str, Any]]:
        """
        Get production waste analytics for sustainability reporting

        Args:
            tenant_id: Tenant ID
            start_date: Start date (ISO format)
            end_date: End date (ISO format)

        Returns:
            Dictionary with waste analytics data:
            - total_production_waste: Total waste in kg
            - total_defects: Total defect waste in kg
            - total_planned: Total planned production in kg
            - total_actual: Total actual production in kg
            - ai_assisted_batches: Number of AI-assisted batches
            None is returned if the request raised an exception.
        """
        try:
            params = {
                "start_date": start_date,
                "end_date": end_date
            }
            result = await self.get("production/waste-analytics", tenant_id=tenant_id, params=params)
            if result:
                logger.info("Retrieved production waste analytics",
                            tenant_id=tenant_id,
                            start_date=start_date,
                            end_date=end_date)
            return result
        except Exception as e:
            logger.error("Error getting production waste analytics",
                         error=str(e), tenant_id=tenant_id)
            return None

    async def get_baseline(self, tenant_id: str) -> Optional[Dict[str, Any]]:
        """
        Get baseline waste percentage for SDG compliance calculations

        Args:
            tenant_id: Tenant ID

        Returns:
            Dictionary with baseline data:
            - waste_percentage: Baseline waste percentage
            - period: Information about the baseline period
            - data_available: Whether real data is available
            - total_production_kg: Total production during baseline
            - total_waste_kg: Total waste during baseline
            None is returned if the request raised an exception.
        """
        try:
            result = await self.get("production/baseline", tenant_id=tenant_id)
            if result:
                logger.info("Retrieved production baseline data",
                            tenant_id=tenant_id,
                            data_available=result.get('data_available', False))
            return result
        except Exception as e:
            logger.error("Error getting production baseline",
                         error=str(e), tenant_id=tenant_id)
            return None

    # ================================================================
    # ML INSIGHTS: Yield Prediction
    # ================================================================

    async def trigger_yield_prediction(
        self,
        tenant_id: str,
        recipe_ids: Optional[List[str]] = None,
        lookback_days: int = 90,
        min_history_runs: int = 30
    ) -> Optional[Dict[str, Any]]:
        """
        Trigger yield prediction for production recipes.

        Args:
            tenant_id: Tenant UUID
            recipe_ids: Specific recipe IDs to analyze. If None, analyzes all recipes
            lookback_days: Days of historical production to analyze (30-365)
            min_history_runs: Minimum production runs required (10-100)

        Returns:
            Dict with prediction results including insights posted, or None
            if the request raised an exception.
        """
        try:
            # recipe_ids is sent even when None; the ranges documented above
            # are not validated client-side.
            data = {
                "recipe_ids": recipe_ids,
                "lookback_days": lookback_days,
                "min_history_runs": min_history_runs
            }
            result = await self.post("production/ml/insights/predict-yields", data=data, tenant_id=tenant_id)
            if result:
                logger.info("Triggered yield prediction",
                            recipes_analyzed=result.get('recipes_analyzed', 0),
                            insights_posted=result.get('total_insights_posted', 0),
                            tenant_id=tenant_id)
            return result
        except Exception as e:
            logger.error("Error triggering yield prediction",
                         error=str(e), tenant_id=tenant_id)
            return None

    # ================================================================
    # DASHBOARD METHODS
    # ================================================================

    async def get_todays_batches(
        self,
        tenant_id: str
    ) -> Optional[Dict[str, Any]]:
        """
        Get today's production batches for dashboard timeline

        Args:
            tenant_id: Tenant ID

        Returns:
            Dict with {"batches": [...], "summary": {...}}, or None if the
            request raised an exception.
        """
        try:
            # NOTE(review): this path starts with "/" while the other calls in
            # this client use relative paths - confirm how BaseServiceClient
            # joins endpoints before normalising it.
            return await self.get(
                "/production/production-batches/today",
                tenant_id=tenant_id
            )
        except Exception as e:
            logger.error("Error fetching today's batches", error=str(e), tenant_id=tenant_id)
            return None

    async def get_production_batches_by_status(
        self,
        tenant_id: str,
        status: str,
        limit: int = 100
    ) -> Optional[Dict[str, Any]]:
        """
        Get production batches filtered by status for dashboard

        Args:
            tenant_id: Tenant ID
            status: Batch status (e.g., "ON_HOLD", "IN_PROGRESS")
            limit: Maximum number of batches to return

        Returns:
            Dict with {"items": [...], "total": n}, or None if the request
            raised an exception.
        """
        try:
            # NOTE(review): this path starts with "/" while the other calls in
            # this client use relative paths - confirm how BaseServiceClient
            # joins endpoints before normalising it.
            return await self.get(
                "/production/production-batches",
                tenant_id=tenant_id,
                params={"status": status, "limit": limit}
            )
        except Exception as e:
            logger.error("Error fetching production batches", error=str(e),
                         status=status, tenant_id=tenant_id)
            return None

    # ================================================================
    # UTILITY METHODS
    # ================================================================

    async def health_check(self) -> bool:
        """Check if production service is healthy.

        Returns:
            True when the health request returns anything other than None,
            False when it returns None or raises.
        """
        try:
            result = await self.get("../health")  # Health endpoint is not tenant-scoped
            return result is not None
        except Exception as e:
            logger.error("Production service health check failed", error=str(e))
            return False
|
|
|
|
|
|
# Factory function for dependency injection
|
|
def create_production_client(
    config: BaseServiceSettings,
    calling_service_name: str = "unknown"
) -> ProductionServiceClient:
    """Create production service client instance.

    Args:
        config: Service settings passed to the client.
        calling_service_name: Name of the service that will use the client.
            Defaults to "unknown", the same default the client constructor
            uses, so existing single-argument callers behave as before.

    Returns:
        A new ProductionServiceClient.
    """
    return ProductionServiceClient(config, calling_service_name)
|