Files
bakery-ia/services/alert_processor/app/enrichment/user_agency.py
2025-12-05 20:07:01 +01:00

117 lines
3.8 KiB
Python

"""
User agency analyzer for alerts.
Determines whether user can fix the issue, what blockers exist,
and if external parties are required.
"""
from typing import Dict, Any
import structlog
logger = structlog.get_logger()
class UserAgencyAnalyzer:
"""Analyze user's ability to act on alerts"""
def analyze(
self,
event_type: str,
metadata: Dict[str, Any],
orchestrator_context: dict
) -> dict:
"""
Analyze user agency for an event.
Returns dict with:
- can_user_fix: Boolean - can user resolve this?
- requires_external_party: Boolean
- external_party_name: Name of required party
- external_party_contact: Contact info
- blockers: List of blocking factors
- suggested_workaround: Optional workaround suggestion
"""
agency = {
"can_user_fix": True,
"requires_external_party": False,
"external_party_name": None,
"external_party_contact": None,
"blockers": [],
"suggested_workaround": None
}
# If orchestrator already addressed it, user agency is low
if orchestrator_context and orchestrator_context.get("already_addressed"):
agency["can_user_fix"] = False
agency["blockers"].append("ai_already_handled")
return agency
# Analyze based on event type
if "po_approval" in event_type:
agency["can_user_fix"] = True
elif "delivery" in event_type or "supplier" in event_type:
agency.update(self._analyze_supplier_agency(metadata))
elif "equipment" in event_type:
agency.update(self._analyze_equipment_agency(metadata))
elif "stock" in event_type:
agency.update(self._analyze_stock_agency(metadata, orchestrator_context))
return agency
def _analyze_supplier_agency(self, metadata: Dict[str, Any]) -> dict:
"""Analyze agency for supplier-related alerts"""
agency = {
"requires_external_party": True,
"external_party_name": metadata.get("supplier_name"),
"external_party_contact": metadata.get("supplier_contact")
}
# User can contact supplier but can't directly fix
if not metadata.get("supplier_contact"):
agency["blockers"].append("no_supplier_contact")
return agency
def _analyze_equipment_agency(self, metadata: Dict[str, Any]) -> dict:
"""Analyze agency for equipment-related alerts"""
agency = {}
equipment_type = metadata.get("equipment_type", "")
if "oven" in equipment_type.lower() or "mixer" in equipment_type.lower():
agency["requires_external_party"] = True
agency["external_party_name"] = "Maintenance Team"
agency["blockers"].append("requires_technician")
return agency
def _analyze_stock_agency(
self,
metadata: Dict[str, Any],
orchestrator_context: dict
) -> dict:
"""Analyze agency for stock-related alerts"""
agency = {}
# If PO exists, user just needs to approve
if metadata.get("po_id"):
if metadata.get("po_status") == "pending_approval":
agency["can_user_fix"] = True
agency["suggested_workaround"] = "Approve pending PO"
else:
agency["blockers"].append("waiting_for_delivery")
agency["requires_external_party"] = True
agency["external_party_name"] = metadata.get("supplier_name")
# If no PO, user needs to create one
elif metadata.get("supplier_name"):
agency["can_user_fix"] = True
agency["requires_external_party"] = True
agency["external_party_name"] = metadata.get("supplier_name")
return agency