# services/alert_processor/app/main.py
"""
Alert Processor Service - Central hub for processing alerts and recommendations
Consumes from RabbitMQ, stores in database, and routes to notification service
"""
import asyncio
import json
import logging
import signal
import sys
from datetime import datetime
from typing import Any, Dict

import structlog
from aio_pika import ExchangeType, IncomingMessage, connect_robust

from app.config import AlertProcessorConfig
from shared.clients.base_service_client import BaseServiceClient
from shared.config.rabbitmq_config import RABBITMQ_CONFIG
from shared.database.base import create_database_manager
from shared.redis_utils import close_redis, get_redis_client, initialize_redis
from shared.schemas.alert_types import RawAlert

# Import enrichment services
from app.services.enrichment import (
    ContextEnrichmentService,
    OrchestratorClient,
    PriorityScoringService,
    TimingIntelligenceService,
)

# Configure Python's standard logging first (required for structlog.stdlib.LoggerFactory)
logging.basicConfig(
    format="%(message)s",
    stream=sys.stdout,
    level=logging.INFO,
)

# Configure structlog to use the standard logging backend
structlog.configure(
    processors=[
        structlog.stdlib.filter_by_level,
        structlog.stdlib.add_logger_name,
        structlog.stdlib.add_log_level,
        structlog.stdlib.PositionalArgumentsFormatter(),
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.StackInfoRenderer(),
        structlog.processors.format_exc_info,
        structlog.processors.JSONRenderer(),
    ],
    context_class=dict,
    logger_factory=structlog.stdlib.LoggerFactory(),
    wrapper_class=structlog.stdlib.BoundLogger,
    cache_logger_on_first_use=True,
)

logger = structlog.get_logger()


class NotificationServiceClient(BaseServiceClient):
    """Client for notification service"""

    def __init__(self, config: AlertProcessorConfig):
        super().__init__("notification-service", config)
        self.config = config

    def get_service_base_path(self) -> str:
        """Return the base path for notification service APIs"""
        return "/api/v1"

    async def send_notification(self, tenant_id: str, notification: Dict[str, Any],
                                channels: list) -> Dict[str, Any]:
        """Send notification via notification service"""
        try:
            response = await self.post(
                "notifications/send",
                data={
                    "tenant_id": tenant_id,
                    "notification": notification,
                    "channels": channels,
                },
            )
            return response if response else {"status": "failed", "error": "No response from notification service"}
        except Exception as e:
            logger.error("Failed to send notification", error=str(e), tenant_id=tenant_id)
            return {"status": "failed", "error": str(e)}


class AlertProcessorService:
    """
    Central service for processing and routing alerts and recommendations
    Integrates with notification service for multi-channel delivery
    """

    def __init__(self, config: AlertProcessorConfig):
        self.config = config
        self.db_manager = create_database_manager(config.DATABASE_URL, "alert-processor")
        self.notification_client = NotificationServiceClient(config)
        self.redis = None
        self.connection = None
        self.channel = None
        self.running = False

        # Initialize enrichment services (context_enrichment initialized after Redis connection)
        self.orchestrator_client = OrchestratorClient(config.ORCHESTRATOR_SERVICE_URL)
        self.context_enrichment = None  # Initialized in start() after Redis connection
        self.priority_scoring = PriorityScoringService(config)
        self.timing_intelligence = TimingIntelligenceService(config)

        # Metrics
        self.items_processed = 0
        self.items_stored = 0
        self.notifications_sent = 0
        self.errors_count = 0
        self.enrichments_count = 0
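    # Illustrative shape of an incoming queue message, inferred from the field
    # accesses in process_item()/enrich_alert() below -- the real producer
    # payload may carry additional fields:
    # {
    #     "id": "...", "tenant_id": "...", "item_type": "alert",
    #     "type": "...", "title": "...", "message": "...", "service": "...",
    #     "actions": [], "metadata": {}, "timestamp": "<ISO-8601>"
    # }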
    async def start(self):
        """Start the alert processor service"""
        try:
            logger.info("Starting Alert Processor Service")

            # Initialize shared Redis connection for SSE publishing
            await initialize_redis(self.config.REDIS_URL, db=0, max_connections=20)
            self.redis = await get_redis_client()
            logger.info("Connected to Redis")

            # Initialize context enrichment service now that Redis is available
            self.context_enrichment = ContextEnrichmentService(self.config, self.db_manager, self.redis)
            logger.info("Initialized context enrichment service")

            # Connect to RabbitMQ
            await self._setup_rabbitmq()

            # Start consuming messages
            await self._start_consuming()

            self.running = True
            logger.info("Alert Processor Service started successfully")
        except Exception as e:
            logger.error("Failed to start Alert Processor Service", error=str(e))
            raise

    async def _setup_rabbitmq(self):
        """Set up RabbitMQ connection and configuration"""
        self.connection = await connect_robust(
            self.config.RABBITMQ_URL,
            heartbeat=30,
            connection_attempts=5,
        )
        self.channel = await self.connection.channel()
        await self.channel.set_qos(prefetch_count=10)  # Process 10 messages at a time

        # Set up exchange and queue based on config
        exchange_config = RABBITMQ_CONFIG["exchanges"]["alerts"]
        self.exchange = await self.channel.declare_exchange(
            exchange_config["name"],
            getattr(ExchangeType, exchange_config["type"].upper()),
            durable=exchange_config["durable"],
        )

        queue_config = RABBITMQ_CONFIG["queues"]["alert_processing"]
        self.queue = await self.channel.declare_queue(
            queue_config["name"],
            durable=queue_config["durable"],
            arguments=queue_config["arguments"],
        )

        # Bind to all alert and recommendation routing keys
        await self.queue.bind(self.exchange, routing_key="*.*.*")
        logger.info("RabbitMQ setup completed")

    async def _start_consuming(self):
        """Start consuming messages from RabbitMQ"""
        await self.queue.consume(self.process_item)
        logger.info("Started consuming alert messages")

    async def process_item(self, message: IncomingMessage):
        """Process incoming alert or recommendation"""
        # message.process() acks on clean exit; re-raising inside the block
        # rejects the message instead of acking it
        async with message.process():
            try:
                # Parse message
                item = json.loads(message.body.decode())

                logger.info("Processing item",
                            item_type=item.get('item_type'),
                            alert_type=item.get('type'),
                            priority_level=item.get('priority_level', 'standard'),
                            tenant_id=item.get('tenant_id'))

                # Enrich alert before storage
                enriched_item = await self.enrich_alert(item)
                self.enrichments_count += 1

                # Store enriched alert in database
                stored_item = await self.store_enriched_item(enriched_item)
                self.items_stored += 1

                # Determine delivery channels based on priority score (not severity)
                channels = self.get_channels_by_priority(enriched_item['priority_score'])

                # Send via notification service if channels are specified
                if channels:
                    notification_result = await self.notification_client.send_notification(
                        tenant_id=enriched_item['tenant_id'],
                        notification={
                            'type': enriched_item['item_type'],
                            'id': enriched_item['id'],
                            'title': enriched_item['title'],
                            'message': enriched_item['message'],
                            'priority_score': enriched_item['priority_score'],
                            'priority_level': enriched_item['priority_level'],
                            'type_class': enriched_item['type_class'],
                            'metadata': enriched_item.get('metadata', {}),
                            'actions': enriched_item.get('smart_actions', []),
                            'ai_reasoning_summary': enriched_item.get('ai_reasoning_summary'),
                            'email': enriched_item.get('email'),
                            'phone': enriched_item.get('phone'),
                            'user_id': enriched_item.get('user_id'),
                        },
                        channels=channels,
                    )

                    if notification_result and notification_result.get('status') == 'success':
                        self.notifications_sent += 1
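                # Note: the alert was stored before the notification attempt
                # above, so a delivery failure still leaves the enriched item
                # persisted and streamable to the dashboard below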
                # Stream enriched alert to SSE for real-time dashboard (always)
                await self.stream_to_sse(enriched_item['tenant_id'], stored_item)

                self.items_processed += 1

                logger.info("Item processed successfully",
                            item_id=enriched_item['id'],
                            priority_score=enriched_item['priority_score'],
                            priority_level=enriched_item['priority_level'],
                            channels=len(channels))

            except Exception as e:
                self.errors_count += 1
                logger.error("Item processing failed", error=str(e))
                raise

    async def enrich_alert(self, item: dict) -> dict:
        """
        Enrich alert with priority scoring, context, and smart actions.
        All alerts MUST be enriched - no legacy support.
        """
        try:
            # Convert dict to RawAlert model.
            # Map 'type' to 'alert_type' and 'metadata' to 'alert_metadata'.
            raw_alert = RawAlert(
                tenant_id=item['tenant_id'],
                alert_type=item.get('type', item.get('alert_type', 'unknown')),
                title=item['title'],
                message=item['message'],
                service=item['service'],
                actions=item.get('actions', []),
                alert_metadata=item.get('metadata', item.get('alert_metadata', {})),
                item_type=item.get('item_type', 'alert'),
            )

            # Enrich with orchestrator context (AI actions, business impact)
            enriched = await self.context_enrichment.enrich_alert(raw_alert)

            # Convert EnrichedAlert back to dict and merge with original item.
            # Use mode='json' to properly serialize datetime objects to ISO strings.
            enriched_dict = enriched.model_dump(mode='json') if hasattr(enriched, 'model_dump') else dict(enriched)
            enriched_dict['id'] = item['id']  # Preserve original ID
            enriched_dict['item_type'] = item.get('item_type', 'alert')  # Preserve item_type
            enriched_dict['type'] = enriched_dict.get('alert_type', item.get('type', 'unknown'))  # Preserve type field
            enriched_dict['timestamp'] = item.get('timestamp', datetime.utcnow().isoformat())
            enriched_dict['timing_decision'] = enriched_dict.get('timing_decision', 'send_now')  # Default timing decision

            # Map 'actions' to 'smart_actions' for database storage
            if 'actions' in enriched_dict and 'smart_actions' not in enriched_dict:
                enriched_dict['smart_actions'] = enriched_dict['actions']

            logger.info("Alert enriched successfully",
                        alert_id=enriched_dict['id'],
                        alert_type=enriched_dict.get('alert_type'),
                        priority_score=enriched_dict['priority_score'],
                        priority_level=enriched_dict['priority_level'],
                        type_class=enriched_dict['type_class'],
                        actions_count=len(enriched_dict.get('actions', [])),
                        smart_actions_count=len(enriched_dict.get('smart_actions', [])))

            return enriched_dict

        except Exception as e:
            logger.error("Alert enrichment failed, using fallback",
                         error=str(e), alert_id=item.get('id'))
            # Fallback: basic enrichment with defaults
            return self._create_fallback_enrichment(item)

    def _create_fallback_enrichment(self, item: dict) -> dict:
        """
        Create fallback enrichment when enrichment services fail.
        Ensures all alerts have required enrichment fields.
        """
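        # The defaults below describe a mid-band alert: priority 50
        # ('standard'), dashboard-only placement, and an immediate 'send_now'
        # timing decision, so a failed enrichment never blocks delivery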
""" return { **item, 'item_type': item.get('item_type', 'alert'), # Ensure item_type is preserved 'type': item.get('type', 'unknown'), # Ensure type field is preserved 'alert_type': item.get('type', item.get('alert_type', 'unknown')), # Ensure alert_type exists 'priority_score': 50, 'priority_level': 'standard', 'type_class': 'action_needed', 'orchestrator_context': None, 'business_impact': None, 'urgency_context': None, 'user_agency': None, 'trend_context': None, 'smart_actions': item.get('actions', []), 'ai_reasoning_summary': None, 'confidence_score': 0.5, 'timing_decision': 'send_now', 'scheduled_send_time': None, 'placement': ['dashboard'] } async def store_enriched_item(self, enriched_item: dict) -> dict: """Store enriched alert in database with all enrichment fields""" from app.models.events import Alert, AlertStatus from sqlalchemy import select async with self.db_manager.get_session() as session: # Create enriched alert instance alert = Alert( id=enriched_item['id'], tenant_id=enriched_item['tenant_id'], item_type=enriched_item['item_type'], alert_type=enriched_item['type'], status='active', service=enriched_item['service'], title=enriched_item['title'], message=enriched_item['message'], # Enrichment fields (REQUIRED) priority_score=enriched_item['priority_score'], priority_level=enriched_item['priority_level'], type_class=enriched_item['type_class'], # Context enrichment (JSONB) orchestrator_context=enriched_item.get('orchestrator_context'), business_impact=enriched_item.get('business_impact'), urgency_context=enriched_item.get('urgency_context'), user_agency=enriched_item.get('user_agency'), trend_context=enriched_item.get('trend_context'), # Smart actions smart_actions=enriched_item.get('smart_actions', []), # AI reasoning ai_reasoning_summary=enriched_item.get('ai_reasoning_summary'), confidence_score=enriched_item.get('confidence_score', 0.8), # Timing intelligence timing_decision=enriched_item.get('timing_decision', 'send_now'), scheduled_send_time=enriched_item.get('scheduled_send_time'), # Placement placement=enriched_item.get('placement', ['dashboard']), # Metadata (legacy) alert_metadata=enriched_item.get('metadata', {}), # Timestamp created_at=datetime.fromisoformat(enriched_item['timestamp']) if isinstance(enriched_item['timestamp'], str) else enriched_item['timestamp'] ) session.add(alert) await session.commit() await session.refresh(alert) logger.debug("Enriched item stored in database", item_id=enriched_item['id'], priority_score=alert.priority_score, type_class=alert.type_class) # Convert to enriched dict for return alert_dict = alert.to_dict() # Cache active alerts in Redis for SSE initial_items await self._cache_active_alerts(str(alert.tenant_id)) return alert_dict async def _cache_active_alerts(self, tenant_id: str): """ Cache today's active alerts for a tenant in Redis for quick SSE access Only caches alerts from today (00:00 UTC onwards) to avoid flooding the dashboard with historical alerts on initial connection. Analytics endpoints should query the database directly for historical data. 
""" try: from app.models.events import Alert, AlertStatus from sqlalchemy import select async with self.db_manager.get_session() as session: # Calculate start of today (UTC) to filter only today's alerts today_start = datetime.utcnow().replace(hour=0, minute=0, second=0, microsecond=0) # Query only today's active alerts for this tenant # This prevents showing yesterday's alerts on dashboard initial load query = select(Alert).where( Alert.tenant_id == tenant_id, Alert.status == AlertStatus.ACTIVE, Alert.created_at >= today_start # Only today's alerts ).order_by(Alert.created_at.desc()).limit(50) result = await session.execute(query) alerts = result.scalars().all() # Convert to enriched JSON-serializable format active_items = [] for alert in alerts: active_items.append(alert.to_dict()) # Cache in Redis with 1 hour TTL cache_key = f"active_alerts:{tenant_id}" await self.redis.setex( cache_key, 3600, # 1 hour TTL json.dumps(active_items) ) logger.debug("Cached today's active alerts in Redis", tenant_id=tenant_id, count=len(active_items), filter_date=today_start.isoformat()) except Exception as e: logger.error("Failed to cache active alerts", tenant_id=tenant_id, error=str(e)) async def stream_to_sse(self, tenant_id: str, item: dict): """Publish enriched item to Redis for SSE streaming""" channel = f"alerts:{tenant_id}" # Item is already enriched dict from store_enriched_item # Just ensure timestamp is serializable sse_message = { **item, 'timestamp': item['created_at'].isoformat() if hasattr(item['created_at'], 'isoformat') else item['created_at'] } # Publish to Redis channel for SSE await self.redis.publish(channel, json.dumps(sse_message)) logger.debug("Enriched item published to SSE", tenant_id=tenant_id, item_id=item['id'], priority_score=item.get('priority_score')) def get_channels_by_priority(self, priority_score: int) -> list: """ Determine notification channels based on priority score and timing. Uses multi-factor priority score (0-100) instead of legacy severity. 
""" current_hour = datetime.now().hour channels = ['dashboard'] # Always include dashboard (SSE) # Critical priority (90-100): All channels immediately if priority_score >= self.config.CRITICAL_THRESHOLD: channels.extend(['whatsapp', 'email', 'push']) # Important priority (70-89): WhatsApp and email during extended hours elif priority_score >= self.config.IMPORTANT_THRESHOLD: if 6 <= current_hour <= 22: channels.extend(['whatsapp', 'email']) else: channels.append('email') # Email only during night # Standard priority (50-69): Email during business hours elif priority_score >= self.config.STANDARD_THRESHOLD: if 7 <= current_hour <= 20: channels.append('email') # Info priority (0-49): Dashboard only return channels async def stop(self): """Stop the alert processor service""" self.running = False logger.info("Stopping Alert Processor Service") try: # Close RabbitMQ connection if self.connection and not self.connection.is_closed: await self.connection.close() # Close shared Redis connection await close_redis() logger.info("Alert Processor Service stopped") except Exception as e: logger.error("Error stopping service", error=str(e)) def get_metrics(self) -> Dict[str, Any]: """Get service metrics""" return { "items_processed": self.items_processed, "items_stored": self.items_stored, "enrichments_count": self.enrichments_count, "notifications_sent": self.notifications_sent, "errors_count": self.errors_count, "running": self.running } async def main(): """Main entry point""" print("STARTUP: Inside main() function", file=sys.stderr, flush=True) config = AlertProcessorConfig() print("STARTUP: Config created", file=sys.stderr, flush=True) service = AlertProcessorService(config) print("STARTUP: Service created", file=sys.stderr, flush=True) # Setup signal handlers for graceful shutdown async def shutdown(): logger.info("Received shutdown signal") await service.stop() sys.exit(0) # Register signal handlers for sig in (signal.SIGTERM, signal.SIGINT): signal.signal(sig, lambda s, f: asyncio.create_task(shutdown())) try: # Start the service print("STARTUP: About to start service", file=sys.stderr, flush=True) await service.start() print("STARTUP: Service started successfully", file=sys.stderr, flush=True) # Keep running while service.running: await asyncio.sleep(1) except KeyboardInterrupt: logger.info("Received keyboard interrupt") except Exception as e: logger.error("Service failed", error=str(e)) finally: await service.stop() if __name__ == "__main__": print("STARTUP: Entering main block", file=sys.stderr, flush=True) try: print("STARTUP: About to run main()", file=sys.stderr, flush=True) asyncio.run(main()) print("STARTUP: main() completed", file=sys.stderr, flush=True) except Exception as e: print(f"STARTUP: FATAL ERROR: {e}", file=sys.stderr, flush=True) import traceback traceback.print_exc(file=sys.stderr) raise