# shared/clients/production_client.py
"""
Production Service Client for Inter-Service Communication

Provides access to production planning and batch management from other services.
"""

import structlog
from typing import Dict, Any, Optional, List
from uuid import UUID

from shared.clients.base_service_client import BaseServiceClient
from shared.config.base import BaseServiceSettings

logger = structlog.get_logger()


class ProductionServiceClient(BaseServiceClient):
    """Client for communicating with the Production Service"""

    def __init__(self, config: BaseServiceSettings, calling_service_name: str = "unknown"):
        super().__init__(calling_service_name, config)

    def get_service_base_path(self) -> str:
        return "/api/v1"

    # ================================================================
    # PRODUCTION PLANNING
    # ================================================================

    async def generate_schedule(
        self,
        tenant_id: str,
        forecast_data: Dict[str, Any],
        inventory_data: Optional[Dict[str, Any]] = None,
        recipes_data: Optional[Dict[str, Any]] = None,
        target_date: Optional[str] = None,
        planning_horizon_days: int = 1
    ) -> Optional[Dict[str, Any]]:
        """
        Generate production schedule (called by Orchestrator).

        Args:
            tenant_id: Tenant ID
            forecast_data: Forecast data from forecasting service
            inventory_data: Optional inventory snapshot (NEW - to avoid duplicate fetching)
            recipes_data: Optional recipes snapshot (NEW - to avoid duplicate fetching)
            target_date: Optional target date
            planning_horizon_days: Number of days to plan

        Returns:
            Dict with schedule_id, batches_created, etc.
        """
        try:
            request_data = {
                "forecast_data": forecast_data,
                "target_date": target_date,
                "planning_horizon_days": planning_horizon_days
            }

            # NEW: Include cached data if provided
            if inventory_data:
                request_data["inventory_data"] = inventory_data
            if recipes_data:
                request_data["recipes_data"] = recipes_data

            result = await self.post(
                "production/operations/generate-schedule",
                data=request_data,
                tenant_id=tenant_id
            )

            if result:
                logger.info(
                    "Generated production schedule",
                    schedule_id=result.get('schedule_id'),
                    batches_created=result.get('batches_created', 0),
                    tenant_id=tenant_id
                )

            return result

        except Exception as e:
            logger.error(
                "Error generating production schedule",
                error=str(e),
                tenant_id=tenant_id
            )
            return None

    async def get_production_requirements(self, tenant_id: str, date: Optional[str] = None) -> Optional[Dict[str, Any]]:
        """Get production requirements for procurement planning"""
        try:
            params = {}
            if date:
                params["date"] = date

            result = await self.get("production/requirements", tenant_id=tenant_id, params=params)

            if result:
                logger.info("Retrieved production requirements from production service",
                            date=date, tenant_id=tenant_id)

            return result

        except Exception as e:
            logger.error("Error getting production requirements", error=str(e), tenant_id=tenant_id)
            return None

    async def get_daily_requirements(self, tenant_id: str, date: Optional[str] = None) -> Optional[Dict[str, Any]]:
        """Get daily production requirements"""
        try:
            params = {}
            if date:
                params["date"] = date

            result = await self.get("production/daily-requirements", tenant_id=tenant_id, params=params)

            if result:
                logger.info("Retrieved daily production requirements from production service",
                            date=date, tenant_id=tenant_id)

            return result

        except Exception as e:
            logger.error("Error getting daily production requirements", error=str(e), tenant_id=tenant_id)
            return None
    async def get_production_schedule(self, tenant_id: str, start_date: Optional[str] = None,
                                      end_date: Optional[str] = None) -> Optional[Dict[str, Any]]:
        """Get production schedule for a date range"""
        try:
            params = {}
            if start_date:
                params["start_date"] = start_date
            if end_date:
                params["end_date"] = end_date

            result = await self.get("production/schedules", tenant_id=tenant_id, params=params)

            if result:
                logger.info("Retrieved production schedule from production service",
                            start_date=start_date, end_date=end_date, tenant_id=tenant_id)

            return result

        except Exception as e:
            logger.error("Error getting production schedule", error=str(e), tenant_id=tenant_id)
            return None

    # ================================================================
    # BATCH MANAGEMENT
    # ================================================================

    async def get_active_batches(self, tenant_id: str) -> Optional[List[Dict[str, Any]]]:
        """Get currently active production batches"""
        try:
            result = await self.get("production/batches/active", tenant_id=tenant_id)
            batches = result.get('batches', []) if result else []

            logger.info("Retrieved active production batches from production service",
                        batches_count=len(batches), tenant_id=tenant_id)

            return batches

        except Exception as e:
            logger.error("Error getting active production batches", error=str(e), tenant_id=tenant_id)
            return []

    async def create_production_batch(self, tenant_id: str, batch_data: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Create a new production batch"""
        try:
            result = await self.post("production/batches", data=batch_data, tenant_id=tenant_id)

            if result:
                logger.info("Created production batch", batch_id=result.get('id'),
                            product_id=batch_data.get('product_id'), tenant_id=tenant_id)

            return result

        except Exception as e:
            logger.error("Error creating production batch", error=str(e), tenant_id=tenant_id)
            return None

    async def update_batch_status(self, tenant_id: str, batch_id: str, status: str,
                                  actual_quantity: Optional[float] = None) -> Optional[Dict[str, Any]]:
        """Update production batch status"""
        try:
            data = {"status": status}
            if actual_quantity is not None:
                data["actual_quantity"] = actual_quantity

            result = await self.put(f"production/batches/{batch_id}/status", data=data, tenant_id=tenant_id)

            if result:
                logger.info("Updated production batch status", batch_id=batch_id,
                            status=status, tenant_id=tenant_id)

            return result

        except Exception as e:
            logger.error("Error updating production batch status", error=str(e),
                         batch_id=batch_id, tenant_id=tenant_id)
            return None

    async def get_batch_details(self, tenant_id: str, batch_id: str) -> Optional[Dict[str, Any]]:
        """Get detailed information about a production batch"""
        try:
            result = await self.get(f"production/batches/{batch_id}", tenant_id=tenant_id)

            if result:
                logger.info("Retrieved production batch details", batch_id=batch_id, tenant_id=tenant_id)

            return result

        except Exception as e:
            logger.error("Error getting production batch details", error=str(e),
                         batch_id=batch_id, tenant_id=tenant_id)
            return None

    # ================================================================
    # CAPACITY MANAGEMENT
    # ================================================================

    async def get_capacity_status(self, tenant_id: str, date: Optional[str] = None) -> Optional[Dict[str, Any]]:
        """Get production capacity status for a specific date"""
        try:
            params = {}
            if date:
                params["date"] = date

            result = await self.get("production/capacity/status", tenant_id=tenant_id, params=params)

            if result:
                logger.info("Retrieved production capacity status", date=date, tenant_id=tenant_id)

            return result

        except Exception as e:
            logger.error("Error getting production capacity status", error=str(e), tenant_id=tenant_id)
            return None
    async def check_capacity_availability(self, tenant_id: str,
                                          requirements: List[Dict[str, Any]]) -> Optional[Dict[str, Any]]:
        """Check if production capacity is available for requirements"""
        try:
            result = await self.post("production/capacity/check-availability",
                                     data={"requirements": requirements}, tenant_id=tenant_id)

            if result:
                logger.info("Checked production capacity availability",
                            requirements_count=len(requirements), tenant_id=tenant_id)

            return result

        except Exception as e:
            logger.error("Error checking production capacity availability", error=str(e), tenant_id=tenant_id)
            return None

    # ================================================================
    # QUALITY CONTROL
    # ================================================================

    async def record_quality_check(self, tenant_id: str, batch_id: str,
                                   quality_data: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Record quality control results for a batch"""
        try:
            result = await self.post(f"production/batches/{batch_id}/quality-check",
                                     data=quality_data, tenant_id=tenant_id)

            if result:
                logger.info("Recorded quality check for production batch",
                            batch_id=batch_id, tenant_id=tenant_id)

            return result

        except Exception as e:
            logger.error("Error recording quality check", error=str(e),
                         batch_id=batch_id, tenant_id=tenant_id)
            return None

    async def get_yield_metrics(self, tenant_id: str, start_date: str, end_date: str) -> Optional[Dict[str, Any]]:
        """Get production yield metrics for analysis"""
        try:
            params = {
                "start_date": start_date,
                "end_date": end_date
            }

            result = await self.get("production/analytics/yield-metrics", tenant_id=tenant_id, params=params)

            if result:
                logger.info("Retrieved production yield metrics",
                            start_date=start_date, end_date=end_date, tenant_id=tenant_id)

            return result

        except Exception as e:
            logger.error("Error getting production yield metrics", error=str(e), tenant_id=tenant_id)
            return None

    # ================================================================
    # DASHBOARD AND ANALYTICS
    # ================================================================

    async def get_dashboard_summary(self, tenant_id: str) -> Optional[Dict[str, Any]]:
        """Get production dashboard summary data"""
        try:
            result = await self.get("production/dashboard/summary", tenant_id=tenant_id)

            if result:
                logger.info("Retrieved production dashboard summary", tenant_id=tenant_id)

            return result

        except Exception as e:
            logger.error("Error getting production dashboard summary", error=str(e), tenant_id=tenant_id)
            return None

    async def get_efficiency_metrics(self, tenant_id: str, period: str = "last_30_days") -> Optional[Dict[str, Any]]:
        """Get production efficiency metrics"""
        try:
            params = {"period": period}

            result = await self.get("production/analytics/efficiency", tenant_id=tenant_id, params=params)

            if result:
                logger.info("Retrieved production efficiency metrics", period=period, tenant_id=tenant_id)

            return result

        except Exception as e:
            logger.error("Error getting production efficiency metrics", error=str(e), tenant_id=tenant_id)
            return None

    # ================================================================
    # ALERTS AND NOTIFICATIONS
    # ================================================================

    async def get_production_alerts(self, tenant_id: str) -> Optional[List[Dict[str, Any]]]:
        """Get production-related alerts"""
        try:
            result = await self.get("production/alerts", tenant_id=tenant_id)
            alerts = result.get('alerts', []) if result else []

            logger.info("Retrieved production alerts", alerts_count=len(alerts), tenant_id=tenant_id)

            return alerts

        except Exception as e:
            logger.error("Error getting production alerts", error=str(e), tenant_id=tenant_id)
            return []
    async def acknowledge_alert(self, tenant_id: str, alert_id: str) -> Optional[Dict[str, Any]]:
        """Acknowledge a production-related alert"""
        try:
            result = await self.post(f"production/alerts/{alert_id}/acknowledge", data={}, tenant_id=tenant_id)

            if result:
                logger.info("Acknowledged production alert", alert_id=alert_id, tenant_id=tenant_id)

            return result

        except Exception as e:
            logger.error("Error acknowledging production alert", error=str(e),
                         alert_id=alert_id, tenant_id=tenant_id)
            return None

    # ================================================================
    # WASTE AND SUSTAINABILITY ANALYTICS
    # ================================================================

    async def get_waste_analytics(
        self,
        tenant_id: str,
        start_date: str,
        end_date: str
    ) -> Optional[Dict[str, Any]]:
        """
        Get production waste analytics for sustainability reporting.

        Args:
            tenant_id: Tenant ID
            start_date: Start date (ISO format)
            end_date: End date (ISO format)

        Returns:
            Dictionary with waste analytics data:
            - total_production_waste: Total waste in kg
            - total_defects: Total defect waste in kg
            - total_planned: Total planned production in kg
            - total_actual: Total actual production in kg
            - ai_assisted_batches: Number of AI-assisted batches
        """
        try:
            params = {
                "start_date": start_date,
                "end_date": end_date
            }

            result = await self.get("production/waste-analytics", tenant_id=tenant_id, params=params)

            if result:
                logger.info("Retrieved production waste analytics",
                            tenant_id=tenant_id, start_date=start_date, end_date=end_date)

            return result

        except Exception as e:
            logger.error("Error getting production waste analytics", error=str(e), tenant_id=tenant_id)
            return None

    async def get_baseline(self, tenant_id: str) -> Optional[Dict[str, Any]]:
        """
        Get baseline waste percentage for SDG compliance calculations.

        Args:
            tenant_id: Tenant ID

        Returns:
            Dictionary with baseline data:
            - waste_percentage: Baseline waste percentage
            - period: Information about the baseline period
            - data_available: Whether real data is available
            - total_production_kg: Total production during baseline
            - total_waste_kg: Total waste during baseline
        """
        try:
            result = await self.get("production/baseline", tenant_id=tenant_id)

            if result:
                logger.info("Retrieved production baseline data", tenant_id=tenant_id,
                            data_available=result.get('data_available', False))

            return result

        except Exception as e:
            logger.error("Error getting production baseline", error=str(e), tenant_id=tenant_id)
            return None

    # ================================================================
    # ML INSIGHTS: Yield Prediction
    # ================================================================

    async def trigger_yield_prediction(
        self,
        tenant_id: str,
        recipe_ids: Optional[List[str]] = None,
        lookback_days: int = 90,
        min_history_runs: int = 30
    ) -> Optional[Dict[str, Any]]:
        """
        Trigger yield prediction for production recipes.

        Args:
            tenant_id: Tenant UUID
            recipe_ids: Specific recipe IDs to analyze. If None, analyzes all recipes
            lookback_days: Days of historical production to analyze (30-365)
            min_history_runs: Minimum production runs required (10-100)

        Returns:
            Dict with prediction results including insights posted
        """
        try:
            data = {
                "recipe_ids": recipe_ids,
                "lookback_days": lookback_days,
                "min_history_runs": min_history_runs
            }

            result = await self.post("production/ml/insights/predict-yields", data=data, tenant_id=tenant_id)

            if result:
                logger.info("Triggered yield prediction",
                            recipes_analyzed=result.get('recipes_analyzed', 0),
                            insights_posted=result.get('total_insights_posted', 0),
                            tenant_id=tenant_id)

            return result

        except Exception as e:
            logger.error("Error triggering yield prediction", error=str(e), tenant_id=tenant_id)
            return None
    # ================================================================
    # DASHBOARD METHODS
    # ================================================================

    async def get_production_summary_batch(
        self,
        tenant_ids: List[str]
    ) -> Dict[str, Any]:
        """
        Get production summaries for multiple tenants in a single request.

        Phase 2 optimization: eliminates N+1 query patterns for enterprise dashboards.

        Args:
            tenant_ids: List of tenant IDs to fetch

        Returns:
            Dict mapping tenant_id -> production summary
        """
        try:
            if not tenant_ids:
                return {}

            if len(tenant_ids) > 100:
                logger.warning("Batch request exceeds max tenant limit", requested=len(tenant_ids))
                tenant_ids = tenant_ids[:100]

            result = await self.post(
                "production/batch/production-summary",
                data={"tenant_ids": tenant_ids},
                tenant_id=tenant_ids[0]  # Use first tenant for auth context
            )

            summaries = result if isinstance(result, dict) else {}

            logger.info(
                "Batch retrieved production summaries",
                requested=len(tenant_ids),
                found=len(summaries)
            )

            return summaries

        except Exception as e:
            logger.error(
                "Error batch fetching production summaries",
                error=str(e),
                tenant_count=len(tenant_ids)
            )
            return {}

    async def get_todays_batches(
        self,
        tenant_id: str
    ) -> Optional[Dict[str, Any]]:
        """
        Get today's production batches for the dashboard timeline.

        For demo compatibility: queries all recent batches and filters for
        actionable ones scheduled for today, since demo session dates are
        adjusted relative to session creation time.

        Args:
            tenant_id: Tenant ID

        Returns:
            Dict with ProductionBatchListResponse:
            {"batches": [...], "total_count": n, "page": 1, "page_size": n}
        """
        try:
            from datetime import datetime, timezone, timedelta

            # Get today's date range (start of day to end of day in UTC)
            now = datetime.now(timezone.utc)
            today_start = now.replace(hour=0, minute=0, second=0, microsecond=0)
            today_end = today_start + timedelta(days=1)

            # Query all batches without date/status filter for demo compatibility
            # The dashboard will filter for PENDING, IN_PROGRESS, or SCHEDULED
            result = await self.get(
                "/production/batches",
                tenant_id=tenant_id,
                params={"page_size": 100}
            )

            if result and "batches" in result:
                # Filter for actionable batches scheduled for TODAY
                actionable_statuses = {"PENDING", "IN_PROGRESS", "SCHEDULED"}
                filtered_batches = []

                for batch in result["batches"]:
                    # Check if batch is actionable
                    if batch.get("status") not in actionable_statuses:
                        continue

                    # Check if batch is scheduled for today
                    # Include batches that START today OR END today (for overnight batches)
                    planned_start = batch.get("planned_start_time")
                    planned_end = batch.get("planned_end_time")

                    include_batch = False

                    if planned_start:
                        # Parse the start date string
                        if isinstance(planned_start, str):
                            planned_start = datetime.fromisoformat(planned_start.replace('Z', '+00:00'))

                        # Include if batch starts today
                        if today_start <= planned_start < today_end:
                            include_batch = True

                    # Also check if batch ends today (for overnight batches)
                    if not include_batch and planned_end:
                        if isinstance(planned_end, str):
                            planned_end = datetime.fromisoformat(planned_end.replace('Z', '+00:00'))

                        # Include if batch ends today (even if it started yesterday)
                        if today_start <= planned_end < today_end:
                            include_batch = True

                    if include_batch:
                        filtered_batches.append(batch)

                # Return filtered result
                return {
                    **result,
                    "batches": filtered_batches,
                    "total_count": len(filtered_batches)
                }

            return result

        except Exception as e:
            logger.error("Error fetching today's batches", error=str(e), tenant_id=tenant_id)
            return None
    async def get_production_batches_by_status(
        self,
        tenant_id: str,
        status: str,
        limit: int = 100
    ) -> Optional[Dict[str, Any]]:
        """
        Get production batches filtered by status for the dashboard.

        Args:
            tenant_id: Tenant ID
            status: Batch status (e.g., "ON_HOLD", "IN_PROGRESS")
            limit: Maximum number of batches to return

        Returns:
            Dict with ProductionBatchListResponse:
            {"batches": [...], "total_count": n, "page": 1, "page_size": n}
        """
        try:
            return await self.get(
                "/production/batches",
                tenant_id=tenant_id,
                params={"status": status, "page_size": limit}
            )

        except Exception as e:
            logger.error("Error fetching production batches", error=str(e),
                         status=status, tenant_id=tenant_id)
            return None

    # ================================================================
    # UTILITY METHODS
    # ================================================================

    async def health_check(self) -> bool:
        """Check if the production service is healthy"""
        try:
            result = await self.get("../health")  # Health endpoint is not tenant-scoped
            return result is not None
        except Exception as e:
            logger.error("Production service health check failed", error=str(e))
            return False

    # ================================================================
    # INTERNAL TRIGGER METHODS
    # ================================================================

    async def trigger_production_alerts_internal(
        self,
        tenant_id: str
    ) -> Optional[Dict[str, Any]]:
        """
        Trigger production alerts for a tenant (internal service use only).

        This method calls the internal endpoint, which is protected by the
        x-internal-service header. Includes both production alerts and
        equipment maintenance checks.

        Args:
            tenant_id: Tenant ID to trigger alerts for

        Returns:
            Dict with trigger results or None if failed
        """
        try:
            # Call internal endpoint via gateway using tenant-scoped URL pattern
            # Endpoint: /api/v1/tenants/{tenant_id}/production/internal/alerts/trigger
            result = await self._make_request(
                method="POST",
                endpoint="production/internal/alerts/trigger",
                tenant_id=tenant_id,
                data={},
                headers={"x-internal-service": "demo-session"}
            )

            if result:
                logger.info(
                    "Production alerts triggered successfully via internal endpoint",
                    tenant_id=tenant_id,
                    alerts_generated=result.get("alerts_generated", 0)
                )
            else:
                logger.warning(
                    "Production alerts internal endpoint returned no result",
                    tenant_id=tenant_id
                )

            return result

        except Exception as e:
            logger.error(
                "Error triggering production alerts via internal endpoint",
                tenant_id=tenant_id,
                error=str(e)
            )
            return None
    # ================================================================
    # INTERNAL AI INSIGHTS METHODS
    # ================================================================

    async def trigger_yield_insights_internal(
        self,
        tenant_id: str
    ) -> Optional[Dict[str, Any]]:
        """
        Trigger yield improvement insights for a tenant (internal service use only).

        This method calls the internal endpoint, which is protected by the
        x-internal-service header.

        Args:
            tenant_id: Tenant ID to trigger insights for

        Returns:
            Dict with trigger results or None if failed
        """
        try:
            result = await self._make_request(
                method="POST",
                endpoint="production/internal/ml/generate-yield-insights",
                tenant_id=tenant_id,
                data={"tenant_id": tenant_id},
                headers={"x-internal-service": "demo-session"}
            )

            if result:
                logger.info(
                    "Yield insights triggered successfully via internal endpoint",
                    tenant_id=tenant_id,
                    insights_posted=result.get("insights_posted", 0)
                )
            else:
                logger.warning(
                    "Yield insights internal endpoint returned no result",
                    tenant_id=tenant_id
                )

            return result

        except Exception as e:
            logger.error(
                "Error triggering yield insights via internal endpoint",
                tenant_id=tenant_id,
                error=str(e)
            )
            return None


# Factory function for dependency injection
def create_production_client(config: BaseServiceSettings) -> ProductionServiceClient:
    """Create a production service client instance"""
    return ProductionServiceClient(config)
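
# Illustrative usage sketch (kept as comments so nothing runs on import): shows how a
# calling service might obtain a client via the factory and request a schedule from an
# async code path. `settings`, `tenant_id`, and the forecast payload are placeholders
# supplied by the caller, not values defined in this module.
#
# async def _example_generate_schedule(settings: BaseServiceSettings, tenant_id: str) -> None:
#     client = create_production_client(settings)
#     schedule = await client.generate_schedule(
#         tenant_id=tenant_id,
#         forecast_data={"items": []},  # placeholder forecast payload
#         planning_horizon_days=2,
#     )
#     if schedule:
#         logger.info("Example schedule generated", schedule_id=schedule.get("schedule_id"))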