# services/alert_processor/app/main.py
"""
Alert Processor Service - central hub for processing alerts and recommendations.

Consumes from RabbitMQ, stores items in the database, and routes them to the
notification service.
"""

import asyncio
import json
import signal
from datetime import datetime
from typing import Any, Dict

import redis.asyncio as aioredis
import structlog
from aio_pika import ExchangeType, IncomingMessage, connect_robust
from sqlalchemy import text

from app.config import AlertProcessorConfig
from shared.clients.base_service_client import BaseServiceClient
from shared.config.rabbitmq_config import RABBITMQ_CONFIG
from shared.database.base import create_database_manager

# Set up structured JSON logging
structlog.configure(
    processors=[
        structlog.stdlib.filter_by_level,
        structlog.stdlib.add_logger_name,
        structlog.stdlib.add_log_level,
        structlog.stdlib.PositionalArgumentsFormatter(),
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.StackInfoRenderer(),
        structlog.processors.format_exc_info,
        structlog.processors.JSONRenderer(),
    ],
    context_class=dict,
    logger_factory=structlog.stdlib.LoggerFactory(),
    wrapper_class=structlog.stdlib.BoundLogger,
    cache_logger_on_first_use=True,
)

logger = structlog.get_logger()


class NotificationServiceClient(BaseServiceClient):
    """Client for the notification service."""

    def __init__(self, config: AlertProcessorConfig):
        super().__init__("notification-service", config)
        self.config = config

    def get_service_base_path(self) -> str:
        """Return the base path for notification service APIs."""
        return "/api/v1"

    async def send_notification(self, tenant_id: str, notification: Dict[str, Any], channels: list) -> Dict[str, Any]:
        """Send a notification via the notification service."""
        try:
            response = await self.post(
                "/api/v1/notifications/send",
                json={
                    "tenant_id": tenant_id,
                    "notification": notification,
                    "channels": channels,
                },
            )
            return response
        except Exception as e:
            logger.error("Failed to send notification", error=str(e), tenant_id=tenant_id)
            return {"status": "failed", "error": str(e)}


class AlertProcessorService:
    """
    Central service for processing and routing alerts and recommendations.

    Integrates with the notification service for multi-channel delivery.
    """

    def __init__(self, config: AlertProcessorConfig):
        self.config = config
        self.db_manager = create_database_manager(config.DATABASE_URL, "alert-processor")
        self.notification_client = NotificationServiceClient(config)
        self.redis = None
        self.connection = None
        self.channel = None
        self.running = False

        # Metrics
        self.items_processed = 0
        self.items_stored = 0
        self.notifications_sent = 0
        self.errors_count = 0

    async def start(self):
        """Start the alert processor service."""
        try:
            logger.info("Starting Alert Processor Service")

            # Connect to Redis for SSE publishing
            self.redis = aioredis.from_url(self.config.REDIS_URL)
            logger.info("Connected to Redis")

            # Connect to RabbitMQ
            await self._setup_rabbitmq()

            # Start consuming messages
            await self._start_consuming()

            self.running = True
            logger.info("Alert Processor Service started successfully")

        except Exception as e:
            logger.error("Failed to start Alert Processor Service", error=str(e))
            raise

    async def _setup_rabbitmq(self):
        """Set up the RabbitMQ connection, exchange, and queue."""
        self.connection = await connect_robust(
            self.config.RABBITMQ_URL,
            heartbeat=30,
            connection_attempts=5,
        )
        self.channel = await self.connection.channel()
        await self.channel.set_qos(prefetch_count=10)  # Up to 10 unacked messages in flight

        # Set up exchange and queue based on the shared config
        exchange_config = RABBITMQ_CONFIG["exchanges"]["alerts"]
        self.exchange = await self.channel.declare_exchange(
            exchange_config["name"],
            getattr(ExchangeType, exchange_config["type"].upper()),
            durable=exchange_config["durable"],
        )

        queue_config = RABBITMQ_CONFIG["queues"]["alert_processing"]
        self.queue = await self.channel.declare_queue(
            queue_config["name"],
            durable=queue_config["durable"],
            arguments=queue_config["arguments"],
        )

        # Bind to all three-segment alert and recommendation routing keys
        # (each '*' matches exactly one dot-delimited word)
        await self.queue.bind(self.exchange, routing_key="*.*.*")

        logger.info("RabbitMQ setup completed")

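    # A minimal sketch (an assumption, not the actual shared module) of the
    # RABBITMQ_CONFIG shape that _setup_rabbitmq() reads from
    # shared.config.rabbitmq_config; the TTL argument is hypothetical:
    #
    #     RABBITMQ_CONFIG = {
    #         "exchanges": {
    #             "alerts": {"name": "alerts", "type": "topic", "durable": True},
    #         },
    #         "queues": {
    #             "alert_processing": {
    #                 "name": "alert_processing",
    #                 "durable": True,
    #                 "arguments": {"x-message-ttl": 86_400_000},
    #             },
    #         },
    #     }
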
    async def _start_consuming(self):
        """Start consuming messages from RabbitMQ."""
        await self.queue.consume(self.process_item)
        logger.info("Started consuming alert messages")

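    # A sketch of the message body process_item() expects on the queue. The
    # producer-side schema is an assumption inferred from the fields read
    # below; 'email', 'phone', and 'user_id' are optional contact fields:
    #
    #     {
    #         "id": "a1b2c3",                # unique item id
    #         "tenant_id": "tenant-42",
    #         "item_type": "alert",          # or "recommendation"
    #         "type": "cpu_usage",           # domain-specific alert type
    #         "severity": "high",            # urgent | high | medium | low
    #         "service": "billing-api",
    #         "title": "CPU usage above 90%",
    #         "message": "Sustained high CPU on billing-api",
    #         "actions": [],                 # optional
    #         "metadata": {},                # optional
    #         "timestamp": "2024-01-01T00:00:00Z",
    #     }
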
    async def process_item(self, message: IncomingMessage):
        """Process an incoming alert or recommendation."""
        async with message.process():
            try:
                # Parse message
                item = json.loads(message.body.decode())

                logger.info("Processing item",
                            item_type=item.get('item_type'),
                            alert_type=item.get('type'),
                            severity=item.get('severity'),
                            tenant_id=item.get('tenant_id'))

                # Store in database
                stored_item = await self.store_item(item)
                self.items_stored += 1

                # Determine delivery channels based on severity and type
                channels = self.get_channels_by_severity_and_type(
                    item['severity'],
                    item['item_type']
                )

                # Send via notification service if channels are specified
                if channels:
                    notification_result = await self.notification_client.send_notification(
                        tenant_id=item['tenant_id'],
                        notification={
                            'type': item['item_type'],  # 'alert' or 'recommendation'
                            'id': item['id'],
                            'title': item['title'],
                            'message': item['message'],
                            'severity': item['severity'],
                            'metadata': item.get('metadata', {}),
                            'actions': item.get('actions', []),
                            'email': item.get('email'),
                            'phone': item.get('phone'),
                            'user_id': item.get('user_id')
                        },
                        channels=channels
                    )

                    if notification_result.get('status') == 'success':
                        self.notifications_sent += 1

                # Stream to SSE for the real-time dashboard (always)
                await self.stream_to_sse(item['tenant_id'], stored_item)

                self.items_processed += 1

                logger.info("Item processed successfully",
                            item_id=item['id'],
                            channels=len(channels))

            except Exception as e:
                self.errors_count += 1
                logger.error("Item processing failed", error=str(e))
                raise  # Re-raise so message.process() rejects the message

    async def store_item(self, item: dict) -> dict:
        """Store an alert or recommendation in the database."""
        query = text("""
            INSERT INTO alerts (
                id, tenant_id, item_type, alert_type, severity, status,
                service, title, message, actions, metadata,
                created_at
            ) VALUES (:id, :tenant_id, :item_type, :alert_type, :severity, :status,
                      :service, :title, :message, :actions, :metadata, :created_at)
            RETURNING *
        """)

        async with self.db_manager.get_session() as session:
            result = await session.execute(
                query,
                {
                    'id': item['id'],
                    'tenant_id': item['tenant_id'],
                    'item_type': item['item_type'],  # 'alert' or 'recommendation'
                    'alert_type': item['type'],
                    'severity': item['severity'],
                    'status': 'active',
                    'service': item['service'],
                    'title': item['title'],
                    'message': item['message'],
                    'actions': json.dumps(item.get('actions', [])),
                    'metadata': json.dumps(item.get('metadata', {})),
                    'created_at': item['timestamp']
                }
            )

            row = result.fetchone()
            await session.commit()

        logger.debug("Item stored in database", item_id=item['id'])
        return dict(row._mapping)

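    # Assumed shape of the alerts table targeted by store_item(); the actual
    # migration lives elsewhere, so this DDL is illustrative only:
    #
    #     CREATE TABLE alerts (
    #         id          TEXT PRIMARY KEY,
    #         tenant_id   TEXT NOT NULL,
    #         item_type   TEXT NOT NULL,   -- 'alert' | 'recommendation'
    #         alert_type  TEXT NOT NULL,
    #         severity    TEXT NOT NULL,
    #         status      TEXT NOT NULL,
    #         service     TEXT,
    #         title       TEXT,
    #         message     TEXT,
    #         actions     JSONB,
    #         metadata    JSONB,
    #         created_at  TIMESTAMPTZ NOT NULL
    #     );
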
    async def stream_to_sse(self, tenant_id: str, item: dict):
        """Publish an item to Redis for SSE streaming."""
        channel = f"alerts:{tenant_id}"

        # Prepare the message for SSE
        sse_message = {
            'id': item['id'],
            'item_type': item['item_type'],
            'type': item['alert_type'],
            'severity': item['severity'],
            'title': item['title'],
            'message': item['message'],
            'actions': json.loads(item['actions']) if isinstance(item['actions'], str) else item['actions'],
            'metadata': json.loads(item['metadata']) if isinstance(item['metadata'], str) else item['metadata'],
            'timestamp': item['created_at'].isoformat() if hasattr(item['created_at'], 'isoformat') else item['created_at'],
            'status': item['status']
        }

        # Publish to the tenant's Redis channel for SSE
        await self.redis.publish(channel, json.dumps(sse_message))

        logger.debug("Item published to SSE", tenant_id=tenant_id, item_id=item['id'])

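    # Illustrative consumer sketch (an assumption; the SSE endpoint lives in
    # another service): a dashboard endpoint can relay these messages by
    # subscribing to the same Redis channel, e.g. with redis.asyncio:
    #
    #     pubsub = redis_client.pubsub()
    #     await pubsub.subscribe(f"alerts:{tenant_id}")
    #     async for msg in pubsub.listen():
    #         if msg["type"] == "message":
    #             yield f"data: {msg['data'].decode()}\n\n"
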
    def get_channels_by_severity_and_type(self, severity: str, item_type: str) -> list:
        """Determine notification channels based on severity, type, and time of day."""
        # Note: quiet hours are decided from the server's local time
        current_hour = datetime.now().hour

        channels = ['dashboard']  # Always include the dashboard (SSE)

        if item_type == 'alert':
            if severity == 'urgent':
                # Urgent alerts: all channels immediately
                channels.extend(['whatsapp', 'email', 'push'])
            elif severity == 'high':
                # High alerts: WhatsApp and email during extended hours (06:00-22:00)
                if 6 <= current_hour <= 22:
                    channels.extend(['whatsapp', 'email'])
                else:
                    channels.append('email')  # Email only at night
            elif severity == 'medium':
                # Medium alerts: email during business hours (07:00-20:00)
                if 7 <= current_hour <= 20:
                    channels.append('email')
            # Low severity: dashboard only

        elif item_type == 'recommendation':
            # Recommendations are less urgent: limit channels and respect business hours
            if severity in ['medium', 'high']:
                if 8 <= current_hour <= 19:  # Business hours for recommendations
                    channels.append('email')
            # Low/urgent (rare for recommendations): dashboard only

        return channels

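    # Example outcomes (assuming server-local time): a 'high' alert at 23:00
    # yields ['dashboard', 'email']; the same alert at 10:00 yields
    # ['dashboard', 'whatsapp', 'email']; a 'medium' recommendation at 10:00
    # yields ['dashboard', 'email'].
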
    async def stop(self):
        """Stop the alert processor service."""
        self.running = False
        logger.info("Stopping Alert Processor Service")

        try:
            # Close the RabbitMQ connection
            if self.connection and not self.connection.is_closed:
                await self.connection.close()

            # Close the Redis connection
            if self.redis:
                await self.redis.close()

            logger.info("Alert Processor Service stopped")

        except Exception as e:
            logger.error("Error stopping service", error=str(e))

    def get_metrics(self) -> Dict[str, Any]:
        """Get service metrics."""
        return {
            "items_processed": self.items_processed,
            "items_stored": self.items_stored,
            "notifications_sent": self.notifications_sent,
            "errors_count": self.errors_count,
            "running": self.running
        }


async def main():
    """Main entry point."""
    config = AlertProcessorConfig()
    service = AlertProcessorService(config)

    # Register signal handlers on the running event loop for graceful shutdown.
    # (signal.signal() with asyncio.create_task would fire outside the loop's
    # context; loop.add_signal_handler is the asyncio-safe equivalent.)
    loop = asyncio.get_running_loop()

    def handle_shutdown():
        logger.info("Received shutdown signal")
        service.running = False  # Let the main loop exit; cleanup runs in finally

    for sig in (signal.SIGTERM, signal.SIGINT):
        loop.add_signal_handler(sig, handle_shutdown)

    try:
        # Start the service
        await service.start()

        # Keep running until asked to stop
        while service.running:
            await asyncio.sleep(1)

    except KeyboardInterrupt:
        logger.info("Received keyboard interrupt")
    except Exception as e:
        logger.error("Service failed", error=str(e))
    finally:
        await service.stop()


if __name__ == "__main__":
    asyncio.run(main())