# bakery-ia/services/alert_processor/app/consumer/event_consumer.py
"""
RabbitMQ event consumer.
Consumes minimal events from services and processes them through
the enrichment pipeline.
"""
import json
from datetime import datetime, timezone
from typing import Optional
from uuid import UUID

import structlog
from aio_pika import Channel, Connection, IncomingMessage, connect_robust

from app.core.config import settings
from app.core.database import AsyncSessionLocal
from app.repositories.event_repository import EventRepository
from app.services.enrichment_orchestrator import EnrichmentOrchestrator
from app.services.sse_service import SSEService
from shared.clients.notification_client import create_notification_client
from shared.messaging import MinimalEvent
logger = structlog.get_logger()
class EventConsumer:
"""
RabbitMQ consumer for processing events.
Workflow:
1. Receive minimal event from service
2. Enrich with context (AI, priority, impact, etc.)
3. Store in database
4. Send to notification service
5. Publish to SSE stream
"""
    def __init__(self):
        self.connection: Optional[Connection] = None
        self.channel: Optional[Channel] = None
        self.enricher = EnrichmentOrchestrator()
        self.notification_client = create_notification_client(settings)
        self.sse_svc = SSEService()
async def start(self):
"""Start consuming events from RabbitMQ"""
try:
# Connect to RabbitMQ
self.connection = await connect_robust(
settings.RABBITMQ_URL,
client_properties={"connection_name": "alert-processor"}
)
self.channel = await self.connection.channel()
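            # prefetch_count caps unacknowledged deliveries per consumer, so a
            # slow enrichment pass cannot pull the whole queue into memory.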
await self.channel.set_qos(prefetch_count=10)
# Declare queue
queue = await self.channel.declare_queue(
settings.RABBITMQ_QUEUE,
durable=True
)
# Bind to events exchange with routing patterns
exchange = await self.channel.declare_exchange(
settings.RABBITMQ_EXCHANGE,
"topic",
durable=True
)
# Bind to alert, notification, and recommendation events
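            # In a topic exchange, "#" matches zero or more dot-separated
            # words, so keys such as "alert.inventory.low_stock" (illustrative)
            # all route to this queue.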
await queue.bind(exchange, routing_key="alert.#")
await queue.bind(exchange, routing_key="notification.#")
await queue.bind(exchange, routing_key="recommendation.#")
# Start consuming
await queue.consume(self.process_message)
logger.info(
"event_consumer_started",
queue=settings.RABBITMQ_QUEUE,
exchange=settings.RABBITMQ_EXCHANGE
)
except Exception as e:
logger.error("consumer_start_failed", error=str(e))
raise
async def process_message(self, message: IncomingMessage):
"""
Process incoming event message.
Steps:
1. Parse message
2. Validate as MinimalEvent
3. Enrich event
4. Store in database
5. Send notification
6. Publish to SSE
7. Acknowledge message
"""
async with message.process():
try:
# Parse message
data = json.loads(message.body.decode())
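                # Body is JSON matching shared.messaging.MinimalEvent; the
                # fields used below are event_type, event_class and tenant_id.
                # A validation failure on the next line propagates to the
                # generic handler at the bottom, not the JSONDecodeError branch.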
event = MinimalEvent(**data)
logger.info(
"event_received",
event_type=event.event_type,
event_class=event.event_class,
tenant_id=event.tenant_id
)
# Enrich the event
enriched_event = await self.enricher.enrich_event(event)
# Check for duplicate alerts before storing
async with AsyncSessionLocal() as session:
repo = EventRepository(session)
# Check for duplicate if it's an alert
if event.event_class == "alert":
duplicate_event = await repo.check_duplicate_alert(
tenant_id=UUID(event.tenant_id),
event_type=event.event_type,
entity_links=enriched_event.entity_links,
event_metadata=enriched_event.event_metadata,
time_window_hours=24 # Check for duplicates in last 24 hours
)
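                        # Fingerprinting is delegated to the repository; within
                        # the window, a recurring condition refreshes the
                        # existing event below instead of creating a new row.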
if duplicate_event:
logger.info(
"Duplicate alert detected, skipping",
event_type=event.event_type,
tenant_id=event.tenant_id,
duplicate_event_id=str(duplicate_event.id)
)
# Update the existing event's metadata instead of creating a new one
# This could include updating delay times, affected orders, etc.
duplicate_event.event_metadata = enriched_event.event_metadata
duplicate_event.updated_at = datetime.now(timezone.utc)
duplicate_event.priority_score = enriched_event.priority_score
duplicate_event.priority_level = enriched_event.priority_level
# Update other relevant fields that might have changed
duplicate_event.urgency = enriched_event.urgency.dict() if enriched_event.urgency else None
duplicate_event.business_impact = enriched_event.business_impact.dict() if enriched_event.business_impact else None
await session.commit()
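                            # Re-read the row so the object reflects any
                            # DB-applied defaults before notification/SSE fan-out.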
await session.refresh(duplicate_event)
# Send notification for updated event
await self._send_notification(duplicate_event)
# Publish to SSE
await self.sse_svc.publish_event(duplicate_event)
logger.info(
"Duplicate alert updated",
event_id=str(duplicate_event.id),
event_type=event.event_type,
priority_level=duplicate_event.priority_level,
priority_score=duplicate_event.priority_score
)
return # Exit early since we handled the duplicate
else:
logger.info(
"New unique alert, proceeding with creation",
event_type=event.event_type,
tenant_id=event.tenant_id
)
# Store in database (if not a duplicate)
stored_event = await repo.create_event(enriched_event)
# Send to notification service (if alert)
if event.event_class == "alert":
await self._send_notification(stored_event)
# Publish to SSE
await self.sse_svc.publish_event(stored_event)
logger.info(
"event_processed",
                    event_id=str(stored_event.id),
event_type=event.event_type,
priority_level=stored_event.priority_level,
priority_score=stored_event.priority_score
)
except json.JSONDecodeError as e:
logger.error(
"message_parse_failed",
error=str(e),
message_body=message.body[:200]
)
# Don't requeue - bad message format
            except Exception as e:
                logger.error(
                    "event_processing_failed",
                    error=str(e),
                    exc_info=True
                )
                # The exception is swallowed, so message.process() still acks:
                # the message is NOT requeued. To have failures redelivered,
                # re-raise inside message.process(requeue=True) instead.
async def _send_notification(self, event):
"""
Send notification using the shared notification client.
Args:
event: The event to send as a notification
"""
try:
# Prepare notification message
# Use i18n title and message from the event as the notification content
title = event.i18n_title_key if event.i18n_title_key else f"Alert: {event.event_type}"
message = event.i18n_message_key if event.i18n_message_key else f"New alert: {event.event_type}"
# Add parameters to make it more informative
if event.i18n_title_params:
title += f" - {event.i18n_title_params}"
if event.i18n_message_params:
message += f" - {event.i18n_message_params}"
# Prepare metadata from the event
metadata = {
"event_id": str(event.id),
"event_type": event.event_type,
"event_domain": event.event_domain,
"priority_score": event.priority_score,
"priority_level": event.priority_level,
"status": event.status,
"created_at": event.created_at.isoformat() if event.created_at else None,
"type_class": event.type_class,
"smart_actions": event.smart_actions,
"entity_links": event.entity_links
}
# Determine notification priority based on event priority
priority_map = {
"critical": "urgent",
"important": "high",
"standard": "normal",
"info": "low"
}
priority = priority_map.get(event.priority_level, "normal")
# Send notification using shared client
result = await self.notification_client.send_notification(
tenant_id=str(event.tenant_id),
notification_type="in_app", # Using in-app notification by default
message=message,
subject=title,
priority=priority,
metadata=metadata
)
if result:
logger.info(
"notification_sent_via_shared_client",
event_id=str(event.id),
tenant_id=str(event.tenant_id),
priority_level=event.priority_level
)
else:
logger.warning(
"notification_failed_via_shared_client",
event_id=str(event.id),
tenant_id=str(event.tenant_id)
)
except Exception as e:
logger.error(
"notification_error_via_shared_client",
error=str(e),
event_id=str(event.id),
tenant_id=str(event.tenant_id)
)
# Don't re-raise - we don't want to fail the entire event processing
# if notification sending fails
async def stop(self):
"""Stop consumer and close connections"""
try:
if self.channel:
await self.channel.close()
logger.info("rabbitmq_channel_closed")
if self.connection:
await self.connection.close()
logger.info("rabbitmq_connection_closed")
except Exception as e:
logger.error("consumer_stop_failed", error=str(e))