Files
bakery-ia/services/alert_processor/app/consumer/event_consumer.py
2025-12-05 20:07:01 +01:00

240 lines
8.3 KiB
Python

"""
RabbitMQ event consumer.
Consumes minimal events from services and processes them through
the enrichment pipeline.
"""
import asyncio
import json
from aio_pika import connect_robust, IncomingMessage, Connection, Channel
import structlog
from app.core.config import settings
from app.core.database import AsyncSessionLocal
from shared.schemas.events import MinimalEvent
from app.services.enrichment_orchestrator import EnrichmentOrchestrator
from app.repositories.event_repository import EventRepository
from shared.clients.notification_client import create_notification_client
from app.services.sse_service import SSEService
# Module-level structlog logger used for every log event emitted in this file.
logger = structlog.get_logger()
class EventConsumer:
    """
    RabbitMQ consumer for processing events.

    Workflow:
    1. Receive minimal event from service
    2. Enrich with context (AI, priority, impact, etc.)
    3. Store in database
    4. Send to notification service
    5. Publish to SSE stream

    Acknowledgement policy:
    - Payloads that cannot be parsed or validated are rejected without
      requeue, because retrying cannot fix them.
    - Failures while enriching or storing are requeued once; a message that
      fails again after redelivery is rejected without requeue so a poison
      message cannot loop forever.
    - Failures after the event has been stored (notification, SSE) are only
      logged, so a retry never stores the same event twice.
    """

    # Translates an event's priority level into the notification priority.
    _NOTIFICATION_PRIORITY = {
        "critical": "urgent",
        "important": "high",
        "standard": "normal",
        "info": "low",
    }

    def __init__(self):
        # Both handles stay None until start() succeeds.
        self.connection: Connection = None
        self.channel: Channel = None
        self.enricher = EnrichmentOrchestrator()
        self.notification_client = create_notification_client(settings)
        self.sse_svc = SSEService()

    async def start(self):
        """
        Start consuming events from RabbitMQ.

        Opens the connection and channel, declares the durable queue and
        topic exchange, binds the alert/notification/recommendation routing
        patterns and registers ``process_message`` as the consumer callback.

        Raises:
            Exception: whatever the broker setup raised; any connection or
                channel opened before the failure is closed first.
        """
        try:
            # Connect to RabbitMQ
            self.connection = await connect_robust(
                settings.RABBITMQ_URL,
                client_properties={"connection_name": "alert-processor"},
            )
            self.channel = await self.connection.channel()
            await self.channel.set_qos(prefetch_count=10)

            # Declare queue
            queue = await self.channel.declare_queue(
                settings.RABBITMQ_QUEUE,
                durable=True,
            )

            # Bind to events exchange with routing patterns
            exchange = await self.channel.declare_exchange(
                settings.RABBITMQ_EXCHANGE,
                "topic",
                durable=True,
            )

            # Bind to alert, notification, and recommendation events
            for routing_key in ("alert.#", "notification.#", "recommendation.#"):
                await queue.bind(exchange, routing_key=routing_key)

            # Start consuming
            await queue.consume(self.process_message)

            logger.info(
                "event_consumer_started",
                queue=settings.RABBITMQ_QUEUE,
                exchange=settings.RABBITMQ_EXCHANGE,
            )
        except Exception as e:
            logger.error("consumer_start_failed", error=str(e))
            # Do not leak a half-initialised connection/channel; stop() logs
            # and swallows its own errors, so the original error is re-raised.
            await self.stop()
            raise

    async def process_message(self, message: "IncomingMessage"):
        """
        Process incoming event message.

        Steps:
        1. Parse message
        2. Validate as MinimalEvent
        3. Enrich event
        4. Store in database
        5. Send notification
        6. Publish to SSE
        7. Acknowledge (or reject) the message

        Args:
            message: The delivery received from the bound queue.
        """
        # ignore_processed=True lets the explicit reject() calls below settle
        # the message; the context manager acks only when nothing else did.
        async with message.process(ignore_processed=True):
            try:
                data = json.loads(message.body.decode())
                event = MinimalEvent(**data)
            except (TypeError, ValueError) as e:
                # ValueError covers JSONDecodeError and UnicodeDecodeError;
                # TypeError covers a payload that is not a JSON object.
                # NOTE(review): assumes MinimalEvent validation errors derive
                # from ValueError (true for pydantic models) - confirm.
                logger.error(
                    "message_parse_failed",
                    error=str(e),
                    message_body=message.body[:200],
                )
                # Don't requeue - bad message format
                await message.reject(requeue=False)
                return

            try:
                logger.info(
                    "event_received",
                    event_type=event.event_type,
                    event_class=event.event_class,
                    tenant_id=event.tenant_id,
                )

                # Enrich the event
                enriched_event = await self.enricher.enrich_event(event)

                # Store in database
                async with AsyncSessionLocal() as session:
                    repo = EventRepository(session)
                    stored_event = await repo.create_event(enriched_event)

                    # NOTE(review): the post-store steps run while the session
                    # is still open so attributes of stored_event stay
                    # readable - confirm against the intended session scope.
                    await self._dispatch_stored_event(event, stored_event)

                    logger.info(
                        "event_processed",
                        event_id=stored_event.id,
                        event_type=event.event_type,
                        priority_level=stored_event.priority_level,
                        priority_score=stored_event.priority_score,
                    )
            except Exception as e:
                requeue = self._should_requeue(message)
                logger.error(
                    "event_processing_failed",
                    error=str(e),
                    requeue=requeue,
                    exc_info=True,
                )
                # The exception is handled here, so the context manager would
                # otherwise ack the message; reject it explicitly instead.
                await message.reject(requeue=requeue)

    @staticmethod
    def _should_requeue(message):
        """Return True when a failed message deserves one more delivery."""
        # A redelivered message has already been retried once.
        return not message.redelivered

    async def _dispatch_stored_event(self, event, stored_event):
        """
        Fan a stored event out to the notification service and SSE stream.

        Both steps are best-effort: the event is already stored, so a failure
        here is logged instead of raised to avoid a retry storing it twice.

        Args:
            event: The validated incoming event (used for its event_class).
            stored_event: The event returned by the repository.
        """
        # Send to notification service (if alert)
        if event.event_class == "alert":
            await self._send_notification(stored_event)

        # Publish to SSE
        try:
            await self.sse_svc.publish_event(stored_event)
        except Exception as e:
            logger.error(
                "sse_publish_failed",
                error=str(e),
                event_id=str(stored_event.id),
                exc_info=True,
            )

    @staticmethod
    def _notification_payload(event):
        """
        Build the notification content for an event.

        Args:
            event: Object exposing the i18n keys/params, priority and
                identity attributes read below.

        Returns:
            A ``(title, message, priority, metadata)`` tuple.
        """
        # Use i18n title and message keys from the event as the content,
        # falling back to a generic text built from the event type.
        title = event.i18n_title_key or f"Alert: {event.event_type}"
        message = event.i18n_message_key or f"New alert: {event.event_type}"

        # Add parameters to make it more informative
        if event.i18n_title_params:
            title += f" - {event.i18n_title_params}"
        if event.i18n_message_params:
            message += f" - {event.i18n_message_params}"

        metadata = {
            "event_id": str(event.id),
            "event_type": event.event_type,
            "event_domain": event.event_domain,
            "priority_score": event.priority_score,
            "priority_level": event.priority_level,
            "status": event.status,
            "created_at": event.created_at.isoformat() if event.created_at else None,
            "type_class": event.type_class,
            "smart_actions": event.smart_actions,
            "entity_links": event.entity_links,
        }

        # Unknown priority levels fall back to "normal".
        priority = EventConsumer._NOTIFICATION_PRIORITY.get(
            event.priority_level, "normal"
        )
        return title, message, priority, metadata

    async def _send_notification(self, event):
        """
        Send notification using the shared notification client.

        Errors are logged and never re-raised: a notification failure must
        not fail the processing of the event itself.

        Args:
            event: The event to send as a notification
        """
        try:
            title, message, priority, metadata = self._notification_payload(event)

            # Send notification using shared client
            result = await self.notification_client.send_notification(
                tenant_id=str(event.tenant_id),
                notification_type="in_app",  # Using in-app notification by default
                message=message,
                subject=title,
                priority=priority,
                metadata=metadata,
            )

            if result:
                logger.info(
                    "notification_sent_via_shared_client",
                    event_id=str(event.id),
                    tenant_id=str(event.tenant_id),
                    priority_level=event.priority_level,
                )
            else:
                logger.warning(
                    "notification_failed_via_shared_client",
                    event_id=str(event.id),
                    tenant_id=str(event.tenant_id),
                )
        except Exception as e:
            logger.error(
                "notification_error_via_shared_client",
                error=str(e),
                event_id=str(event.id),
                tenant_id=str(event.tenant_id),
            )
            # Don't re-raise - we don't want to fail the entire event
            # processing if notification sending fails

    async def stop(self):
        """
        Stop consumer and close connections.

        Safe to call more than once and after a failed start(). The channel
        and the connection are closed independently so that an error closing
        one does not leave the other open. Errors are logged, not raised.
        """
        # Detach the handles first so a repeated stop() is a no-op.
        channel, self.channel = self.channel, None
        connection, self.connection = self.connection, None

        if channel is not None:
            try:
                await channel.close()
                logger.info("rabbitmq_channel_closed")
            except Exception as e:
                logger.error("consumer_stop_failed", error=str(e))

        if connection is not None:
            try:
                await connection.close()
                logger.info("rabbitmq_connection_closed")
            except Exception as e:
                logger.error("consumer_stop_failed", error=str(e))