Files
bakery-ia/shared/config/rabbitmq_config.py
2025-12-13 23:57:54 +01:00

216 lines
6.7 KiB
Python
Executable File

# shared/config/rabbitmq_config.py
"""
RabbitMQ configuration for the event system
Supports three event classes through a unified topic exchange:
- ALERT: Actionable events requiring user decision
- NOTIFICATION: Informational state changes
- RECOMMENDATION: AI-generated suggestions
Routing key pattern: {event_class}.{event_domain}.{severity}
Examples:
- alert.inventory.urgent
- notification.production.info
- recommendation.demand.medium
"""
RABBITMQ_CONFIG = {
"exchanges": {
"events": {
"name": "events.exchange",
"type": "topic",
"durable": True,
"auto_delete": False
},
# Legacy exchange for backward compatibility during migration
"alerts": {
"name": "alerts.exchange",
"type": "topic",
"durable": True,
"auto_delete": False
},
"dead_letter": {
"name": "dlx.exchange",
"type": "direct",
"durable": True,
"auto_delete": False
}
},
"queues": {
"event_processing": {
"name": "event.processing.queue",
"durable": True,
"arguments": {
"x-message-ttl": 3600000, # 1 hour TTL
"x-max-length": 10000, # Max 10k messages
"x-overflow": "reject-publish",
"x-dead-letter-exchange": "dlx.exchange",
"x-dead-letter-routing-key": "failed.events"
}
},
# Legacy queue for backward compatibility
"alert_processing": {
"name": "alert.processing.queue",
"durable": True,
"arguments": {
"x-message-ttl": 3600000,
"x-max-length": 10000,
"x-overflow": "reject-publish",
"x-dead-letter-exchange": "dlx.exchange",
"x-dead-letter-routing-key": "failed.items"
}
},
"dead_letter": {
"name": "event.dead_letter.queue",
"durable": True,
"arguments": {
"x-message-ttl": 86400000 # 24 hours for dead letters
}
}
},
"bindings": [
# New event architecture bindings
{
"queue": "event.processing.queue",
"exchange": "events.exchange",
"routing_key": "*.*.*" # event_class.event_domain.severity
},
# Legacy bindings for backward compatibility
{
"queue": "alert.processing.queue",
"exchange": "alerts.exchange",
"routing_key": "*.*.*" # alert/recommendation.severity.service
},
{
"queue": "event.dead_letter.queue",
"exchange": "dlx.exchange",
"routing_key": "failed.events"
},
{
"queue": "event.dead_letter.queue",
"exchange": "dlx.exchange",
"routing_key": "failed.items" # Legacy
}
],
"routing_patterns": {
# New event architecture patterns
# event_class.event_domain.severity
"alert_inventory": "alert.inventory.*",
"alert_production": "alert.production.*",
"alert_supply_chain": "alert.supply_chain.*",
"notification_inventory": "notification.inventory.*",
"notification_production": "notification.production.*",
"notification_operations": "notification.operations.*",
"recommendation_all": "recommendation.*.*",
# By severity
"all_urgent": "*.*.urgent",
"all_high": "*.*.high",
"all_medium": "*.*.medium",
"all_low": "*.*.low",
# By event class
"all_alerts": "alert.*.*",
"all_notifications": "notification.*.*",
"all_recommendations": "recommendation.*.*",
# By domain
"inventory_all": "*.inventory.*",
"production_all": "*.production.*",
"supply_chain_all": "*.supply_chain.*",
"demand_all": "*.demand.*",
"operations_all": "*.operations.*",
# Legacy patterns (for backward compatibility)
"legacy_alert": "alert.{severity}.{service}",
"legacy_recommendation": "recommendation.{severity}.{service}",
}
}
def get_routing_key(event_class: str, event_domain: str, severity: str) -> str:
"""
Generate routing key for event publishing.
New pattern: {event_class}.{event_domain}.{severity}
Args:
event_class: 'alert', 'notification', or 'recommendation'
event_domain: 'inventory', 'production', 'supply_chain', 'demand', 'operations'
severity: 'urgent', 'high', 'medium', 'low'
Returns:
Routing key string
Examples:
>>> get_routing_key('alert', 'inventory', 'urgent')
'alert.inventory.urgent'
>>> get_routing_key('notification', 'production', 'info')
'notification.production.info'
"""
return f"{event_class}.{event_domain}.{severity}"
def get_legacy_routing_key(item_type: str, severity: str, service: str) -> str:
"""
Generate legacy routing key for backward compatibility.
Legacy pattern: {item_type}.{severity}.{service}
TODO: Remove after migration is complete.
"""
return f"{item_type}.{severity}.{service}"
def get_binding_patterns(
event_classes: list = None,
event_domains: list = None,
severities: list = None
) -> list:
"""
Generate binding patterns for selective consumption.
Args:
event_classes: List of event classes to bind (default: all)
event_domains: List of event domains to bind (default: all)
severities: List of severities to bind (default: all)
Returns:
List of routing key patterns
Examples:
>>> get_binding_patterns(['alert'], ['inventory'], ['urgent', 'high'])
['alert.inventory.urgent', 'alert.inventory.high']
"""
patterns = []
event_classes = event_classes or ["alert", "notification", "recommendation"]
event_domains = event_domains or ["inventory", "production", "supply_chain", "demand", "operations"]
severities = severities or ["urgent", "high", "medium", "low"]
for event_class in event_classes:
for event_domain in event_domains:
for severity in severities:
patterns.append(f"{event_class}.{event_domain}.{severity}")
return patterns
def priority_score_to_severity(priority_score: int) -> str:
"""
Convert priority score (0-100) to severity level.
Args:
priority_score: Priority score (0-100)
Returns:
Severity level: 'urgent', 'high', 'medium', or 'low'
"""
if priority_score >= 90:
return "urgent"
elif priority_score >= 70:
return "high"
elif priority_score >= 50:
return "medium"
else:
return "low"