# services/alert_processor/app/main.py
"""
Alert Processor Service - Central hub for processing alerts and recommendations
Consumes from RabbitMQ, stores in database, and routes to notification service
"""
import asyncio
import json
import signal
from datetime import datetime
from typing import Any, Dict

import structlog
import redis.asyncio as aioredis
from aio_pika import ExchangeType, IncomingMessage, connect_robust

from app.config import AlertProcessorConfig
from shared.database.base import create_database_manager
from shared.clients.base_service_client import BaseServiceClient
from shared.config.rabbitmq_config import RABBITMQ_CONFIG

# Setup logging
structlog.configure(
    processors=[
        structlog.stdlib.filter_by_level,
        structlog.stdlib.add_logger_name,
        structlog.stdlib.add_log_level,
        structlog.stdlib.PositionalArgumentsFormatter(),
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.StackInfoRenderer(),
        structlog.processors.format_exc_info,
        structlog.processors.JSONRenderer()
    ],
    context_class=dict,
    logger_factory=structlog.stdlib.LoggerFactory(),
    wrapper_class=structlog.stdlib.BoundLogger,
    cache_logger_on_first_use=True,
)

logger = structlog.get_logger()


class NotificationServiceClient(BaseServiceClient):
    """Client for the notification service"""

    def __init__(self, config: AlertProcessorConfig):
        super().__init__("notification-service", config)
        self.config = config

    def get_service_base_path(self) -> str:
        """Return the base path for notification service APIs"""
        return "/api/v1"

    async def send_notification(self, tenant_id: str, notification: Dict[str, Any], channels: list) -> Dict[str, Any]:
        """Send a notification via the notification service"""
        try:
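            # NOTE: this path already carries the "/api/v1" prefix that
            # get_service_base_path() also returns. BaseServiceClient is
            # assumed to accept full API paths as-is; if it prepends the base
            # path unconditionally, the effective URL would double the prefix,
            # which is worth verifying against the shared client.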
            response = await self.post(
                "/api/v1/notifications/send",
                json={
                    "tenant_id": tenant_id,
                    "notification": notification,
                    "channels": channels
                }
            )
            return response
        except Exception as e:
            logger.error("Failed to send notification", error=str(e), tenant_id=tenant_id)
            return {"status": "failed", "error": str(e)}


class AlertProcessorService:
    """
    Central service for processing and routing alerts and recommendations.
    Integrates with the notification service for multi-channel delivery.
    """

    def __init__(self, config: AlertProcessorConfig):
        self.config = config
        self.db_manager = create_database_manager(config.DATABASE_URL, "alert-processor")
        self.notification_client = NotificationServiceClient(config)
        self.redis = None
        self.connection = None
        self.channel = None
        self.running = False
        # Metrics
        self.items_processed = 0
        self.items_stored = 0
        self.notifications_sent = 0
        self.errors_count = 0

    async def start(self):
        """Start the alert processor service"""
        try:
            logger.info("Starting Alert Processor Service")
            # Connect to Redis for SSE publishing
            self.redis = aioredis.from_url(self.config.REDIS_URL)
            logger.info("Connected to Redis")
            # Connect to RabbitMQ
            await self._setup_rabbitmq()
            # Start consuming messages
            await self._start_consuming()
            self.running = True
            logger.info("Alert Processor Service started successfully")
        except Exception as e:
            logger.error("Failed to start Alert Processor Service", error=str(e))
            raise

    async def _setup_rabbitmq(self):
        """Set up the RabbitMQ connection, exchange, and queue"""
        self.connection = await connect_robust(
            self.config.RABBITMQ_URL,
            heartbeat=30,
            connection_attempts=5
        )
        self.channel = await self.connection.channel()
        # Cap unacknowledged deliveries at 10 per consumer; this bounds
        # in-flight work rather than batching messages
        await self.channel.set_qos(prefetch_count=10)
        # Set up the exchange and queue from the shared config
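        # RABBITMQ_CONFIG is assumed to look roughly like this (shape inferred
        # from the keys accessed below; shared/config/rabbitmq_config.py is
        # authoritative):
        # {
        #     "exchanges": {"alerts": {"name": ..., "type": "topic", "durable": True}},
        #     "queues": {"alert_processing": {"name": ..., "durable": True, "arguments": {...}}},
        # }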
        exchange_config = RABBITMQ_CONFIG["exchanges"]["alerts"]
        self.exchange = await self.channel.declare_exchange(
            exchange_config["name"],
            getattr(ExchangeType, exchange_config["type"].upper()),
            durable=exchange_config["durable"]
        )
        queue_config = RABBITMQ_CONFIG["queues"]["alert_processing"]
        self.queue = await self.channel.declare_queue(
            queue_config["name"],
            durable=queue_config["durable"],
            arguments=queue_config["arguments"]
        )
        # Bind to all alert and recommendation routing keys
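        # In a topic exchange, "*" matches exactly one dot-separated word, so
        # "*.*.*" catches every three-segment routing key and nothing else;
        # the segment scheme itself is defined by the publishing services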
        await self.queue.bind(self.exchange, routing_key="*.*.*")
        logger.info("RabbitMQ setup completed")

    async def _start_consuming(self):
        """Start consuming messages from RabbitMQ"""
        await self.queue.consume(self.process_item)
        logger.info("Started consuming alert messages")

    async def process_item(self, message: IncomingMessage):
        """Process an incoming alert or recommendation"""
        async with message.process():
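            # message.process() acks the message on clean exit; if an
            # exception escapes, aio_pika rejects it (requeue=False by
            # default), so failed items are dropped or dead-lettered
            # depending on the queue's declared arguments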
            try:
                # Parse the message
                item = json.loads(message.body.decode())
                logger.info("Processing item",
                            item_type=item.get('item_type'),
                            alert_type=item.get('type'),
                            severity=item.get('severity'),
                            tenant_id=item.get('tenant_id'))
                # Store in the database
                stored_item = await self.store_item(item)
                self.items_stored += 1
                # Determine delivery channels based on severity and type
                channels = self.get_channels_by_severity_and_type(
                    item['severity'],
                    item['item_type']
                )
                # Send via the notification service if channels are specified
                if channels:
                    notification_result = await self.notification_client.send_notification(
                        tenant_id=item['tenant_id'],
                        notification={
                            'type': item['item_type'],  # 'alert' or 'recommendation'
                            'id': item['id'],
                            'title': item['title'],
                            'message': item['message'],
                            'severity': item['severity'],
                            'metadata': item.get('metadata', {}),
                            'actions': item.get('actions', []),
                            'email': item.get('email'),
                            'phone': item.get('phone'),
                            'user_id': item.get('user_id')
                        },
                        channels=channels
                    )
                    if notification_result.get('status') == 'success':
                        self.notifications_sent += 1
                # Stream to SSE for the real-time dashboard (always)
                await self.stream_to_sse(item['tenant_id'], stored_item)
                self.items_processed += 1
                logger.info("Item processed successfully",
                            item_id=item['id'],
                            channels=len(channels))
            except Exception as e:
                self.errors_count += 1
                logger.error("Item processing failed", error=str(e))
                raise

    async def store_item(self, item: dict) -> dict:
        """Store an alert or recommendation in the database"""
        from sqlalchemy import text

        query = text("""
            INSERT INTO alerts (
                id, tenant_id, item_type, alert_type, severity, status,
                service, title, message, actions, metadata,
                created_at
            ) VALUES (:id, :tenant_id, :item_type, :alert_type, :severity, :status,
                      :service, :title, :message, :actions, :metadata, :created_at)
            RETURNING *
        """)
        async with self.db_manager.get_session() as session:
            result = await session.execute(
                query,
                {
                    'id': item['id'],
                    'tenant_id': item['tenant_id'],
                    'item_type': item['item_type'],  # 'alert' or 'recommendation'
                    'alert_type': item['type'],
                    'severity': item['severity'],
                    'status': 'active',
                    'service': item['service'],
                    'title': item['title'],
                    'message': item['message'],
                    'actions': json.dumps(item.get('actions', [])),
                    'metadata': json.dumps(item.get('metadata', {})),
                    'created_at': item['timestamp']
                }
            )
            row = result.fetchone()
            await session.commit()
            logger.debug("Item stored in database", item_id=item['id'])
            return dict(row._mapping)

    async def stream_to_sse(self, tenant_id: str, item: dict):
        """Publish an item to Redis for SSE streaming"""
        channel = f"alerts:{tenant_id}"
        # Prepare the message for SSE
        sse_message = {
            'id': item['id'],
            'item_type': item['item_type'],
            'type': item['alert_type'],
            'severity': item['severity'],
            'title': item['title'],
            'message': item['message'],
            'actions': json.loads(item['actions']) if isinstance(item['actions'], str) else item['actions'],
            'metadata': json.loads(item['metadata']) if isinstance(item['metadata'], str) else item['metadata'],
            'timestamp': item['created_at'].isoformat() if hasattr(item['created_at'], 'isoformat') else item['created_at'],
            'status': item['status']
        }
        # Publish to the per-tenant Redis channel
        await self.redis.publish(channel, json.dumps(sse_message))
        logger.debug("Item published to SSE", tenant_id=tenant_id, item_id=item['id'])

    def get_channels_by_severity_and_type(self, severity: str, item_type: str) -> list:
        """Determine notification channels based on severity, type, and time of day"""
        current_hour = datetime.now().hour
        channels = ['dashboard']  # Always include the dashboard (SSE)
        if item_type == 'alert':
            if severity == 'urgent':
                # Urgent alerts: all channels, immediately
                channels.extend(['whatsapp', 'email', 'push'])
            elif severity == 'high':
                # High alerts: WhatsApp and email during extended hours
                if 6 <= current_hour <= 22:
                    channels.extend(['whatsapp', 'email'])
                else:
                    channels.append('email')  # Email only at night
            elif severity == 'medium':
                # Medium alerts: email during business hours
                if 7 <= current_hour <= 20:
                    channels.append('email')
            # Low severity: dashboard only
        elif item_type == 'recommendation':
            # Recommendations are less urgent: limit channels and respect business hours
            if severity in ['medium', 'high']:
                if 8 <= current_hour <= 19:  # Business hours for recommendations
                    channels.append('email')
            # Low/urgent (rare for recommendations): dashboard only
        return channels

    async def stop(self):
        """Stop the alert processor service"""
        self.running = False
        logger.info("Stopping Alert Processor Service")
        try:
            # Close the RabbitMQ connection
            if self.connection and not self.connection.is_closed:
                await self.connection.close()
            # Close the Redis connection
            if self.redis:
                await self.redis.close()
            logger.info("Alert Processor Service stopped")
        except Exception as e:
            logger.error("Error stopping service", error=str(e))

    def get_metrics(self) -> Dict[str, Any]:
        """Get service metrics"""
        return {
            "items_processed": self.items_processed,
            "items_stored": self.items_stored,
            "notifications_sent": self.notifications_sent,
            "errors_count": self.errors_count,
            "running": self.running
        }


async def main():
    """Main entry point"""
    config = AlertProcessorConfig()
    service = AlertProcessorService(config)
    shutdown_event = asyncio.Event()

    def request_shutdown():
        logger.info("Received shutdown signal")
        shutdown_event.set()

    # Register signal handlers on the running event loop; signal.signal()
    # cannot safely schedule coroutines from a sync handler, so use
    # loop.add_signal_handler and let the main loop perform the shutdown
    loop = asyncio.get_running_loop()
    for sig in (signal.SIGTERM, signal.SIGINT):
        loop.add_signal_handler(sig, request_shutdown)

    try:
        # Start the service
        await service.start()
        # Keep running until a shutdown signal arrives
        while service.running and not shutdown_event.is_set():
            await asyncio.sleep(1)
    except KeyboardInterrupt:
        logger.info("Received keyboard interrupt")
    except Exception as e:
        logger.error("Service failed", error=str(e))
    finally:
        await service.stop()


if __name__ == "__main__":
    asyncio.run(main())