# shared/config/rabbitmq_config.py """ RabbitMQ configuration for the event system Supports three event classes through a unified topic exchange: - ALERT: Actionable events requiring user decision - NOTIFICATION: Informational state changes - RECOMMENDATION: AI-generated suggestions Routing key pattern: {event_class}.{event_domain}.{severity} Examples: - alert.inventory.urgent - notification.production.info - recommendation.demand.medium """ RABBITMQ_CONFIG = { "exchanges": { "events": { "name": "events.exchange", "type": "topic", "durable": True, "auto_delete": False }, # Legacy exchange for backward compatibility during migration "alerts": { "name": "alerts.exchange", "type": "topic", "durable": True, "auto_delete": False }, "dead_letter": { "name": "dlx.exchange", "type": "direct", "durable": True, "auto_delete": False } }, "queues": { "event_processing": { "name": "event.processing.queue", "durable": True, "arguments": { "x-message-ttl": 3600000, # 1 hour TTL "x-max-length": 10000, # Max 10k messages "x-overflow": "reject-publish", "x-dead-letter-exchange": "dlx.exchange", "x-dead-letter-routing-key": "failed.events" } }, # Legacy queue for backward compatibility "alert_processing": { "name": "alert.processing.queue", "durable": True, "arguments": { "x-message-ttl": 3600000, "x-max-length": 10000, "x-overflow": "reject-publish", "x-dead-letter-exchange": "dlx.exchange", "x-dead-letter-routing-key": "failed.items" } }, "dead_letter": { "name": "event.dead_letter.queue", "durable": True, "arguments": { "x-message-ttl": 86400000 # 24 hours for dead letters } } }, "bindings": [ # New event architecture bindings { "queue": "event.processing.queue", "exchange": "events.exchange", "routing_key": "*.*.*" # event_class.event_domain.severity }, # Legacy bindings for backward compatibility { "queue": "alert.processing.queue", "exchange": "alerts.exchange", "routing_key": "*.*.*" # alert/recommendation.severity.service }, { "queue": "event.dead_letter.queue", "exchange": "dlx.exchange", "routing_key": "failed.events" }, { "queue": "event.dead_letter.queue", "exchange": "dlx.exchange", "routing_key": "failed.items" # Legacy } ], "routing_patterns": { # New event architecture patterns # event_class.event_domain.severity "alert_inventory": "alert.inventory.*", "alert_production": "alert.production.*", "alert_supply_chain": "alert.supply_chain.*", "notification_inventory": "notification.inventory.*", "notification_production": "notification.production.*", "notification_operations": "notification.operations.*", "recommendation_all": "recommendation.*.*", # By severity "all_urgent": "*.*.urgent", "all_high": "*.*.high", "all_medium": "*.*.medium", "all_low": "*.*.low", # By event class "all_alerts": "alert.*.*", "all_notifications": "notification.*.*", "all_recommendations": "recommendation.*.*", # By domain "inventory_all": "*.inventory.*", "production_all": "*.production.*", "supply_chain_all": "*.supply_chain.*", "demand_all": "*.demand.*", "operations_all": "*.operations.*", # Legacy patterns (for backward compatibility) "legacy_alert": "alert.{severity}.{service}", "legacy_recommendation": "recommendation.{severity}.{service}", } } def get_routing_key(event_class: str, event_domain: str, severity: str) -> str: """ Generate routing key for event publishing. New pattern: {event_class}.{event_domain}.{severity} Args: event_class: 'alert', 'notification', or 'recommendation' event_domain: 'inventory', 'production', 'supply_chain', 'demand', 'operations' severity: 'urgent', 'high', 'medium', 'low' Returns: Routing key string Examples: >>> get_routing_key('alert', 'inventory', 'urgent') 'alert.inventory.urgent' >>> get_routing_key('notification', 'production', 'info') 'notification.production.info' """ return f"{event_class}.{event_domain}.{severity}" def get_legacy_routing_key(item_type: str, severity: str, service: str) -> str: """ Generate legacy routing key for backward compatibility. Legacy pattern: {item_type}.{severity}.{service} TODO: Remove after migration is complete. """ return f"{item_type}.{severity}.{service}" def get_binding_patterns( event_classes: list = None, event_domains: list = None, severities: list = None ) -> list: """ Generate binding patterns for selective consumption. Args: event_classes: List of event classes to bind (default: all) event_domains: List of event domains to bind (default: all) severities: List of severities to bind (default: all) Returns: List of routing key patterns Examples: >>> get_binding_patterns(['alert'], ['inventory'], ['urgent', 'high']) ['alert.inventory.urgent', 'alert.inventory.high'] """ patterns = [] event_classes = event_classes or ["alert", "notification", "recommendation"] event_domains = event_domains or ["inventory", "production", "supply_chain", "demand", "operations"] severities = severities or ["urgent", "high", "medium", "low"] for event_class in event_classes: for event_domain in event_domains: for severity in severities: patterns.append(f"{event_class}.{event_domain}.{severity}") return patterns def priority_score_to_severity(priority_score: int) -> str: """ Convert priority score (0-100) to severity level. Args: priority_score: Priority score (0-100) Returns: Severity level: 'urgent', 'high', 'medium', or 'low' """ if priority_score >= 90: return "urgent" elif priority_score >= 70: return "high" elif priority_score >= 50: return "medium" else: return "low"