# Source listing metadata (code-viewer header, not part of the module):
#   bakery-ia/services/notification/app/services/sse_service.py
#   2025-09-21 17:35:36 +02:00
#   262 lines, 10 KiB, Python

# services/notification/app/services/sse_service.py
"""
Server-Sent Events service for real-time notifications
Integrated within the notification service for alerts and recommendations
"""
import asyncio
from redis.asyncio import Redis
import json
from typing import Dict, Set, Any
from datetime import datetime
import structlog
# Module-level structlog logger used by SSEService for structured
# (key=value) log events.
logger = structlog.get_logger()
class SSEService:
    """
    Server-Sent Events service for real-time notifications.

    Handles both alerts and recommendations through unified SSE streams.
    For every tenant with at least one connected client, a background task
    subscribes to the Redis pub/sub channel ``alerts:{tenant_id}`` and fans
    each received message out to the ``asyncio.Queue`` of every client
    registered for that tenant.
    """

    def __init__(self, redis_url: str):
        """Store the Redis URL; the Redis client is created by ``initialize``."""
        self.redis_url = redis_url
        # Client used for publishing; stays None until initialize() runs.
        self.redis = None
        # tenant_id -> queues of the SSE clients connected for that tenant.
        self.active_connections: Dict[str, Set[asyncio.Queue]] = {}
        # tenant_id -> background task listening on that tenant's channel.
        self.pubsub_tasks: Dict[str, asyncio.Task] = {}

    async def initialize(self):
        """Create the Redis client used for publishing.

        Raises:
            Exception: whatever ``Redis.from_url`` raises is logged and
                re-raised.
        """
        try:
            self.redis = Redis.from_url(self.redis_url)
            logger.info("SSE Service initialized with Redis connection")
        except Exception as e:
            logger.error("Failed to initialize SSE service", error=str(e))
            raise

    async def shutdown(self):
        """Cancel listeners, notify connected clients and close Redis.

        Errors are logged rather than raised so that shutdown always runs
        to completion as far as possible.
        """
        try:
            # Cancel all pubsub tasks. Iterate over a snapshot: awaiting a
            # task yields to the event loop, during which remove_client()
            # may mutate the dict.
            for task in list(self.pubsub_tasks.values()):
                if not task.done():
                    task.cancel()
                    try:
                        await task
                    except asyncio.CancelledError:
                        pass
                    except Exception as e:
                        # One failing listener must not abort the rest of
                        # the shutdown sequence.
                        logger.warning("Pubsub task failed during shutdown",
                                       error=str(e))
            self.pubsub_tasks.clear()

            # Tell every client the server is going away. put_nowait is
            # used so that a full bounded queue cannot block shutdown
            # forever.
            payload = json.dumps({"status": "server_shutdown"})
            for tenant_id, connections in list(self.active_connections.items()):
                for queue in list(connections):
                    try:
                        queue.put_nowait({"event": "shutdown", "data": payload})
                    except asyncio.QueueFull:
                        logger.warning("Client queue full, shutdown event dropped",
                                       tenant_id=tenant_id)

            # Close Redis connection
            if self.redis is not None:
                await self.redis.close()
            logger.info("SSE Service shutdown completed")
        except Exception as e:
            logger.error("Error during SSE shutdown", error=str(e))

    async def add_client(self, tenant_id: str, client_queue: asyncio.Queue):
        """Register a new SSE client queue for a tenant.

        Starts the tenant's pub/sub listener when needed, then puts a
        ``connected`` event on the queue followed by an ``initial_items``
        event when ``get_active_items`` returns anything. Errors are logged,
        not raised.
        """
        try:
            connections = self.active_connections.setdefault(tenant_id, set())

            # Start the listener if there is none, or restart it if the
            # previous one has already exited (e.g. after a Redis error);
            # otherwise the tenant would silently stop receiving events.
            listener = self.pubsub_tasks.get(tenant_id)
            if listener is None or listener.done():
                self.pubsub_tasks[tenant_id] = asyncio.create_task(
                    self._listen_to_tenant_channel(tenant_id)
                )

            connections.add(client_queue)
            client_count = len(connections)
            logger.info("SSE client added",
                        tenant_id=tenant_id,
                        total_clients=client_count)

            # Send connection confirmation
            await client_queue.put({
                "event": "connected",
                "data": json.dumps({
                    "status": "connected",
                    "tenant_id": tenant_id,
                    "timestamp": datetime.utcnow().isoformat(),
                    "client_count": client_count
                })
            })

            # Send any active items (alerts and recommendations)
            active_items = await self.get_active_items(tenant_id)
            if active_items:
                await client_queue.put({
                    "event": "initial_items",
                    "data": json.dumps(active_items)
                })
        except Exception as e:
            logger.error("Error adding SSE client", tenant_id=tenant_id, error=str(e))

    async def remove_client(self, tenant_id: str, client_queue: asyncio.Queue):
        """Unregister an SSE client queue.

        When the tenant's last client is removed, its pub/sub listener task
        is cancelled and forgotten. Unknown tenants or queues are ignored.
        """
        try:
            connections = self.active_connections.get(tenant_id)
            if connections is not None:
                connections.discard(client_queue)
                # If no more clients for this tenant, stop the pubsub listener
                if not connections:
                    del self.active_connections[tenant_id]
                    task = self.pubsub_tasks.pop(tenant_id, None)
                    if task is not None and not task.done():
                        task.cancel()
            logger.info("SSE client removed", tenant_id=tenant_id)
        except Exception as e:
            logger.error("Error removing SSE client", tenant_id=tenant_id, error=str(e))

    async def _listen_to_tenant_channel(self, tenant_id: str):
        """Listen to the tenant's Redis channel and broadcast each message.

        Runs until cancelled or until the Redis stream fails. Cancellation
        is logged and re-raised; other errors are logged and end the task.
        """
        channel = f"alerts:{tenant_id}"
        # Bound before the try block so the cleanup in ``finally`` never
        # touches an undefined name if connection setup itself fails.
        pubsub_redis = None
        pubsub = None
        try:
            # Create a separate Redis connection for pubsub
            pubsub_redis = Redis.from_url(self.redis_url)
            pubsub = pubsub_redis.pubsub()
            await pubsub.subscribe(channel)
            logger.info("Started listening to tenant channel",
                        tenant_id=tenant_id,
                        channel=channel)
            async for message in pubsub.listen():
                if message["type"] == "message":
                    # Broadcast to all connected clients for this tenant
                    await self.broadcast_to_tenant(tenant_id, message["data"])
        except asyncio.CancelledError:
            logger.info("Stopped listening to tenant channel", tenant_id=tenant_id)
            # Re-raise so the task is actually marked as cancelled.
            raise
        except Exception as e:
            logger.error("Error in pubsub listener", tenant_id=tenant_id, error=str(e))
        finally:
            await self._release_pubsub(tenant_id, channel, pubsub, pubsub_redis)

    async def _release_pubsub(self, tenant_id: str, channel: str, pubsub, pubsub_redis):
        """Best-effort teardown of a listener's pubsub object and Redis client."""
        if pubsub is not None:
            try:
                await pubsub.unsubscribe(channel)
            except Exception as e:
                logger.warning("Error unsubscribing from tenant channel",
                               tenant_id=tenant_id, error=str(e))
        # Closed in its own try block so a failed unsubscribe cannot leak
        # the connection.
        if pubsub_redis is not None:
            try:
                await pubsub_redis.close()
            except Exception as e:
                logger.warning("Error closing pubsub Redis connection",
                               tenant_id=tenant_id, error=str(e))

    async def broadcast_to_tenant(self, tenant_id: str, message: str):
        """Broadcast a JSON-encoded item to all connected clients of a tenant.

        ``message`` is decoded with ``json.loads``; the resulting object's
        ``item_type`` (default ``'item'``) becomes the SSE event name and its
        ``id`` the event id. Clients whose queue is full or otherwise
        rejects the event are removed. Errors are logged, not raised.
        """
        connections = self.active_connections.get(tenant_id)
        if connections is None:
            return
        try:
            item_data = json.loads(message)
            event = {
                "event": item_data.get('item_type', 'item'),  # 'alert' or 'recommendation'
                "data": json.dumps(item_data),
                "id": item_data.get("id")
            }

            # Send to all connected clients
            disconnected = []
            for client_queue in list(connections):
                try:
                    # Use put_nowait to avoid blocking
                    client_queue.put_nowait(event)
                except asyncio.QueueFull:
                    logger.warning("Client queue full, dropping message", tenant_id=tenant_id)
                    disconnected.append(client_queue)
                except Exception as e:
                    logger.warning("Failed to send to client", tenant_id=tenant_id, error=str(e))
                    disconnected.append(client_queue)

            # Clean up disconnected clients
            for queue in disconnected:
                await self.remove_client(tenant_id, queue)
            if disconnected:
                logger.info("Cleaned up disconnected clients",
                            tenant_id=tenant_id,
                            count=len(disconnected))
        except Exception as e:
            logger.error("Error broadcasting to tenant", tenant_id=tenant_id, error=str(e))

    async def send_item_notification(self, tenant_id: str, item: Dict[str, Any]):
        """
        Send alert or recommendation via SSE (called by notification orchestrator).

        The item is normalised into a flat message and published on the
        tenant's Redis channel ``alerts:{tenant_id}``. Errors are logged,
        not raised.
        """
        if self.redis is None:
            # initialize() has not run; report that explicitly instead of
            # failing with an opaque AttributeError on None.
            logger.error("Error sending item notification via SSE",
                         tenant_id=tenant_id,
                         error="SSE service not initialized")
            return
        try:
            # Publish to Redis for SSE streaming
            channel = f"alerts:{tenant_id}"
            item_message = {
                'id': item.get('id'),
                'item_type': item.get('type'),  # 'alert' or 'recommendation'
                'type': item.get('alert_type', item.get('type')),
                'severity': item.get('severity'),
                'title': item.get('title'),
                'message': item.get('message'),
                'actions': item.get('actions', []),
                'metadata': item.get('metadata', {}),
                'timestamp': item.get('timestamp', datetime.utcnow().isoformat()),
                'status': 'active'
            }
            await self.redis.publish(channel, json.dumps(item_message))
            logger.info("Item published to SSE",
                        tenant_id=tenant_id,
                        item_type=item.get('type'),
                        item_id=item.get('id'))
        except Exception as e:
            logger.error("Error sending item notification via SSE",
                         tenant_id=tenant_id,
                         error=str(e))

    async def get_active_items(self, tenant_id: str) -> list:
        """Fetch active alerts and recommendations from database.

        Placeholder: no database integration exists yet, so this always
        returns an empty list.
        """
        # TODO: query the alerts table. Example query:
        #   SELECT id, item_type, alert_type, severity, title, message,
        #          actions, metadata, created_at, status
        #   FROM alerts
        #   WHERE tenant_id = $1
        #     AND status = 'active'
        #   ORDER BY severity_weight DESC, created_at DESC
        #   LIMIT 50
        return []

    def get_metrics(self) -> Dict[str, Any]:
        """Get SSE service metrics.

        Returns:
            Dict with ``active_tenants``, ``total_connections``,
            ``active_listeners`` and a boolean ``redis_connected``. The
            latter only reports that a Redis client with a connection pool
            has been created; it does not probe the server.
        """
        # Coerce to a real bool: the raw pool object would otherwise leak
        # into the metrics dict and break JSON serialization.
        redis_connected = (
            self.redis is not None
            and getattr(self.redis, 'connection_pool', None) is not None
        )
        return {
            "active_tenants": len(self.active_connections),
            "total_connections": sum(
                len(connections) for connections in self.active_connections.values()
            ),
            "active_listeners": len(self.pubsub_tasks),
            "redis_connected": redis_connected
        }