Files
bakery-ia/services/alert_processor/app/enrichment/user_agency.py

117 lines
3.8 KiB
Python
Raw Normal View History

2025-12-05 20:07:01 +01:00
"""
User agency analyzer for alerts.
Determines whether user can fix the issue, what blockers exist,
and if external parties are required.
"""
from typing import Dict, Any
import structlog
logger = structlog.get_logger()
class UserAgencyAnalyzer:
"""Analyze user's ability to act on alerts"""
def analyze(
self,
event_type: str,
metadata: Dict[str, Any],
orchestrator_context: dict
) -> dict:
"""
Analyze user agency for an event.
Returns dict with:
- can_user_fix: Boolean - can user resolve this?
- requires_external_party: Boolean
- external_party_name: Name of required party
- external_party_contact: Contact info
- blockers: List of blocking factors
- suggested_workaround: Optional workaround suggestion
"""
agency = {
"can_user_fix": True,
"requires_external_party": False,
"external_party_name": None,
"external_party_contact": None,
"blockers": [],
"suggested_workaround": None
}
# If orchestrator already addressed it, user agency is low
if orchestrator_context and orchestrator_context.get("already_addressed"):
agency["can_user_fix"] = False
agency["blockers"].append("ai_already_handled")
return agency
# Analyze based on event type
if "po_approval" in event_type:
agency["can_user_fix"] = True
elif "delivery" in event_type or "supplier" in event_type:
agency.update(self._analyze_supplier_agency(metadata))
elif "equipment" in event_type:
agency.update(self._analyze_equipment_agency(metadata))
elif "stock" in event_type:
agency.update(self._analyze_stock_agency(metadata, orchestrator_context))
return agency
def _analyze_supplier_agency(self, metadata: Dict[str, Any]) -> dict:
"""Analyze agency for supplier-related alerts"""
agency = {
"requires_external_party": True,
"external_party_name": metadata.get("supplier_name"),
"external_party_contact": metadata.get("supplier_contact")
}
# User can contact supplier but can't directly fix
if not metadata.get("supplier_contact"):
agency["blockers"].append("no_supplier_contact")
return agency
def _analyze_equipment_agency(self, metadata: Dict[str, Any]) -> dict:
"""Analyze agency for equipment-related alerts"""
agency = {}
equipment_type = metadata.get("equipment_type", "")
if "oven" in equipment_type.lower() or "mixer" in equipment_type.lower():
agency["requires_external_party"] = True
agency["external_party_name"] = "Maintenance Team"
agency["blockers"].append("requires_technician")
return agency
def _analyze_stock_agency(
self,
metadata: Dict[str, Any],
orchestrator_context: dict
) -> dict:
"""Analyze agency for stock-related alerts"""
agency = {}
# If PO exists, user just needs to approve
if metadata.get("po_id"):
if metadata.get("po_status") == "pending_approval":
agency["can_user_fix"] = True
agency["suggested_workaround"] = "Approve pending PO"
else:
agency["blockers"].append("waiting_for_delivery")
agency["requires_external_party"] = True
agency["external_party_name"] = metadata.get("supplier_name")
# If no PO, user needs to create one
elif metadata.get("supplier_name"):
agency["can_user_fix"] = True
agency["requires_external_party"] = True
agency["external_party_name"] = metadata.get("supplier_name")
return agency