# services/notification/app/services/sse_service.py
"""
Server-Sent Events service for real-time notifications
Integrated within the notification service for alerts and recommendations
"""
import asyncio
import json
from typing import Dict, Set, Any
from datetime import datetime

import structlog

from shared.redis_utils import initialize_redis, get_redis_client, close_redis

logger = structlog.get_logger()


class SSEService:
    """
    Server-Sent Events service for real-time notifications
    Handles both alerts and recommendations through unified SSE streams
    """

    def __init__(self):
        self.redis = None
        self.redis_url = None
        self.active_connections: Dict[str, Set[asyncio.Queue]] = {}
        self.pubsub_tasks: Dict[str, asyncio.Task] = {}
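        # active_connections maps tenant_id -> set of per-client delivery
        # queues; pubsub_tasks maps tenant_id -> the single Redis listener
        # task that fans messages out to those queues.
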
    async def initialize(self, redis_url: str):
        """Initialize Redis connection"""
        try:
            self.redis_url = redis_url
            # Initialize shared Redis connection for SSE
            await initialize_redis(redis_url, db=0, max_connections=30)
            self.redis = await get_redis_client()
            logger.info("SSE Service initialized with shared Redis connection")
        except Exception as e:
            logger.error("Failed to initialize SSE service", error=str(e))
            raise

    async def shutdown(self):
        """Clean shutdown"""
        try:
            # Cancel all pubsub tasks
            for task in self.pubsub_tasks.values():
                if not task.done():
                    task.cancel()
                    try:
                        await task
                    except asyncio.CancelledError:
                        pass
            # Notify all connected clients; put_nowait so a full queue
            # cannot block shutdown
            for tenant_id, connections in self.active_connections.items():
                for queue in connections.copy():
                    try:
                        queue.put_nowait({"event": "shutdown", "data": json.dumps({"status": "server_shutdown"})})
                    except asyncio.QueueFull:
                        pass
            # Close shared Redis connection
            await close_redis()
            logger.info("SSE Service shutdown completed")
        except Exception as e:
            logger.error("Error during SSE shutdown", error=str(e))

    async def add_client(self, tenant_id: str, client_queue: asyncio.Queue):
        """Add a new SSE client connection"""
        try:
            if tenant_id not in self.active_connections:
                self.active_connections[tenant_id] = set()
            # Start pubsub listener for this tenant if not exists
            if tenant_id not in self.pubsub_tasks:
                task = asyncio.create_task(self._listen_to_tenant_channel(tenant_id))
                self.pubsub_tasks[tenant_id] = task
            self.active_connections[tenant_id].add(client_queue)
            client_count = len(self.active_connections[tenant_id])
            logger.info("SSE client added",
                        tenant_id=tenant_id,
                        total_clients=client_count)
            # Send connection confirmation
            await client_queue.put({
                "event": "connected",
                "data": json.dumps({
                    "status": "connected",
                    "tenant_id": tenant_id,
                    "timestamp": datetime.utcnow().isoformat(),
                    "client_count": client_count
                })
            })
            # Send any active items (alerts and recommendations)
            active_items = await self.get_active_items(tenant_id)
            if active_items:
                await client_queue.put({
                    "event": "initial_items",
                    "data": json.dumps(active_items)
                })
        except Exception as e:
            logger.error("Error adding SSE client", tenant_id=tenant_id, error=str(e))

    async def remove_client(self, tenant_id: str, client_queue: asyncio.Queue):
        """Remove SSE client connection"""
        try:
            if tenant_id in self.active_connections:
                self.active_connections[tenant_id].discard(client_queue)
                # If no more clients for this tenant, stop the pubsub listener
                if not self.active_connections[tenant_id]:
                    del self.active_connections[tenant_id]
                    if tenant_id in self.pubsub_tasks:
                        task = self.pubsub_tasks[tenant_id]
                        if not task.done():
                            task.cancel()
                        del self.pubsub_tasks[tenant_id]
            logger.info("SSE client removed", tenant_id=tenant_id)
        except Exception as e:
            logger.error("Error removing SSE client", tenant_id=tenant_id, error=str(e))

    async def _listen_to_tenant_channel(self, tenant_id: str):
        """Listen to Redis channel for tenant-specific items"""
        pubsub = None
        # Defined before the try block so the finally clause can always
        # reference it safely
        channel = f"alerts:{tenant_id}"
        try:
            # Use the shared Redis client for pubsub
            pubsub = self.redis.pubsub()
            await pubsub.subscribe(channel)
            logger.info("Started listening to tenant channel",
                        tenant_id=tenant_id,
                        channel=channel)
            async for message in pubsub.listen():
                if message["type"] == "message":
                    # Broadcast to all connected clients for this tenant
                    await self.broadcast_to_tenant(tenant_id, message["data"])
        except asyncio.CancelledError:
            logger.info("Stopped listening to tenant channel", tenant_id=tenant_id)
        except Exception as e:
            logger.error("Error in pubsub listener", tenant_id=tenant_id, error=str(e))
        finally:
            if pubsub:
                try:
                    await pubsub.unsubscribe(channel)
                    await pubsub.close()
                except Exception:
                    pass

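    # Both alerts and recommendations travel over the same "alerts:{tenant_id}"
    # channel; the payload's item_type field tells them apart (see
    # broadcast_to_tenant and send_item_notification below).
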
    async def broadcast_to_tenant(self, tenant_id: str, message: str):
        """Broadcast message to all connected clients of a tenant"""
        if tenant_id not in self.active_connections:
            return
        try:
            item_data = json.loads(message)
            event = {
                "event": item_data.get('item_type', 'item'),  # 'alert' or 'recommendation'
                "data": json.dumps(item_data),
                "id": item_data.get("id")
            }
            # Send to all connected clients
            disconnected = []
            for client_queue in self.active_connections[tenant_id]:
                try:
                    # Use put_nowait to avoid blocking
                    client_queue.put_nowait(event)
                except asyncio.QueueFull:
                    logger.warning("Client queue full, dropping message", tenant_id=tenant_id)
                    disconnected.append(client_queue)
                except Exception as e:
                    logger.warning("Failed to send to client", tenant_id=tenant_id, error=str(e))
                    disconnected.append(client_queue)
            # Clean up disconnected clients
            for queue in disconnected:
                await self.remove_client(tenant_id, queue)
            if disconnected:
                logger.info("Cleaned up disconnected clients",
                            tenant_id=tenant_id,
                            count=len(disconnected))
        except Exception as e:
            logger.error("Error broadcasting to tenant", tenant_id=tenant_id, error=str(e))

    async def send_item_notification(self, tenant_id: str, item: Dict[str, Any]):
        """
        Send alert or recommendation via SSE (called by notification orchestrator)
        """
        try:
            # Publish to Redis for SSE streaming
            channel = f"alerts:{tenant_id}"
            item_message = {
                'id': item.get('id'),
                'item_type': item.get('type'),  # 'alert' or 'recommendation'
                'type': item.get('alert_type', item.get('type')),
                'severity': item.get('severity'),
                'title': item.get('title'),
                'message': item.get('message'),
                'actions': item.get('actions', []),
                'metadata': item.get('metadata', {}),
                'timestamp': item.get('timestamp', datetime.utcnow().isoformat()),
                'status': 'active'
            }
            await self.redis.publish(channel, json.dumps(item_message))
            logger.info("Item published to SSE",
                        tenant_id=tenant_id,
                        item_type=item.get('type'),
                        item_id=item.get('id'))
        except Exception as e:
            logger.error("Error sending item notification via SSE",
                         tenant_id=tenant_id,
                         error=str(e))

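    # Caller sketch (the field values are hypothetical):
    #
    #   await sse_service.send_item_notification("tenant-123", {
    #       "id": "alert-42",
    #       "type": "alert",            # forwarded as item_type
    #       "alert_type": "low_stock",  # forwarded as type
    #       "severity": "warning",
    #       "title": "Flour running low",
    #       "message": "Stock below reorder point",
    #   })
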
    async def get_active_items(self, tenant_id: str) -> list:
        """
        Fetch active alerts and recommendations from Redis cache.

        NOTE: We use Redis as the source of truth for active alerts to
        maintain the microservices architecture. The alert_processor service
        caches active alerts in Redis when they are created, and we read from
        that cache here. This avoids direct database coupling between services.
        """
        try:
            if not self.redis:
                logger.warning("Redis not available, returning empty list", tenant_id=tenant_id)
                return []
            # Try to get cached active alerts for this tenant from Redis
            cache_key = f"active_alerts:{tenant_id}"
            cached_data = await self.redis.get(cache_key)
            if cached_data:
                active_items = json.loads(cached_data)
                logger.info("Fetched active alerts from Redis cache",
                            tenant_id=tenant_id,
                            count=len(active_items))
                return active_items
            else:
                logger.info("No cached alerts found for tenant",
                            tenant_id=tenant_id)
                return []
        except Exception as e:
            logger.error("Error fetching active items from Redis",
                         tenant_id=tenant_id,
                         error=str(e),
                         exc_info=True)
            return []

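    # Producer-side sketch: alert_processor is expected to keep this key
    # current when alerts change, e.g. (the exact call site and TTL policy
    # are assumptions):
    #
    #   await redis.set(f"active_alerts:{tenant_id}", json.dumps(active_items))
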
    def get_metrics(self) -> Dict[str, Any]:
        """Get SSE service metrics"""
        try:
            # bool() so the metric is a real boolean rather than the pool object
            redis_connected = bool(self.redis and getattr(self.redis, 'connection_pool', None))
        except Exception:
            redis_connected = False
        return {
            "active_tenants": len(self.active_connections),
            "total_connections": sum(len(connections) for connections in self.active_connections.values()),
            "active_listeners": len(self.pubsub_tasks),
            "redis_connected": redis_connected
        }
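

# ---------------------------------------------------------------------------
# Usage sketch: one way an HTTP layer could consume this service. Everything
# below is illustrative only; the FastAPI app, route path, queue size and
# Redis URL are assumptions, not part of the notification service itself.
# ---------------------------------------------------------------------------
if __name__ == "__main__":  # pragma: no cover - illustrative wiring only
    import uvicorn
    from fastapi import FastAPI
    from fastapi.responses import StreamingResponse

    app = FastAPI()
    sse_service = SSEService()

    @app.on_event("startup")
    async def _startup():
        await sse_service.initialize("redis://localhost:6379/0")

    @app.on_event("shutdown")
    async def _shutdown():
        await sse_service.shutdown()

    @app.get("/sse/{tenant_id}")
    async def stream(tenant_id: str):
        # Bounded queue: broadcast_to_tenant drops messages (and evicts the
        # client) instead of letting a slow consumer grow memory unboundedly.
        queue: asyncio.Queue = asyncio.Queue(maxsize=100)
        await sse_service.add_client(tenant_id, queue)

        async def event_stream():
            try:
                while True:
                    event = await queue.get()
                    if event.get("id"):
                        yield f"id: {event['id']}\n"
                    yield f"event: {event['event']}\n"
                    yield f"data: {event['data']}\n\n"
            finally:
                await sse_service.remove_client(tenant_id, queue)

        return StreamingResponse(event_stream(), media_type="text/event-stream")

    uvicorn.run(app, host="0.0.0.0", port=8000)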