# services/alert_processor/app/main.py
"""
Alert Processor Service - Central hub for processing alerts and recommendations.

Consumes from RabbitMQ, stores items in the database, and routes them to the
notification service for multi-channel delivery.
"""
import asyncio
import json
import signal
from datetime import datetime
from typing import Any, Dict

import structlog
from aio_pika import ExchangeType, IncomingMessage, connect_robust

from app.config import AlertProcessorConfig
from shared.clients.base_service_client import BaseServiceClient
from shared.config.rabbitmq_config import RABBITMQ_CONFIG
from shared.database.base import create_database_manager
from shared.redis_utils import close_redis, get_redis_client, initialize_redis

# Setup logging
structlog.configure(
    processors=[
        structlog.stdlib.filter_by_level,
        structlog.stdlib.add_logger_name,
        structlog.stdlib.add_log_level,
        structlog.stdlib.PositionalArgumentsFormatter(),
        structlog.processors.TimeStamper(fmt="ISO"),
        structlog.processors.StackInfoRenderer(),
        structlog.processors.format_exc_info,
        structlog.processors.JSONRenderer(),
    ],
    context_class=dict,
    logger_factory=structlog.stdlib.LoggerFactory(),
    wrapper_class=structlog.stdlib.BoundLogger,
    cache_logger_on_first_use=True,
)

logger = structlog.get_logger()
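
# With JSONRenderer configured above, each log call emits one JSON object per
# line; an illustrative example (field values are hypothetical):
#
#     {"event": "Processing item", "logger": "app.main", "level": "info",
#      "timestamp": "2025-01-01T00:00:00Z", "tenant_id": "tenant-123"}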


class NotificationServiceClient(BaseServiceClient):
    """Client for the notification service."""

    def __init__(self, config: AlertProcessorConfig):
        super().__init__("notification-service", config)
        self.config = config

    def get_service_base_path(self) -> str:
        """Return the base path for notification service APIs."""
        return "/api/v1"

    async def send_notification(self, tenant_id: str, notification: Dict[str, Any], channels: list) -> Dict[str, Any]:
        """Send a notification via the notification service."""
        try:
            response = await self.post(
                "/api/v1/notifications/send",
                json={
                    "tenant_id": tenant_id,
                    "notification": notification,
                    "channels": channels,
                },
            )
            return response
        except Exception as e:
            logger.error("Failed to send notification", error=str(e), tenant_id=tenant_id)
            return {"status": "failed", "error": str(e)}
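
# A minimal usage sketch (illustrative; assumes BaseServiceClient.post issues a
# POST against the resolved service URL and returns the decoded response body):
#
#     client = NotificationServiceClient(config)
#     result = await client.send_notification(
#         tenant_id="tenant-123",  # hypothetical tenant id
#         notification={"type": "alert", "title": "Low stock",
#                       "message": "...", "severity": "high"},
#         channels=["email", "dashboard"],
#     )
#     # result is the service response, or {"status": "failed", "error": ...}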


class AlertProcessorService:
    """
    Central service for processing and routing alerts and recommendations.

    Integrates with the notification service for multi-channel delivery.
    """

    def __init__(self, config: AlertProcessorConfig):
        self.config = config
        self.db_manager = create_database_manager(config.DATABASE_URL, "alert-processor")
        self.notification_client = NotificationServiceClient(config)
        self.redis = None
        self.connection = None
        self.channel = None
        self.running = False

        # Metrics
        self.items_processed = 0
        self.items_stored = 0
        self.notifications_sent = 0
        self.errors_count = 0

    async def start(self):
        """Start the alert processor service."""
        try:
            logger.info("Starting Alert Processor Service")

            # Initialize shared Redis connection for SSE publishing
            await initialize_redis(self.config.REDIS_URL, db=0, max_connections=20)
            self.redis = await get_redis_client()
            logger.info("Connected to Redis")

            # Connect to RabbitMQ
            await self._setup_rabbitmq()

            # Start consuming messages
            await self._start_consuming()

            self.running = True
            logger.info("Alert Processor Service started successfully")

        except Exception as e:
            logger.error("Failed to start Alert Processor Service", error=str(e))
            raise

    async def _setup_rabbitmq(self):
        """Set up the RabbitMQ connection, exchange, queue, and bindings."""
        self.connection = await connect_robust(
            self.config.RABBITMQ_URL,
            heartbeat=30,
            connection_attempts=5,
        )
        self.channel = await self.connection.channel()
        # Allow at most 10 unacknowledged messages per consumer
        await self.channel.set_qos(prefetch_count=10)

        # Declare the exchange and queue described in the shared config
        exchange_config = RABBITMQ_CONFIG["exchanges"]["alerts"]
        self.exchange = await self.channel.declare_exchange(
            exchange_config["name"],
            getattr(ExchangeType, exchange_config["type"].upper()),
            durable=exchange_config["durable"],
        )

        queue_config = RABBITMQ_CONFIG["queues"]["alert_processing"]
        self.queue = await self.channel.declare_queue(
            queue_config["name"],
            durable=queue_config["durable"],
            arguments=queue_config["arguments"],
        )

        # Bind to every three-segment routing key used for alerts and recommendations
        await self.queue.bind(self.exchange, routing_key="*.*.*")

        logger.info("RabbitMQ setup completed")
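
    # The setup above assumes the shared config exposes entries shaped like the
    # following (names and arguments are illustrative; the real values live in
    # shared.config.rabbitmq_config):
    #
    #     RABBITMQ_CONFIG = {
    #         "exchanges": {"alerts": {"name": "alerts", "type": "topic", "durable": True}},
    #         "queues": {"alert_processing": {"name": "alert_processing", "durable": True,
    #                                         "arguments": {"x-message-ttl": 86400000}}},
    #     }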

    async def _start_consuming(self):
        """Start consuming messages from RabbitMQ."""
        await self.queue.consume(self.process_item)
        logger.info("Started consuming alert messages")
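
    # Consumed messages are JSON objects; an illustrative payload, inferred from
    # the fields accessed below (all values hypothetical):
    #
    #     {
    #         "id": "a1b2c3", "tenant_id": "tenant-123",
    #         "item_type": "alert",            # "alert" or "recommendation"
    #         "type": "low_stock", "severity": "high", "service": "inventory",
    #         "title": "Low stock", "message": "...",
    #         "actions": [], "metadata": {},
    #         "timestamp": "2025-01-01T00:00:00+00:00",
    #         "email": "user@example.com", "phone": null, "user_id": "u-1"
    #     }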

    async def process_item(self, message: IncomingMessage):
        """Process an incoming alert or recommendation."""
        async with message.process():
            try:
                # Parse message
                item = json.loads(message.body.decode())

                logger.info("Processing item",
                            item_type=item.get('item_type'),
                            alert_type=item.get('type'),
                            severity=item.get('severity'),
                            tenant_id=item.get('tenant_id'))

                # Store in database
                stored_item = await self.store_item(item)
                self.items_stored += 1

                # Determine delivery channels based on severity and type
                channels = self.get_channels_by_severity_and_type(
                    item['severity'],
                    item['item_type']
                )

                # Send via notification service if channels are specified
                if channels:
                    notification_result = await self.notification_client.send_notification(
                        tenant_id=item['tenant_id'],
                        notification={
                            'type': item['item_type'],  # 'alert' or 'recommendation'
                            'id': item['id'],
                            'title': item['title'],
                            'message': item['message'],
                            'severity': item['severity'],
                            'metadata': item.get('metadata', {}),
                            'actions': item.get('actions', []),
                            'email': item.get('email'),
                            'phone': item.get('phone'),
                            'user_id': item.get('user_id')
                        },
                        channels=channels
                    )

                    if notification_result.get('status') == 'success':
                        self.notifications_sent += 1

                # Always stream to SSE for the real-time dashboard
                await self.stream_to_sse(item['tenant_id'], stored_item)

                self.items_processed += 1

                logger.info("Item processed successfully",
                            item_id=item['id'],
                            channels=len(channels))

            except Exception as e:
                self.errors_count += 1
                logger.error("Item processing failed", error=str(e))
                # Re-raise so message.process() rejects the message
                raise

    async def store_item(self, item: dict) -> dict:
        """Store an alert or recommendation in the database and cache it in Redis."""
        from app.models.alerts import Alert, AlertSeverity, AlertStatus

        async with self.db_manager.get_session() as session:
            # Create alert instance
            alert = Alert(
                id=item['id'],
                tenant_id=item['tenant_id'],
                item_type=item['item_type'],  # 'alert' or 'recommendation'
                alert_type=item['type'],
                severity=AlertSeverity(item['severity'].lower()),
                status=AlertStatus.ACTIVE,
                service=item['service'],
                title=item['title'],
                message=item['message'],
                actions=item.get('actions', []),
                alert_metadata=item.get('metadata', {}),
                created_at=datetime.fromisoformat(item['timestamp']) if isinstance(item['timestamp'], str) else item['timestamp']
            )

            session.add(alert)
            await session.commit()
            await session.refresh(alert)

            logger.debug("Item stored in database", item_id=item['id'])

            # Convert to dict for return
            alert_dict = {
                'id': str(alert.id),
                'tenant_id': str(alert.tenant_id),
                'item_type': alert.item_type,
                'alert_type': alert.alert_type,
                'severity': alert.severity.value,
                'status': alert.status.value,
                'service': alert.service,
                'title': alert.title,
                'message': alert.message,
                'actions': alert.actions,
                'metadata': alert.alert_metadata,
                'created_at': alert.created_at
            }

            # Cache active alerts in Redis for SSE initial_items
            await self._cache_active_alerts(str(alert.tenant_id))

            return alert_dict

    async def _cache_active_alerts(self, tenant_id: str):
        """Cache all active alerts for a tenant in Redis for quick SSE access."""
        try:
            from app.models.alerts import Alert, AlertStatus
            from sqlalchemy import select

            async with self.db_manager.get_session() as session:
                # Query the most recent active alerts for this tenant
                query = select(Alert).where(
                    Alert.tenant_id == tenant_id,
                    Alert.status == AlertStatus.ACTIVE
                ).order_by(Alert.created_at.desc()).limit(50)

                result = await session.execute(query)
                alerts = result.scalars().all()

                # Convert to a JSON-serializable format
                active_items = []
                for alert in alerts:
                    active_items.append({
                        'id': str(alert.id),
                        'item_type': alert.item_type,
                        'type': alert.alert_type,
                        'severity': alert.severity.value,
                        'title': alert.title,
                        'message': alert.message,
                        'actions': alert.actions or [],
                        'metadata': alert.alert_metadata or {},
                        'timestamp': alert.created_at.isoformat() if alert.created_at else datetime.utcnow().isoformat(),
                        'status': alert.status.value
                    })

                # Cache in Redis with a 1 hour TTL
                cache_key = f"active_alerts:{tenant_id}"
                await self.redis.setex(
                    cache_key,
                    3600,  # 1 hour TTL
                    json.dumps(active_items)
                )

                logger.debug("Cached active alerts in Redis",
                             tenant_id=tenant_id,
                             count=len(active_items))

        except Exception as e:
            logger.error("Failed to cache active alerts",
                         tenant_id=tenant_id,
                         error=str(e))
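
    # Whatever serves SSE "initial_items" is expected to read the
    # "active_alerts:{tenant_id}" key written above; the consumer side is an
    # assumption here, but a reader would do something like (illustrative):
    #
    #     cached = await redis.get(f"active_alerts:{tenant_id}")
    #     initial_items = json.loads(cached) if cached else []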

    async def stream_to_sse(self, tenant_id: str, item: dict):
        """Publish an item to Redis for SSE streaming."""
        channel = f"alerts:{tenant_id}"

        # Prepare message for SSE
        sse_message = {
            'id': item['id'],
            'item_type': item['item_type'],
            'type': item['alert_type'],
            'severity': item['severity'],
            'title': item['title'],
            'message': item['message'],
            'actions': json.loads(item['actions']) if isinstance(item['actions'], str) else item['actions'],
            'metadata': json.loads(item['metadata']) if isinstance(item['metadata'], str) else item['metadata'],
            'timestamp': item['created_at'].isoformat() if hasattr(item['created_at'], 'isoformat') else item['created_at'],
            'status': item['status']
        }

        # Publish to the per-tenant Redis channel for SSE
        await self.redis.publish(channel, json.dumps(sse_message))

        logger.debug("Item published to SSE", tenant_id=tenant_id, item_id=item['id'])
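
    # A subscriber (e.g. an SSE gateway) would listen on the same channel; a
    # minimal sketch, assuming a redis.asyncio client (illustrative, not part
    # of this service):
    #
    #     pubsub = redis.pubsub()
    #     await pubsub.subscribe(f"alerts:{tenant_id}")
    #     async for msg in pubsub.listen():
    #         if msg["type"] == "message":
    #             event = json.loads(msg["data"])  # same shape as sse_message above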

    def get_channels_by_severity_and_type(self, severity: str, item_type: str) -> list:
        """Determine notification channels based on severity, item type, and time of day."""
        current_hour = datetime.now().hour

        channels = ['dashboard']  # Always include the dashboard (SSE)

        if item_type == 'alert':
            if severity == 'urgent':
                # Urgent alerts: all channels immediately
                channels.extend(['whatsapp', 'email', 'push'])
            elif severity == 'high':
                # High alerts: WhatsApp and email during extended hours
                if 6 <= current_hour <= 22:
                    channels.extend(['whatsapp', 'email'])
                else:
                    channels.append('email')  # Email only during the night
            elif severity == 'medium':
                # Medium alerts: email during business hours
                if 7 <= current_hour <= 20:
                    channels.append('email')
            # Low severity: dashboard only

        elif item_type == 'recommendation':
            # Recommendations are less urgent: limit channels and respect business hours
            if severity in ['medium', 'high']:
                if 8 <= current_hour <= 19:  # Business hours for recommendations
                    channels.append('email')
            # Low/urgent (rare for recommendations): dashboard only

        return channels
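
    # Illustrative outcomes (assuming a 14:00 local clock):
    #
    #     get_channels_by_severity_and_type('urgent', 'alert')
    #         -> ['dashboard', 'whatsapp', 'email', 'push']
    #     get_channels_by_severity_and_type('high', 'alert')
    #         -> ['dashboard', 'whatsapp', 'email']
    #     get_channels_by_severity_and_type('medium', 'recommendation')
    #         -> ['dashboard', 'email']
    #     get_channels_by_severity_and_type('low', 'alert')
    #         -> ['dashboard']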

    async def stop(self):
        """Stop the alert processor service."""
        self.running = False
        logger.info("Stopping Alert Processor Service")

        try:
            # Close RabbitMQ connection
            if self.connection and not self.connection.is_closed:
                await self.connection.close()

            # Close shared Redis connection
            await close_redis()

            logger.info("Alert Processor Service stopped")

        except Exception as e:
            logger.error("Error stopping service", error=str(e))

    def get_metrics(self) -> Dict[str, Any]:
        """Get service metrics."""
        return {
            "items_processed": self.items_processed,
            "items_stored": self.items_stored,
            "notifications_sent": self.notifications_sent,
            "errors_count": self.errors_count,
            "running": self.running
        }
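
    # Example snapshot (illustrative values):
    #
    #     {"items_processed": 42, "items_stored": 42, "notifications_sent": 17,
    #      "errors_count": 0, "running": True}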


async def main():
    """Main entry point."""
    config = AlertProcessorConfig()
    service = AlertProcessorService(config)

    # Setup signal handlers for graceful shutdown
    async def shutdown():
        logger.info("Received shutdown signal")
        await service.stop()

    # Register handlers on the running loop; signal.signal() with
    # asyncio.create_task is unreliable because the handler may fire
    # outside the event loop's control.
    loop = asyncio.get_running_loop()
    for sig in (signal.SIGTERM, signal.SIGINT):
        loop.add_signal_handler(sig, lambda: asyncio.create_task(shutdown()))

    try:
        # Start the service
        await service.start()

        # Keep running until stop() clears the flag
        while service.running:
            await asyncio.sleep(1)

    except KeyboardInterrupt:
        logger.info("Received keyboard interrupt")
    except Exception as e:
        logger.error("Service failed", error=str(e))
    finally:
        await service.stop()


if __name__ == "__main__":
    asyncio.run(main())