2025-12-05 20:07:01 +01:00
|
|
|
"""
|
|
|
|
|
RabbitMQ event consumer.
|
|
|
|
|
|
|
|
|
|
Consumes minimal events from services and processes them through
|
|
|
|
|
the enrichment pipeline.
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
import asyncio
|
|
|
|
|
import json
|
2025-12-13 23:57:54 +01:00
|
|
|
from datetime import datetime, timezone
|
2025-12-05 20:07:01 +01:00
|
|
|
from aio_pika import connect_robust, IncomingMessage, Connection, Channel
|
|
|
|
|
import structlog
|
|
|
|
|
|
|
|
|
|
from app.core.config import settings
|
|
|
|
|
from app.core.database import AsyncSessionLocal
|
2025-12-09 10:21:41 +01:00
|
|
|
from shared.messaging import MinimalEvent
|
2025-12-05 20:07:01 +01:00
|
|
|
from app.services.enrichment_orchestrator import EnrichmentOrchestrator
|
|
|
|
|
from app.repositories.event_repository import EventRepository
|
|
|
|
|
from shared.clients.notification_client import create_notification_client
|
|
|
|
|
from app.services.sse_service import SSEService
|
|
|
|
|
|
|
|
|
|
logger = structlog.get_logger()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class EventConsumer:
|
|
|
|
|
"""
|
|
|
|
|
RabbitMQ consumer for processing events.
|
|
|
|
|
|
|
|
|
|
Workflow:
|
|
|
|
|
1. Receive minimal event from service
|
|
|
|
|
2. Enrich with context (AI, priority, impact, etc.)
|
|
|
|
|
3. Store in database
|
|
|
|
|
4. Send to notification service
|
|
|
|
|
5. Publish to SSE stream
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
def __init__(self):
|
|
|
|
|
self.connection: Connection = None
|
|
|
|
|
self.channel: Channel = None
|
|
|
|
|
self.enricher = EnrichmentOrchestrator()
|
|
|
|
|
self.notification_client = create_notification_client(settings)
|
|
|
|
|
self.sse_svc = SSEService()
|
|
|
|
|
|
|
|
|
|
async def start(self):
|
|
|
|
|
"""Start consuming events from RabbitMQ"""
|
|
|
|
|
try:
|
|
|
|
|
# Connect to RabbitMQ
|
|
|
|
|
self.connection = await connect_robust(
|
|
|
|
|
settings.RABBITMQ_URL,
|
|
|
|
|
client_properties={"connection_name": "alert-processor"}
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
self.channel = await self.connection.channel()
|
|
|
|
|
await self.channel.set_qos(prefetch_count=10)
|
|
|
|
|
|
|
|
|
|
# Declare queue
|
|
|
|
|
queue = await self.channel.declare_queue(
|
|
|
|
|
settings.RABBITMQ_QUEUE,
|
|
|
|
|
durable=True
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
# Bind to events exchange with routing patterns
|
|
|
|
|
exchange = await self.channel.declare_exchange(
|
|
|
|
|
settings.RABBITMQ_EXCHANGE,
|
|
|
|
|
"topic",
|
|
|
|
|
durable=True
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
# Bind to alert, notification, and recommendation events
|
|
|
|
|
await queue.bind(exchange, routing_key="alert.#")
|
|
|
|
|
await queue.bind(exchange, routing_key="notification.#")
|
|
|
|
|
await queue.bind(exchange, routing_key="recommendation.#")
|
|
|
|
|
|
|
|
|
|
# Start consuming
|
|
|
|
|
await queue.consume(self.process_message)
|
|
|
|
|
|
|
|
|
|
logger.info(
|
|
|
|
|
"event_consumer_started",
|
|
|
|
|
queue=settings.RABBITMQ_QUEUE,
|
|
|
|
|
exchange=settings.RABBITMQ_EXCHANGE
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
logger.error("consumer_start_failed", error=str(e))
|
|
|
|
|
raise
|
|
|
|
|
|
|
|
|
|
async def process_message(self, message: IncomingMessage):
|
|
|
|
|
"""
|
|
|
|
|
Process incoming event message.
|
|
|
|
|
|
|
|
|
|
Steps:
|
|
|
|
|
1. Parse message
|
|
|
|
|
2. Validate as MinimalEvent
|
|
|
|
|
3. Enrich event
|
|
|
|
|
4. Store in database
|
|
|
|
|
5. Send notification
|
|
|
|
|
6. Publish to SSE
|
|
|
|
|
7. Acknowledge message
|
|
|
|
|
"""
|
|
|
|
|
async with message.process():
|
|
|
|
|
try:
|
|
|
|
|
# Parse message
|
|
|
|
|
data = json.loads(message.body.decode())
|
|
|
|
|
event = MinimalEvent(**data)
|
|
|
|
|
|
|
|
|
|
logger.info(
|
|
|
|
|
"event_received",
|
|
|
|
|
event_type=event.event_type,
|
|
|
|
|
event_class=event.event_class,
|
|
|
|
|
tenant_id=event.tenant_id
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
# Enrich the event
|
|
|
|
|
enriched_event = await self.enricher.enrich_event(event)
|
|
|
|
|
|
2025-12-13 23:57:54 +01:00
|
|
|
# Check for duplicate alerts before storing
|
2025-12-05 20:07:01 +01:00
|
|
|
async with AsyncSessionLocal() as session:
|
|
|
|
|
repo = EventRepository(session)
|
2025-12-13 23:57:54 +01:00
|
|
|
|
|
|
|
|
# Check for duplicate if it's an alert
|
|
|
|
|
if event.event_class == "alert":
|
|
|
|
|
from uuid import UUID
|
|
|
|
|
duplicate_event = await repo.check_duplicate_alert(
|
|
|
|
|
tenant_id=UUID(event.tenant_id),
|
|
|
|
|
event_type=event.event_type,
|
|
|
|
|
entity_links=enriched_event.entity_links,
|
|
|
|
|
event_metadata=enriched_event.event_metadata,
|
|
|
|
|
time_window_hours=24 # Check for duplicates in last 24 hours
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
if duplicate_event:
|
|
|
|
|
logger.info(
|
|
|
|
|
"Duplicate alert detected, skipping",
|
|
|
|
|
event_type=event.event_type,
|
|
|
|
|
tenant_id=event.tenant_id,
|
|
|
|
|
duplicate_event_id=str(duplicate_event.id)
|
|
|
|
|
)
|
|
|
|
|
# Update the existing event's metadata instead of creating a new one
|
|
|
|
|
# This could include updating delay times, affected orders, etc.
|
|
|
|
|
duplicate_event.event_metadata = enriched_event.event_metadata
|
|
|
|
|
duplicate_event.updated_at = datetime.now(timezone.utc)
|
|
|
|
|
duplicate_event.priority_score = enriched_event.priority_score
|
|
|
|
|
duplicate_event.priority_level = enriched_event.priority_level
|
|
|
|
|
|
|
|
|
|
# Update other relevant fields that might have changed
|
|
|
|
|
duplicate_event.urgency = enriched_event.urgency.dict() if enriched_event.urgency else None
|
|
|
|
|
duplicate_event.business_impact = enriched_event.business_impact.dict() if enriched_event.business_impact else None
|
|
|
|
|
|
|
|
|
|
await session.commit()
|
|
|
|
|
await session.refresh(duplicate_event)
|
|
|
|
|
|
|
|
|
|
# Send notification for updated event
|
|
|
|
|
await self._send_notification(duplicate_event)
|
|
|
|
|
|
|
|
|
|
# Publish to SSE
|
|
|
|
|
await self.sse_svc.publish_event(duplicate_event)
|
|
|
|
|
|
|
|
|
|
logger.info(
|
|
|
|
|
"Duplicate alert updated",
|
|
|
|
|
event_id=str(duplicate_event.id),
|
|
|
|
|
event_type=event.event_type,
|
|
|
|
|
priority_level=duplicate_event.priority_level,
|
|
|
|
|
priority_score=duplicate_event.priority_score
|
|
|
|
|
)
|
|
|
|
|
return # Exit early since we handled the duplicate
|
|
|
|
|
else:
|
|
|
|
|
logger.info(
|
|
|
|
|
"New unique alert, proceeding with creation",
|
|
|
|
|
event_type=event.event_type,
|
|
|
|
|
tenant_id=event.tenant_id
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
# Store in database (if not a duplicate)
|
2025-12-05 20:07:01 +01:00
|
|
|
stored_event = await repo.create_event(enriched_event)
|
|
|
|
|
|
|
|
|
|
# Send to notification service (if alert)
|
|
|
|
|
if event.event_class == "alert":
|
|
|
|
|
await self._send_notification(stored_event)
|
|
|
|
|
|
|
|
|
|
# Publish to SSE
|
|
|
|
|
await self.sse_svc.publish_event(stored_event)
|
|
|
|
|
|
|
|
|
|
logger.info(
|
|
|
|
|
"event_processed",
|
|
|
|
|
event_id=stored_event.id,
|
|
|
|
|
event_type=event.event_type,
|
|
|
|
|
priority_level=stored_event.priority_level,
|
|
|
|
|
priority_score=stored_event.priority_score
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
except json.JSONDecodeError as e:
|
|
|
|
|
logger.error(
|
|
|
|
|
"message_parse_failed",
|
|
|
|
|
error=str(e),
|
|
|
|
|
message_body=message.body[:200]
|
|
|
|
|
)
|
|
|
|
|
# Don't requeue - bad message format
|
|
|
|
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
logger.error(
|
|
|
|
|
"event_processing_failed",
|
|
|
|
|
error=str(e),
|
|
|
|
|
exc_info=True
|
|
|
|
|
)
|
|
|
|
|
# Message will be requeued automatically due to exception
|
|
|
|
|
|
|
|
|
|
async def _send_notification(self, event):
|
|
|
|
|
"""
|
|
|
|
|
Send notification using the shared notification client.
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
event: The event to send as a notification
|
|
|
|
|
"""
|
|
|
|
|
try:
|
|
|
|
|
# Prepare notification message
|
|
|
|
|
# Use i18n title and message from the event as the notification content
|
|
|
|
|
title = event.i18n_title_key if event.i18n_title_key else f"Alert: {event.event_type}"
|
|
|
|
|
message = event.i18n_message_key if event.i18n_message_key else f"New alert: {event.event_type}"
|
|
|
|
|
|
|
|
|
|
# Add parameters to make it more informative
|
|
|
|
|
if event.i18n_title_params:
|
|
|
|
|
title += f" - {event.i18n_title_params}"
|
|
|
|
|
if event.i18n_message_params:
|
|
|
|
|
message += f" - {event.i18n_message_params}"
|
|
|
|
|
|
|
|
|
|
# Prepare metadata from the event
|
|
|
|
|
metadata = {
|
|
|
|
|
"event_id": str(event.id),
|
|
|
|
|
"event_type": event.event_type,
|
|
|
|
|
"event_domain": event.event_domain,
|
|
|
|
|
"priority_score": event.priority_score,
|
|
|
|
|
"priority_level": event.priority_level,
|
|
|
|
|
"status": event.status,
|
|
|
|
|
"created_at": event.created_at.isoformat() if event.created_at else None,
|
|
|
|
|
"type_class": event.type_class,
|
|
|
|
|
"smart_actions": event.smart_actions,
|
|
|
|
|
"entity_links": event.entity_links
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
# Determine notification priority based on event priority
|
|
|
|
|
priority_map = {
|
|
|
|
|
"critical": "urgent",
|
|
|
|
|
"important": "high",
|
|
|
|
|
"standard": "normal",
|
|
|
|
|
"info": "low"
|
|
|
|
|
}
|
|
|
|
|
priority = priority_map.get(event.priority_level, "normal")
|
|
|
|
|
|
|
|
|
|
# Send notification using shared client
|
|
|
|
|
result = await self.notification_client.send_notification(
|
|
|
|
|
tenant_id=str(event.tenant_id),
|
|
|
|
|
notification_type="in_app", # Using in-app notification by default
|
|
|
|
|
message=message,
|
|
|
|
|
subject=title,
|
|
|
|
|
priority=priority,
|
|
|
|
|
metadata=metadata
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
if result:
|
|
|
|
|
logger.info(
|
|
|
|
|
"notification_sent_via_shared_client",
|
|
|
|
|
event_id=str(event.id),
|
|
|
|
|
tenant_id=str(event.tenant_id),
|
|
|
|
|
priority_level=event.priority_level
|
|
|
|
|
)
|
|
|
|
|
else:
|
|
|
|
|
logger.warning(
|
|
|
|
|
"notification_failed_via_shared_client",
|
|
|
|
|
event_id=str(event.id),
|
|
|
|
|
tenant_id=str(event.tenant_id)
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
logger.error(
|
|
|
|
|
"notification_error_via_shared_client",
|
|
|
|
|
error=str(e),
|
|
|
|
|
event_id=str(event.id),
|
|
|
|
|
tenant_id=str(event.tenant_id)
|
|
|
|
|
)
|
|
|
|
|
# Don't re-raise - we don't want to fail the entire event processing
|
|
|
|
|
# if notification sending fails
|
|
|
|
|
|
|
|
|
|
async def stop(self):
|
|
|
|
|
"""Stop consumer and close connections"""
|
|
|
|
|
try:
|
|
|
|
|
if self.channel:
|
|
|
|
|
await self.channel.close()
|
|
|
|
|
logger.info("rabbitmq_channel_closed")
|
|
|
|
|
|
|
|
|
|
if self.connection:
|
|
|
|
|
await self.connection.close()
|
|
|
|
|
logger.info("rabbitmq_connection_closed")
|
|
|
|
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
logger.error("consumer_stop_failed", error=str(e))
|