# services/alert_processor/app/main.py """ Alert Processor Service - Central hub for processing alerts and recommendations Consumes from RabbitMQ, stores in database, and routes to notification service """ import asyncio import json import signal import sys from datetime import datetime from typing import Dict, Any import structlog import redis.asyncio as aioredis from aio_pika import connect_robust, IncomingMessage, ExchangeType from app.config import AlertProcessorConfig from shared.database.base import create_database_manager from shared.clients.base_service_client import BaseServiceClient from shared.config.rabbitmq_config import RABBITMQ_CONFIG # Setup logging structlog.configure( processors=[ structlog.stdlib.filter_by_level, structlog.stdlib.add_logger_name, structlog.stdlib.add_log_level, structlog.stdlib.PositionalArgumentsFormatter(), structlog.processors.TimeStamper(fmt="ISO"), structlog.processors.StackInfoRenderer(), structlog.processors.format_exc_info, structlog.processors.JSONRenderer() ], context_class=dict, logger_factory=structlog.stdlib.LoggerFactory(), wrapper_class=structlog.stdlib.BoundLogger, cache_logger_on_first_use=True, ) logger = structlog.get_logger() class NotificationServiceClient(BaseServiceClient): """Client for notification service""" def __init__(self, config: AlertProcessorConfig): super().__init__("notification-service", config) self.config = config def get_service_base_path(self) -> str: """Return the base path for notification service APIs""" return "/api/v1" async def send_notification(self, tenant_id: str, notification: Dict[str, Any], channels: list) -> Dict[str, Any]: """Send notification via notification service""" try: response = await self.post( "/api/v1/notifications/send", json={ "tenant_id": tenant_id, "notification": notification, "channels": channels } ) return response except Exception as e: logger.error("Failed to send notification", error=str(e), tenant_id=tenant_id) return {"status": "failed", "error": str(e)} class AlertProcessorService: """ Central service for processing and routing alerts and recommendations Integrates with notification service for multi-channel delivery """ def __init__(self, config: AlertProcessorConfig): self.config = config self.db_manager = create_database_manager(config.DATABASE_URL, "alert-processor") self.notification_client = NotificationServiceClient(config) self.redis = None self.connection = None self.channel = None self.running = False # Metrics self.items_processed = 0 self.items_stored = 0 self.notifications_sent = 0 self.errors_count = 0 async def start(self): """Start the alert processor service""" try: logger.info("Starting Alert Processor Service") # Connect to Redis for SSE publishing self.redis = aioredis.from_url(self.config.REDIS_URL) logger.info("Connected to Redis") # Connect to RabbitMQ await self._setup_rabbitmq() # Start consuming messages await self._start_consuming() self.running = True logger.info("Alert Processor Service started successfully") except Exception as e: logger.error("Failed to start Alert Processor Service", error=str(e)) raise async def _setup_rabbitmq(self): """Setup RabbitMQ connection and configuration""" self.connection = await connect_robust( self.config.RABBITMQ_URL, heartbeat=30, connection_attempts=5 ) self.channel = await self.connection.channel() await self.channel.set_qos(prefetch_count=10) # Process 10 messages at a time # Setup exchange and queue based on config exchange_config = RABBITMQ_CONFIG["exchanges"]["alerts"] self.exchange = await self.channel.declare_exchange( exchange_config["name"], getattr(ExchangeType, exchange_config["type"].upper()), durable=exchange_config["durable"] ) queue_config = RABBITMQ_CONFIG["queues"]["alert_processing"] self.queue = await self.channel.declare_queue( queue_config["name"], durable=queue_config["durable"], arguments=queue_config["arguments"] ) # Bind to all alert and recommendation routing keys await self.queue.bind(self.exchange, routing_key="*.*.*") logger.info("RabbitMQ setup completed") async def _start_consuming(self): """Start consuming messages from RabbitMQ""" await self.queue.consume(self.process_item) logger.info("Started consuming alert messages") async def process_item(self, message: IncomingMessage): """Process incoming alert or recommendation""" async with message.process(): try: # Parse message item = json.loads(message.body.decode()) logger.info("Processing item", item_type=item.get('item_type'), alert_type=item.get('type'), severity=item.get('severity'), tenant_id=item.get('tenant_id')) # Store in database stored_item = await self.store_item(item) self.items_stored += 1 # Determine delivery channels based on severity and type channels = self.get_channels_by_severity_and_type( item['severity'], item['item_type'] ) # Send via notification service if channels are specified if channels: notification_result = await self.notification_client.send_notification( tenant_id=item['tenant_id'], notification={ 'type': item['item_type'], # 'alert' or 'recommendation' 'id': item['id'], 'title': item['title'], 'message': item['message'], 'severity': item['severity'], 'metadata': item.get('metadata', {}), 'actions': item.get('actions', []), 'email': item.get('email'), 'phone': item.get('phone'), 'user_id': item.get('user_id') }, channels=channels ) if notification_result.get('status') == 'success': self.notifications_sent += 1 # Stream to SSE for real-time dashboard (always) await self.stream_to_sse(item['tenant_id'], stored_item) self.items_processed += 1 logger.info("Item processed successfully", item_id=item['id'], channels=len(channels)) except Exception as e: self.errors_count += 1 logger.error("Item processing failed", error=str(e)) raise async def store_item(self, item: dict) -> dict: """Store alert or recommendation in database""" from app.models.alerts import Alert, AlertSeverity, AlertStatus from sqlalchemy import select async with self.db_manager.get_session() as session: # Create alert instance alert = Alert( id=item['id'], tenant_id=item['tenant_id'], item_type=item['item_type'], # 'alert' or 'recommendation' alert_type=item['type'], severity=AlertSeverity(item['severity'].lower()), status=AlertStatus.ACTIVE, service=item['service'], title=item['title'], message=item['message'], actions=item.get('actions', []), alert_metadata=item.get('metadata', {}), created_at=datetime.fromisoformat(item['timestamp']) if isinstance(item['timestamp'], str) else item['timestamp'] ) session.add(alert) await session.commit() await session.refresh(alert) logger.debug("Item stored in database", item_id=item['id']) # Convert to dict for return return { 'id': str(alert.id), 'tenant_id': str(alert.tenant_id), 'item_type': alert.item_type, 'alert_type': alert.alert_type, 'severity': alert.severity.value, 'status': alert.status.value, 'service': alert.service, 'title': alert.title, 'message': alert.message, 'actions': alert.actions, 'metadata': alert.alert_metadata, 'created_at': alert.created_at } async def stream_to_sse(self, tenant_id: str, item: dict): """Publish item to Redis for SSE streaming""" channel = f"alerts:{tenant_id}" # Prepare message for SSE sse_message = { 'id': item['id'], 'item_type': item['item_type'], 'type': item['alert_type'], 'severity': item['severity'], 'title': item['title'], 'message': item['message'], 'actions': json.loads(item['actions']) if isinstance(item['actions'], str) else item['actions'], 'metadata': json.loads(item['metadata']) if isinstance(item['metadata'], str) else item['metadata'], 'timestamp': item['created_at'].isoformat() if hasattr(item['created_at'], 'isoformat') else item['created_at'], 'status': item['status'] } # Publish to Redis channel for SSE await self.redis.publish(channel, json.dumps(sse_message)) logger.debug("Item published to SSE", tenant_id=tenant_id, item_id=item['id']) def get_channels_by_severity_and_type(self, severity: str, item_type: str) -> list: """Determine notification channels based on severity, type, and time""" current_hour = datetime.now().hour channels = ['dashboard'] # Always include dashboard (SSE) if item_type == 'alert': if severity == 'urgent': # Urgent alerts: All channels immediately channels.extend(['whatsapp', 'email', 'push']) elif severity == 'high': # High alerts: WhatsApp and email during extended hours if 6 <= current_hour <= 22: channels.extend(['whatsapp', 'email']) else: channels.append('email') # Email only during night elif severity == 'medium': # Medium alerts: Email during business hours if 7 <= current_hour <= 20: channels.append('email') # Low severity: Dashboard only elif item_type == 'recommendation': # Recommendations: Less urgent, limit channels and respect business hours if severity in ['medium', 'high']: if 8 <= current_hour <= 19: # Business hours for recommendations channels.append('email') # Low/urgent (rare for recs): Dashboard only return channels async def stop(self): """Stop the alert processor service""" self.running = False logger.info("Stopping Alert Processor Service") try: # Close RabbitMQ connection if self.connection and not self.connection.is_closed: await self.connection.close() # Close Redis connection if self.redis: await self.redis.close() logger.info("Alert Processor Service stopped") except Exception as e: logger.error("Error stopping service", error=str(e)) def get_metrics(self) -> Dict[str, Any]: """Get service metrics""" return { "items_processed": self.items_processed, "items_stored": self.items_stored, "notifications_sent": self.notifications_sent, "errors_count": self.errors_count, "running": self.running } async def main(): """Main entry point""" config = AlertProcessorConfig() service = AlertProcessorService(config) # Setup signal handlers for graceful shutdown async def shutdown(): logger.info("Received shutdown signal") await service.stop() sys.exit(0) # Register signal handlers for sig in (signal.SIGTERM, signal.SIGINT): signal.signal(sig, lambda s, f: asyncio.create_task(shutdown())) try: # Start the service await service.start() # Keep running while service.running: await asyncio.sleep(1) except KeyboardInterrupt: logger.info("Received keyboard interrupt") except Exception as e: logger.error("Service failed", error=str(e)) finally: await service.stop() if __name__ == "__main__": asyncio.run(main())