Files
bakery-ia/services/alert_processor/app/services/enrichment/enrichment_router.py
2025-11-27 15:52:40 +01:00

392 lines
17 KiB
Python

"""
Enrichment Router
Routes events to appropriate enrichment pipelines based on event_class:
- ALERT: Full enrichment (orchestrator, priority, smart actions, timing)
- NOTIFICATION: Lightweight enrichment (basic formatting only)
- RECOMMENDATION: Moderate enrichment (no orchestrator queries)
This enables 80% reduction in processing time for non-alert events.
"""
import logging
from typing import Dict, Any, Optional
from datetime import datetime, timezone, timedelta
import uuid
from shared.schemas.event_classification import (
RawEvent,
EventClass,
EventDomain,
NotificationType,
RecommendationType,
)
from services.alert_processor.app.models.events import (
Alert,
Notification,
Recommendation,
)
from services.alert_processor.app.services.enrichment.context_enrichment import ContextEnrichmentService
from services.alert_processor.app.services.enrichment.priority_scoring import PriorityScoringService
from services.alert_processor.app.services.enrichment.timing_intelligence import TimingIntelligenceService
from services.alert_processor.app.services.enrichment.orchestrator_client import OrchestratorClient
logger = logging.getLogger(__name__)
class EnrichmentRouter:
"""
Routes events to appropriate enrichment pipeline based on event_class.
"""
def __init__(
self,
context_enrichment_service: Optional[ContextEnrichmentService] = None,
priority_scoring_service: Optional[PriorityScoringService] = None,
timing_intelligence_service: Optional[TimingIntelligenceService] = None,
orchestrator_client: Optional[OrchestratorClient] = None,
):
"""Initialize enrichment router with services"""
self.context_enrichment = context_enrichment_service or ContextEnrichmentService()
self.priority_scoring = priority_scoring_service or PriorityScoringService()
self.timing_intelligence = timing_intelligence_service or TimingIntelligenceService()
self.orchestrator_client = orchestrator_client or OrchestratorClient()
async def enrich_event(self, raw_event: RawEvent) -> Alert | Notification | Recommendation:
"""
Route event to appropriate enrichment pipeline.
Args:
raw_event: Raw event from domain service
Returns:
Enriched Alert, Notification, or Recommendation model
Raises:
ValueError: If event_class is not recognized
"""
logger.info(
f"Enriching event: class={raw_event.event_class}, "
f"domain={raw_event.event_domain}, type={raw_event.event_type}"
)
if raw_event.event_class == EventClass.ALERT:
return await self._enrich_alert(raw_event)
elif raw_event.event_class == EventClass.NOTIFICATION:
return await self._enrich_notification(raw_event)
elif raw_event.event_class == EventClass.RECOMMENDATION:
return await self._enrich_recommendation(raw_event)
else:
raise ValueError(f"Unknown event_class: {raw_event.event_class}")
# ============================================================
# ALERT ENRICHMENT (Full Pipeline)
# ============================================================
async def _enrich_alert(self, raw_event: RawEvent) -> Alert:
"""
Full enrichment pipeline for alerts.
Steps:
1. Query orchestrator for context
2. Calculate business impact
3. Assess urgency
4. Determine user agency
5. Generate smart actions
6. Calculate priority score
7. Determine timing
8. Classify type_class
"""
logger.debug(f"Full enrichment for alert: {raw_event.event_type}")
# Step 1: Orchestrator context
orchestrator_context = await self._get_orchestrator_context(raw_event)
# Step 2-5: Context enrichment (business impact, urgency, user agency, smart actions)
enriched_context = await self.context_enrichment.enrich(
raw_event=raw_event,
orchestrator_context=orchestrator_context,
)
# Step 6: Priority scoring (multi-factor)
priority_data = await self.priority_scoring.calculate_priority(
raw_event=raw_event,
business_impact=enriched_context.get('business_impact'),
urgency_context=enriched_context.get('urgency_context'),
user_agency=enriched_context.get('user_agency'),
confidence_score=enriched_context.get('confidence_score', 0.8),
)
# Step 7: Timing intelligence
timing_data = await self.timing_intelligence.determine_timing(
priority_score=priority_data['priority_score'],
priority_level=priority_data['priority_level'],
type_class=enriched_context.get('type_class', 'action_needed'),
)
# Create Alert model
alert = Alert(
id=uuid.uuid4(),
tenant_id=uuid.UUID(raw_event.tenant_id),
event_domain=raw_event.event_domain.value,
event_type=raw_event.event_type,
service=raw_event.service,
title=raw_event.title,
message=raw_event.message,
type_class=enriched_context.get('type_class', 'action_needed'),
status='active',
priority_score=priority_data['priority_score'],
priority_level=priority_data['priority_level'],
orchestrator_context=orchestrator_context,
business_impact=enriched_context.get('business_impact'),
urgency_context=enriched_context.get('urgency_context'),
user_agency=enriched_context.get('user_agency'),
trend_context=enriched_context.get('trend_context'),
smart_actions=enriched_context.get('smart_actions', []),
ai_reasoning_summary=enriched_context.get('ai_reasoning_summary'),
confidence_score=enriched_context.get('confidence_score', 0.8),
timing_decision=timing_data['timing_decision'],
scheduled_send_time=timing_data.get('scheduled_send_time'),
placement=timing_data.get('placement', ['toast', 'action_queue', 'notification_panel']),
action_created_at=enriched_context.get('action_created_at'),
superseded_by_action_id=enriched_context.get('superseded_by_action_id'),
hidden_from_ui=enriched_context.get('hidden_from_ui', False),
alert_metadata=raw_event.event_metadata,
created_at=raw_event.timestamp or datetime.now(timezone.utc),
)
logger.info(
f"Alert enriched: {alert.event_type}, priority={alert.priority_score}, "
f"type_class={alert.type_class}"
)
return alert
async def _get_orchestrator_context(self, raw_event: RawEvent) -> Optional[Dict[str, Any]]:
"""Query orchestrator for recent actions related to this event"""
try:
# Extract relevant IDs from metadata
ingredient_id = raw_event.event_metadata.get('ingredient_id')
product_id = raw_event.event_metadata.get('product_id')
if not ingredient_id and not product_id:
return None
# Query orchestrator
recent_actions = await self.orchestrator_client.get_recent_actions(
tenant_id=raw_event.tenant_id,
ingredient_id=ingredient_id,
product_id=product_id,
)
if not recent_actions:
return None
# Return most recent action
action = recent_actions[0]
return {
'already_addressed': True,
'action_type': action.get('action_type'),
'action_id': action.get('action_id'),
'action_status': action.get('status'),
'delivery_date': action.get('delivery_date'),
'reasoning': action.get('reasoning'),
}
except Exception as e:
logger.warning(f"Failed to fetch orchestrator context: {e}")
return None
# ============================================================
# NOTIFICATION ENRICHMENT (Lightweight)
# ============================================================
async def _enrich_notification(self, raw_event: RawEvent) -> Notification:
"""
Lightweight enrichment for notifications.
No orchestrator queries, no priority scoring, no smart actions.
Just basic formatting and entity extraction.
"""
logger.debug(f"Lightweight enrichment for notification: {raw_event.event_type}")
# Infer notification_type from event_type
notification_type = self._infer_notification_type(raw_event.event_type)
# Extract entity context from metadata
entity_type, entity_id, old_state, new_state = self._extract_entity_context(raw_event)
# Create Notification model
notification = Notification(
id=uuid.uuid4(),
tenant_id=uuid.UUID(raw_event.tenant_id),
event_domain=raw_event.event_domain.value,
event_type=raw_event.event_type,
notification_type=notification_type.value,
service=raw_event.service,
title=raw_event.title,
message=raw_event.message,
entity_type=entity_type,
entity_id=entity_id,
old_state=old_state,
new_state=new_state,
notification_metadata=raw_event.event_metadata,
placement=['notification_panel'], # Lightweight: panel only, no toast
# expires_at set automatically in __init__ (7 days)
created_at=raw_event.timestamp or datetime.now(timezone.utc),
)
logger.info(f"Notification enriched: {notification.event_type}, entity={entity_type}:{entity_id}")
return notification
def _infer_notification_type(self, event_type: str) -> NotificationType:
"""Infer notification_type from event_type string"""
event_type_lower = event_type.lower()
if 'state_change' in event_type_lower or 'status_change' in event_type_lower:
return NotificationType.STATE_CHANGE
elif 'completed' in event_type_lower or 'finished' in event_type_lower:
return NotificationType.COMPLETION
elif 'received' in event_type_lower or 'arrived' in event_type_lower or 'arrival' in event_type_lower:
return NotificationType.ARRIVAL
elif 'shipped' in event_type_lower or 'sent' in event_type_lower or 'departure' in event_type_lower:
return NotificationType.DEPARTURE
elif 'started' in event_type_lower or 'created' in event_type_lower:
return NotificationType.SYSTEM_EVENT
else:
return NotificationType.UPDATE
def _extract_entity_context(self, raw_event: RawEvent) -> tuple[Optional[str], Optional[str], Optional[str], Optional[str]]:
"""Extract entity context from metadata"""
metadata = raw_event.event_metadata
# Try to infer entity_type from metadata keys
entity_type = None
entity_id = None
old_state = None
new_state = None
# Check for common entity types
if 'batch_id' in metadata:
entity_type = 'batch'
entity_id = metadata.get('batch_id')
old_state = metadata.get('old_status') or metadata.get('previous_status')
new_state = metadata.get('new_status') or metadata.get('status')
elif 'delivery_id' in metadata:
entity_type = 'delivery'
entity_id = metadata.get('delivery_id')
old_state = metadata.get('old_status')
new_state = metadata.get('new_status') or metadata.get('status')
elif 'po_id' in metadata or 'purchase_order_id' in metadata:
entity_type = 'purchase_order'
entity_id = metadata.get('po_id') or metadata.get('purchase_order_id')
old_state = metadata.get('old_status')
new_state = metadata.get('new_status') or metadata.get('status')
elif 'orchestration_run_id' in metadata or 'run_id' in metadata:
entity_type = 'orchestration_run'
entity_id = metadata.get('orchestration_run_id') or metadata.get('run_id')
old_state = metadata.get('old_status')
new_state = metadata.get('new_status') or metadata.get('status')
return entity_type, entity_id, old_state, new_state
# ============================================================
# RECOMMENDATION ENRICHMENT (Moderate)
# ============================================================
async def _enrich_recommendation(self, raw_event: RawEvent) -> Recommendation:
"""
Moderate enrichment for recommendations.
No orchestrator queries, light priority, basic suggested actions.
"""
logger.debug(f"Moderate enrichment for recommendation: {raw_event.event_type}")
# Infer recommendation_type from event_type
recommendation_type = self._infer_recommendation_type(raw_event.event_type)
# Calculate light priority (defaults to info, can be elevated based on metadata)
priority_level = self._calculate_light_priority(raw_event)
# Extract estimated impact from metadata
estimated_impact = self._extract_estimated_impact(raw_event)
# Generate basic suggested actions (lightweight, no smart action generation)
suggested_actions = self._generate_suggested_actions(raw_event)
# Create Recommendation model
recommendation = Recommendation(
id=uuid.uuid4(),
tenant_id=uuid.UUID(raw_event.tenant_id),
event_domain=raw_event.event_domain.value,
event_type=raw_event.event_type,
recommendation_type=recommendation_type.value,
service=raw_event.service,
title=raw_event.title,
message=raw_event.message,
priority_level=priority_level,
estimated_impact=estimated_impact,
suggested_actions=suggested_actions,
ai_reasoning_summary=raw_event.event_metadata.get('reasoning'),
confidence_score=raw_event.event_metadata.get('confidence_score', 0.7),
recommendation_metadata=raw_event.event_metadata,
created_at=raw_event.timestamp or datetime.now(timezone.utc),
)
logger.info(f"Recommendation enriched: {recommendation.event_type}, priority={priority_level}")
return recommendation
def _infer_recommendation_type(self, event_type: str) -> RecommendationType:
"""Infer recommendation_type from event_type string"""
event_type_lower = event_type.lower()
if 'optimization' in event_type_lower or 'efficiency' in event_type_lower:
return RecommendationType.OPTIMIZATION
elif 'cost' in event_type_lower or 'saving' in event_type_lower:
return RecommendationType.COST_REDUCTION
elif 'risk' in event_type_lower or 'prevent' in event_type_lower:
return RecommendationType.RISK_MITIGATION
elif 'trend' in event_type_lower or 'pattern' in event_type_lower:
return RecommendationType.TREND_INSIGHT
else:
return RecommendationType.BEST_PRACTICE
def _calculate_light_priority(self, raw_event: RawEvent) -> str:
"""Calculate light priority for recommendations (info by default)"""
metadata = raw_event.event_metadata
# Check for urgency hints in metadata
if metadata.get('urgent') or metadata.get('is_urgent'):
return 'important'
elif metadata.get('high_impact'):
return 'standard'
else:
return 'info'
def _extract_estimated_impact(self, raw_event: RawEvent) -> Optional[Dict[str, Any]]:
"""Extract estimated impact from metadata"""
metadata = raw_event.event_metadata
impact = {}
if 'estimated_savings_eur' in metadata:
impact['financial_savings_eur'] = metadata['estimated_savings_eur']
if 'estimated_time_saved_hours' in metadata:
impact['time_saved_hours'] = metadata['estimated_time_saved_hours']
if 'efficiency_gain_percent' in metadata:
impact['efficiency_gain_percent'] = metadata['efficiency_gain_percent']
return impact if impact else None
def _generate_suggested_actions(self, raw_event: RawEvent) -> Optional[list[Dict[str, Any]]]:
"""Generate basic suggested actions (lightweight, no smart action logic)"""
# If actions provided in raw_event, use them
if raw_event.actions:
return [{'type': action, 'label': action.replace('_', ' ').title()} for action in raw_event.actions]
# Otherwise, return None (optional actions)
return None