""" User agency analyzer for alerts. Determines whether user can fix the issue, what blockers exist, and if external parties are required. """ from typing import Dict, Any import structlog logger = structlog.get_logger() class UserAgencyAnalyzer: """Analyze user's ability to act on alerts""" def analyze( self, event_type: str, metadata: Dict[str, Any], orchestrator_context: dict ) -> dict: """ Analyze user agency for an event. Returns dict with: - can_user_fix: Boolean - can user resolve this? - requires_external_party: Boolean - external_party_name: Name of required party - external_party_contact: Contact info - blockers: List of blocking factors - suggested_workaround: Optional workaround suggestion """ agency = { "can_user_fix": True, "requires_external_party": False, "external_party_name": None, "external_party_contact": None, "blockers": [], "suggested_workaround": None } # If orchestrator already addressed it, user agency is low if orchestrator_context and orchestrator_context.get("already_addressed"): agency["can_user_fix"] = False agency["blockers"].append("ai_already_handled") return agency # Analyze based on event type if "po_approval" in event_type: agency["can_user_fix"] = True elif "delivery" in event_type or "supplier" in event_type: agency.update(self._analyze_supplier_agency(metadata)) elif "equipment" in event_type: agency.update(self._analyze_equipment_agency(metadata)) elif "stock" in event_type: agency.update(self._analyze_stock_agency(metadata, orchestrator_context)) return agency def _analyze_supplier_agency(self, metadata: Dict[str, Any]) -> dict: """Analyze agency for supplier-related alerts""" agency = { "requires_external_party": True, "external_party_name": metadata.get("supplier_name"), "external_party_contact": metadata.get("supplier_contact") } # User can contact supplier but can't directly fix if not metadata.get("supplier_contact"): agency["blockers"].append("no_supplier_contact") return agency def _analyze_equipment_agency(self, metadata: Dict[str, Any]) -> dict: """Analyze agency for equipment-related alerts""" agency = {} equipment_type = metadata.get("equipment_type", "") if "oven" in equipment_type.lower() or "mixer" in equipment_type.lower(): agency["requires_external_party"] = True agency["external_party_name"] = "Maintenance Team" agency["blockers"].append("requires_technician") return agency def _analyze_stock_agency( self, metadata: Dict[str, Any], orchestrator_context: dict ) -> dict: """Analyze agency for stock-related alerts""" agency = {} # If PO exists, user just needs to approve if metadata.get("po_id"): if metadata.get("po_status") == "pending_approval": agency["can_user_fix"] = True agency["suggested_workaround"] = "Approve pending PO" else: agency["blockers"].append("waiting_for_delivery") agency["requires_external_party"] = True agency["external_party_name"] = metadata.get("supplier_name") # If no PO, user needs to create one elif metadata.get("supplier_name"): agency["can_user_fix"] = True agency["requires_external_party"] = True agency["external_party_name"] = metadata.get("supplier_name") return agency