Files
2025-12-13 23:57:54 +01:00

296 lines
12 KiB
Python

"""
RabbitMQ event consumer.
Consumes minimal events from services and processes them through
the enrichment pipeline.
"""
import asyncio
import json
from datetime import datetime, timezone
from aio_pika import connect_robust, IncomingMessage, Connection, Channel
import structlog
from app.core.config import settings
from app.core.database import AsyncSessionLocal
from shared.messaging import MinimalEvent
from app.services.enrichment_orchestrator import EnrichmentOrchestrator
from app.repositories.event_repository import EventRepository
from shared.clients.notification_client import create_notification_client
from app.services.sse_service import SSEService
logger = structlog.get_logger()
class EventConsumer:
"""
RabbitMQ consumer for processing events.
Workflow:
1. Receive minimal event from service
2. Enrich with context (AI, priority, impact, etc.)
3. Store in database
4. Send to notification service
5. Publish to SSE stream
"""
def __init__(self):
self.connection: Connection = None
self.channel: Channel = None
self.enricher = EnrichmentOrchestrator()
self.notification_client = create_notification_client(settings)
self.sse_svc = SSEService()
async def start(self):
"""Start consuming events from RabbitMQ"""
try:
# Connect to RabbitMQ
self.connection = await connect_robust(
settings.RABBITMQ_URL,
client_properties={"connection_name": "alert-processor"}
)
self.channel = await self.connection.channel()
await self.channel.set_qos(prefetch_count=10)
# Declare queue
queue = await self.channel.declare_queue(
settings.RABBITMQ_QUEUE,
durable=True
)
# Bind to events exchange with routing patterns
exchange = await self.channel.declare_exchange(
settings.RABBITMQ_EXCHANGE,
"topic",
durable=True
)
# Bind to alert, notification, and recommendation events
await queue.bind(exchange, routing_key="alert.#")
await queue.bind(exchange, routing_key="notification.#")
await queue.bind(exchange, routing_key="recommendation.#")
# Start consuming
await queue.consume(self.process_message)
logger.info(
"event_consumer_started",
queue=settings.RABBITMQ_QUEUE,
exchange=settings.RABBITMQ_EXCHANGE
)
except Exception as e:
logger.error("consumer_start_failed", error=str(e))
raise
async def process_message(self, message: IncomingMessage):
"""
Process incoming event message.
Steps:
1. Parse message
2. Validate as MinimalEvent
3. Enrich event
4. Store in database
5. Send notification
6. Publish to SSE
7. Acknowledge message
"""
async with message.process():
try:
# Parse message
data = json.loads(message.body.decode())
event = MinimalEvent(**data)
logger.info(
"event_received",
event_type=event.event_type,
event_class=event.event_class,
tenant_id=event.tenant_id
)
# Enrich the event
enriched_event = await self.enricher.enrich_event(event)
# Check for duplicate alerts before storing
async with AsyncSessionLocal() as session:
repo = EventRepository(session)
# Check for duplicate if it's an alert
if event.event_class == "alert":
from uuid import UUID
duplicate_event = await repo.check_duplicate_alert(
tenant_id=UUID(event.tenant_id),
event_type=event.event_type,
entity_links=enriched_event.entity_links,
event_metadata=enriched_event.event_metadata,
time_window_hours=24 # Check for duplicates in last 24 hours
)
if duplicate_event:
logger.info(
"Duplicate alert detected, skipping",
event_type=event.event_type,
tenant_id=event.tenant_id,
duplicate_event_id=str(duplicate_event.id)
)
# Update the existing event's metadata instead of creating a new one
# This could include updating delay times, affected orders, etc.
duplicate_event.event_metadata = enriched_event.event_metadata
duplicate_event.updated_at = datetime.now(timezone.utc)
duplicate_event.priority_score = enriched_event.priority_score
duplicate_event.priority_level = enriched_event.priority_level
# Update other relevant fields that might have changed
duplicate_event.urgency = enriched_event.urgency.dict() if enriched_event.urgency else None
duplicate_event.business_impact = enriched_event.business_impact.dict() if enriched_event.business_impact else None
await session.commit()
await session.refresh(duplicate_event)
# Send notification for updated event
await self._send_notification(duplicate_event)
# Publish to SSE
await self.sse_svc.publish_event(duplicate_event)
logger.info(
"Duplicate alert updated",
event_id=str(duplicate_event.id),
event_type=event.event_type,
priority_level=duplicate_event.priority_level,
priority_score=duplicate_event.priority_score
)
return # Exit early since we handled the duplicate
else:
logger.info(
"New unique alert, proceeding with creation",
event_type=event.event_type,
tenant_id=event.tenant_id
)
# Store in database (if not a duplicate)
stored_event = await repo.create_event(enriched_event)
# Send to notification service (if alert)
if event.event_class == "alert":
await self._send_notification(stored_event)
# Publish to SSE
await self.sse_svc.publish_event(stored_event)
logger.info(
"event_processed",
event_id=stored_event.id,
event_type=event.event_type,
priority_level=stored_event.priority_level,
priority_score=stored_event.priority_score
)
except json.JSONDecodeError as e:
logger.error(
"message_parse_failed",
error=str(e),
message_body=message.body[:200]
)
# Don't requeue - bad message format
except Exception as e:
logger.error(
"event_processing_failed",
error=str(e),
exc_info=True
)
# Message will be requeued automatically due to exception
async def _send_notification(self, event):
"""
Send notification using the shared notification client.
Args:
event: The event to send as a notification
"""
try:
# Prepare notification message
# Use i18n title and message from the event as the notification content
title = event.i18n_title_key if event.i18n_title_key else f"Alert: {event.event_type}"
message = event.i18n_message_key if event.i18n_message_key else f"New alert: {event.event_type}"
# Add parameters to make it more informative
if event.i18n_title_params:
title += f" - {event.i18n_title_params}"
if event.i18n_message_params:
message += f" - {event.i18n_message_params}"
# Prepare metadata from the event
metadata = {
"event_id": str(event.id),
"event_type": event.event_type,
"event_domain": event.event_domain,
"priority_score": event.priority_score,
"priority_level": event.priority_level,
"status": event.status,
"created_at": event.created_at.isoformat() if event.created_at else None,
"type_class": event.type_class,
"smart_actions": event.smart_actions,
"entity_links": event.entity_links
}
# Determine notification priority based on event priority
priority_map = {
"critical": "urgent",
"important": "high",
"standard": "normal",
"info": "low"
}
priority = priority_map.get(event.priority_level, "normal")
# Send notification using shared client
result = await self.notification_client.send_notification(
tenant_id=str(event.tenant_id),
notification_type="in_app", # Using in-app notification by default
message=message,
subject=title,
priority=priority,
metadata=metadata
)
if result:
logger.info(
"notification_sent_via_shared_client",
event_id=str(event.id),
tenant_id=str(event.tenant_id),
priority_level=event.priority_level
)
else:
logger.warning(
"notification_failed_via_shared_client",
event_id=str(event.id),
tenant_id=str(event.tenant_id)
)
except Exception as e:
logger.error(
"notification_error_via_shared_client",
error=str(e),
event_id=str(event.id),
tenant_id=str(event.tenant_id)
)
# Don't re-raise - we don't want to fail the entire event processing
# if notification sending fails
async def stop(self):
"""Stop consumer and close connections"""
try:
if self.channel:
await self.channel.close()
logger.info("rabbitmq_channel_closed")
if self.connection:
await self.connection.close()
logger.info("rabbitmq_connection_closed")
except Exception as e:
logger.error("consumer_stop_failed", error=str(e))