""" RabbitMQ event consumer. Consumes minimal events from services and processes them through the enrichment pipeline. """ import asyncio import json from aio_pika import connect_robust, IncomingMessage, Connection, Channel import structlog from app.core.config import settings from app.core.database import AsyncSessionLocal from shared.messaging import MinimalEvent from app.services.enrichment_orchestrator import EnrichmentOrchestrator from app.repositories.event_repository import EventRepository from shared.clients.notification_client import create_notification_client from app.services.sse_service import SSEService logger = structlog.get_logger() class EventConsumer: """ RabbitMQ consumer for processing events. Workflow: 1. Receive minimal event from service 2. Enrich with context (AI, priority, impact, etc.) 3. Store in database 4. Send to notification service 5. Publish to SSE stream """ def __init__(self): self.connection: Connection = None self.channel: Channel = None self.enricher = EnrichmentOrchestrator() self.notification_client = create_notification_client(settings) self.sse_svc = SSEService() async def start(self): """Start consuming events from RabbitMQ""" try: # Connect to RabbitMQ self.connection = await connect_robust( settings.RABBITMQ_URL, client_properties={"connection_name": "alert-processor"} ) self.channel = await self.connection.channel() await self.channel.set_qos(prefetch_count=10) # Declare queue queue = await self.channel.declare_queue( settings.RABBITMQ_QUEUE, durable=True ) # Bind to events exchange with routing patterns exchange = await self.channel.declare_exchange( settings.RABBITMQ_EXCHANGE, "topic", durable=True ) # Bind to alert, notification, and recommendation events await queue.bind(exchange, routing_key="alert.#") await queue.bind(exchange, routing_key="notification.#") await queue.bind(exchange, routing_key="recommendation.#") # Start consuming await queue.consume(self.process_message) logger.info( "event_consumer_started", queue=settings.RABBITMQ_QUEUE, exchange=settings.RABBITMQ_EXCHANGE ) except Exception as e: logger.error("consumer_start_failed", error=str(e)) raise async def process_message(self, message: IncomingMessage): """ Process incoming event message. Steps: 1. Parse message 2. Validate as MinimalEvent 3. Enrich event 4. Store in database 5. Send notification 6. Publish to SSE 7. Acknowledge message """ async with message.process(): try: # Parse message data = json.loads(message.body.decode()) event = MinimalEvent(**data) logger.info( "event_received", event_type=event.event_type, event_class=event.event_class, tenant_id=event.tenant_id ) # Enrich the event enriched_event = await self.enricher.enrich_event(event) # Store in database async with AsyncSessionLocal() as session: repo = EventRepository(session) stored_event = await repo.create_event(enriched_event) # Send to notification service (if alert) if event.event_class == "alert": await self._send_notification(stored_event) # Publish to SSE await self.sse_svc.publish_event(stored_event) logger.info( "event_processed", event_id=stored_event.id, event_type=event.event_type, priority_level=stored_event.priority_level, priority_score=stored_event.priority_score ) except json.JSONDecodeError as e: logger.error( "message_parse_failed", error=str(e), message_body=message.body[:200] ) # Don't requeue - bad message format except Exception as e: logger.error( "event_processing_failed", error=str(e), exc_info=True ) # Message will be requeued automatically due to exception async def _send_notification(self, event): """ Send notification using the shared notification client. Args: event: The event to send as a notification """ try: # Prepare notification message # Use i18n title and message from the event as the notification content title = event.i18n_title_key if event.i18n_title_key else f"Alert: {event.event_type}" message = event.i18n_message_key if event.i18n_message_key else f"New alert: {event.event_type}" # Add parameters to make it more informative if event.i18n_title_params: title += f" - {event.i18n_title_params}" if event.i18n_message_params: message += f" - {event.i18n_message_params}" # Prepare metadata from the event metadata = { "event_id": str(event.id), "event_type": event.event_type, "event_domain": event.event_domain, "priority_score": event.priority_score, "priority_level": event.priority_level, "status": event.status, "created_at": event.created_at.isoformat() if event.created_at else None, "type_class": event.type_class, "smart_actions": event.smart_actions, "entity_links": event.entity_links } # Determine notification priority based on event priority priority_map = { "critical": "urgent", "important": "high", "standard": "normal", "info": "low" } priority = priority_map.get(event.priority_level, "normal") # Send notification using shared client result = await self.notification_client.send_notification( tenant_id=str(event.tenant_id), notification_type="in_app", # Using in-app notification by default message=message, subject=title, priority=priority, metadata=metadata ) if result: logger.info( "notification_sent_via_shared_client", event_id=str(event.id), tenant_id=str(event.tenant_id), priority_level=event.priority_level ) else: logger.warning( "notification_failed_via_shared_client", event_id=str(event.id), tenant_id=str(event.tenant_id) ) except Exception as e: logger.error( "notification_error_via_shared_client", error=str(e), event_id=str(event.id), tenant_id=str(event.tenant_id) ) # Don't re-raise - we don't want to fail the entire event processing # if notification sending fails async def stop(self): """Stop consumer and close connections""" try: if self.channel: await self.channel.close() logger.info("rabbitmq_channel_closed") if self.connection: await self.connection.close() logger.info("rabbitmq_connection_closed") except Exception as e: logger.error("consumer_stop_failed", error=str(e))