# services/notification/app/services/sse_service.py
"""
Server-Sent Events service for real-time notifications
Integrated within the notification service for alerts and recommendations
"""

import asyncio
import json
from typing import Dict, Set, Any
from datetime import datetime
import structlog

from shared.redis_utils import initialize_redis, get_redis_client, close_redis

logger = structlog.get_logger()


class SSEService:
    """
    Server-Sent Events service for real-time notifications
    Handles both alerts and recommendations through unified SSE streams

    State kept per tenant:
      * ``active_connections`` - the set of client queues to fan events out to
      * ``pubsub_tasks``       - the background task subscribed to the tenant's
                                 Redis channel (``alerts:{tenant_id}``)
    """

    def __init__(self):
        self.redis = None
        self.redis_url = None
        self.active_connections: Dict[str, Set[asyncio.Queue]] = {}
        self.pubsub_tasks: Dict[str, asyncio.Task] = {}

    async def initialize(self, redis_url: str):
        """Initialize Redis connection.

        Args:
            redis_url: URL handed to the shared Redis initializer.

        Raises:
            Exception: any initialization failure is logged and re-raised.
        """
        try:
            self.redis_url = redis_url

            # Initialize shared Redis connection for SSE
            await initialize_redis(redis_url, db=0, max_connections=30)
            self.redis = await get_redis_client()

            logger.info("SSE Service initialized with shared Redis connection")
        except Exception as e:
            logger.error("Failed to initialize SSE service", error=str(e))
            raise

    async def shutdown(self):
        """Clean shutdown.

        Cancels every pubsub listener, notifies connected clients with a
        ``shutdown`` event, closes the shared Redis connection and clears the
        per-tenant registries. Errors are logged, never raised.
        """
        try:
            # Cancel all pubsub tasks. Iterate over a snapshot because we
            # await inside the loop and the registry may change meanwhile.
            for task in list(self.pubsub_tasks.values()):
                if not task.done():
                    task.cancel()
                    try:
                        await task
                    except asyncio.CancelledError:
                        pass
            self.pubsub_tasks.clear()

            # Notify all client connections. put_nowait is used so a full
            # bounded queue cannot stall the shutdown sequence.
            for tenant_id, connections in list(self.active_connections.items()):
                for queue in connections.copy():
                    try:
                        queue.put_nowait({
                            "event": "shutdown",
                            "data": json.dumps({"status": "server_shutdown"})
                        })
                    except asyncio.QueueFull:
                        logger.warning(
                            "Client queue full, shutdown event not delivered",
                            tenant_id=tenant_id,
                        )
                    except Exception as e:
                        logger.warning(
                            "Failed to deliver shutdown event",
                            tenant_id=tenant_id,
                            error=str(e),
                        )
            self.active_connections.clear()

            # Close shared Redis connection
            await close_redis()

            logger.info("SSE Service shutdown completed")
        except Exception as e:
            logger.error("Error during SSE shutdown", error=str(e))

    async def add_client(self, tenant_id: str, client_queue: asyncio.Queue):
        """Add a new SSE client connection.

        Registers ``client_queue`` for ``tenant_id``, makes sure a live pubsub
        listener exists for the tenant, then enqueues a ``connected`` event
        and, if any are cached, an ``initial_items`` event.

        Args:
            tenant_id: tenant whose channel the client subscribes to.
            client_queue: queue the client's SSE stream reads events from.
        """
        try:
            if tenant_id not in self.active_connections:
                self.active_connections[tenant_id] = set()

            # Start pubsub listener for this tenant if none exists, or if the
            # previous one has finished (e.g. it died on a Redis error); a
            # finished task left in the registry would otherwise block restarts.
            listener = self.pubsub_tasks.get(tenant_id)
            if listener is None or listener.done():
                self.pubsub_tasks[tenant_id] = asyncio.create_task(
                    self._listen_to_tenant_channel(tenant_id)
                )

            self.active_connections[tenant_id].add(client_queue)

            client_count = len(self.active_connections[tenant_id])
            logger.info("SSE client added",
                        tenant_id=tenant_id,
                        total_clients=client_count)

            # Send connection confirmation
            await client_queue.put({
                "event": "connected",
                "data": json.dumps({
                    "status": "connected",
                    "tenant_id": tenant_id,
                    "timestamp": datetime.utcnow().isoformat(),
                    "client_count": client_count
                })
            })

            # Send any active items (alerts and recommendations)
            active_items = await self.get_active_items(tenant_id)
            if active_items:
                await client_queue.put({
                    "event": "initial_items",
                    "data": json.dumps(active_items)
                })

        except Exception as e:
            logger.error("Error adding SSE client", tenant_id=tenant_id, error=str(e))

    async def remove_client(self, tenant_id: str, client_queue: asyncio.Queue):
        """Remove SSE client connection.

        When the last client of a tenant is removed, the tenant's pubsub
        listener is cancelled and both registry entries are dropped.

        Args:
            tenant_id: tenant the client was registered under.
            client_queue: the queue previously passed to ``add_client``.
        """
        try:
            if tenant_id in self.active_connections:
                self.active_connections[tenant_id].discard(client_queue)

                # If no more clients for this tenant, stop the pubsub listener
                if not self.active_connections[tenant_id]:
                    del self.active_connections[tenant_id]
                    if tenant_id in self.pubsub_tasks:
                        task = self.pubsub_tasks[tenant_id]
                        if not task.done():
                            task.cancel()
                        del self.pubsub_tasks[tenant_id]

                logger.info("SSE client removed", tenant_id=tenant_id)

        except Exception as e:
            logger.error("Error removing SSE client", tenant_id=tenant_id, error=str(e))

    async def _listen_to_tenant_channel(self, tenant_id: str):
        """Listen to Redis channel for tenant-specific items.

        Subscribes to ``alerts:{tenant_id}`` and forwards every ``message``
        payload to ``broadcast_to_tenant``. Runs until cancelled or until the
        pubsub stream raises.

        Raises:
            asyncio.CancelledError: re-raised after logging so the task is
                reported as cancelled, as asyncio expects.
        """
        pubsub = None
        channel = f"alerts:{tenant_id}"
        try:
            # Use the shared Redis client for pubsub
            pubsub = self.redis.pubsub()
            await pubsub.subscribe(channel)

            logger.info("Started listening to tenant channel",
                        tenant_id=tenant_id,
                        channel=channel)

            async for message in pubsub.listen():
                if message["type"] == "message":
                    # Broadcast to all connected clients for this tenant
                    await self.broadcast_to_tenant(tenant_id, message["data"])

        except asyncio.CancelledError:
            logger.info("Stopped listening to tenant channel", tenant_id=tenant_id)
            raise
        except Exception as e:
            logger.error("Error in pubsub listener", tenant_id=tenant_id, error=str(e))
        finally:
            if pubsub:
                # Best-effort cleanup: a failure here must not mask the
                # reason the listener is exiting.
                try:
                    await pubsub.unsubscribe(channel)
                    await pubsub.close()
                except Exception as e:
                    logger.warning("Error closing pubsub",
                                   tenant_id=tenant_id,
                                   error=str(e))

    async def broadcast_to_tenant(self, tenant_id: str, message: str):
        """Broadcast message to all connected clients of a tenant.

        ``message`` is parsed as JSON and wrapped into an SSE event whose
        name is the payload's ``item_type`` (falling back to ``'item'`` when
        it is missing or null). Clients whose queue is full or otherwise
        rejects the event are removed from the tenant.

        Args:
            tenant_id: tenant whose clients receive the event.
            message: JSON-encoded item as received from Redis.
        """
        if tenant_id not in self.active_connections:
            return

        try:
            item_data = json.loads(message)
            event = {
                # 'alert' or 'recommendation'; `or` also covers an explicit
                # null item_type, which .get(key, default) would pass through.
                "event": item_data.get('item_type') or 'item',
                "data": json.dumps(item_data),
                "id": item_data.get("id")
            }

            # Send to all connected clients (snapshot: removals happen below)
            disconnected = []
            for client_queue in list(self.active_connections[tenant_id]):
                try:
                    # Use put_nowait to avoid blocking
                    client_queue.put_nowait(event)
                except asyncio.QueueFull:
                    logger.warning("Client queue full, dropping message", tenant_id=tenant_id)
                    disconnected.append(client_queue)
                except Exception as e:
                    logger.warning("Failed to send to client", tenant_id=tenant_id, error=str(e))
                    disconnected.append(client_queue)

            # Clean up disconnected clients
            for queue in disconnected:
                await self.remove_client(tenant_id, queue)

            if disconnected:
                logger.info("Cleaned up disconnected clients",
                            tenant_id=tenant_id,
                            count=len(disconnected))

        except Exception as e:
            logger.error("Error broadcasting to tenant", tenant_id=tenant_id, error=str(e))

    async def send_item_notification(self, tenant_id: str, item: Dict[str, Any]):
        """
        Send alert or recommendation via SSE (called by notification orchestrator)

        Builds a normalized message from ``item`` and publishes it on the
        tenant's Redis channel. Errors are logged, never raised.
        """
        try:
            # Publish to Redis for SSE streaming
            channel = f"alerts:{tenant_id}"

            item_message = {
                'id': item.get('id'),
                'item_type': item.get('type'),  # 'alert' or 'recommendation'
                'type': item.get('alert_type', item.get('type')),
                'severity': item.get('severity'),
                'title': item.get('title'),
                'message': item.get('message'),
                'actions': item.get('actions', []),
                'metadata': item.get('metadata', {}),
                'timestamp': item.get('timestamp', datetime.utcnow().isoformat()),
                'status': 'active'
            }

            await self.redis.publish(channel, json.dumps(item_message))

            logger.info("Item published to SSE",
                        tenant_id=tenant_id,
                        item_type=item.get('type'),
                        item_id=item.get('id'))

        except Exception as e:
            logger.error("Error sending item notification via SSE",
                         tenant_id=tenant_id,
                         error=str(e))

    async def get_active_items(self, tenant_id: str) -> list:
        """
        Fetch active alerts and recommendations from Redis cache.

        NOTE: We use Redis as the source of truth for active alerts to maintain
        microservices architecture. The alert_processor service caches active alerts
        in Redis when they are created, and we read from that cache here.
        This avoids direct database coupling between services.

        Returns:
            The decoded JSON stored under ``active_alerts:{tenant_id}``, or an
            empty list when Redis is unavailable, the key is missing/empty, or
            an error occurs.
        """
        try:
            if not self.redis:
                logger.warning("Redis not available, returning empty list", tenant_id=tenant_id)
                return []

            # Try to get cached active alerts for this tenant from Redis
            cache_key = f"active_alerts:{tenant_id}"
            cached_data = await self.redis.get(cache_key)

            if not cached_data:
                logger.info("No cached alerts found for tenant", tenant_id=tenant_id)
                return []

            active_items = json.loads(cached_data)
            logger.info("Fetched active alerts from Redis cache",
                        tenant_id=tenant_id,
                        count=len(active_items))
            return active_items

        except Exception as e:
            logger.error("Error fetching active items from Redis",
                         tenant_id=tenant_id,
                         error=str(e),
                         exc_info=True)
            return []

    def get_metrics(self) -> Dict[str, Any]:
        """Get SSE service metrics.

        Returns:
            Dict with the tenant, connection and listener counts and a boolean
            ``redis_connected`` flag (true when a client with a truthy
            ``connection_pool`` attribute is set).
        """
        try:
            # bool(): the bare and-chain would leak None or the pool object
            # into the metrics payload instead of a serializable flag.
            redis_connected = bool(
                self.redis and getattr(self.redis, 'connection_pool', None)
            )
        except Exception:
            redis_connected = False

        return {
            "active_tenants": len(self.active_connections),
            "total_connections": sum(
                len(connections) for connections in self.active_connections.values()
            ),
            "active_listeners": len(self.pubsub_tasks),
            "redis_connected": redis_connected
        }