Files
bakery-ia/services/alert_processor/app/services/sse_service.py
2025-12-05 20:07:01 +01:00

130 lines
3.7 KiB
Python

"""
Server-Sent Events (SSE) service using Redis pub/sub.
"""
import json
from typing import AsyncGenerator, Optional

import structlog
from redis.asyncio import Redis

from app.core.config import settings
from app.models.events import Event
from shared.redis_utils import get_redis_client
# Module-level structlog logger; SSEService emits its structured log events through it.
logger = structlog.get_logger()
class SSEService:
    """
    Manage real-time event streaming via Redis pub/sub.

    Each tenant has its own channel named ``{prefix}:{tenant_id}``, where
    ``prefix`` is ``settings.REDIS_SSE_PREFIX`` (e.g. ``alerts:{tenant_id}``).
    """

    def __init__(self, redis: Optional["Redis"] = None):
        """
        Args:
            redis: Redis client to use. When omitted, a client is obtained
                from ``get_redis_client()`` the first time it is needed.
        """
        # Private attribute so the `redis` property can lazy-load it.
        self._redis = redis
        self.prefix = settings.REDIS_SSE_PREFIX

    @property
    async def redis(self) -> "Redis":
        """
        Lazy load Redis client if not provided through dependency injection.

        Uses the shared Redis utilities for consistency. Because this is an
        ``async`` property, attribute access returns a coroutine: callers
        must write ``await self.redis``.
        """
        if self._redis is None:
            self._redis = await get_redis_client()
        return self._redis

    async def publish_event(self, event: "Event") -> bool:
        """
        Publish event to Redis for SSE streaming.

        The event is serialized to a JSON object and published on the
        tenant's channel. Failures are logged as ``sse_publish_failed`` and
        reported through the return value rather than raised.

        Args:
            event: Event to publish

        Returns:
            True if published successfully, False if any step raised.
        """
        try:
            redis_client = await self.redis

            # Build channel name
            channel = f"{self.prefix}:{event.tenant_id}"

            # Build message payload
            payload = {
                "id": str(event.id),
                "tenant_id": str(event.tenant_id),
                "event_class": event.event_class,
                "event_domain": event.event_domain,
                "event_type": event.event_type,
                "priority_score": event.priority_score,
                "priority_level": event.priority_level,
                "type_class": event.type_class,
                "status": event.status,
                "created_at": event.created_at.isoformat(),
                "i18n": {
                    "title_key": event.i18n_title_key,
                    "title_params": event.i18n_title_params,
                    "message_key": event.i18n_message_key,
                    "message_params": event.i18n_message_params
                },
                "smart_actions": event.smart_actions,
                "entity_links": event.entity_links
            }

            # Publish to Redis
            await redis_client.publish(channel, json.dumps(payload))

            logger.debug(
                "sse_event_published",
                channel=channel,
                event_type=event.event_type,
                event_id=str(event.id)
            )
            return True
        except Exception as e:
            logger.error(
                "sse_publish_failed",
                error=str(e),
                event_id=str(event.id)
            )
            return False

    async def subscribe_to_tenant(
        self,
        tenant_id: str
    ) -> AsyncGenerator[str, None]:
        """
        Subscribe to tenant's alert stream.

        Only pub/sub entries whose ``type`` is ``"message"`` are forwarded;
        subscribe/unsubscribe confirmations are skipped. The pub/sub object
        is always closed when the generator finishes, is closed by the
        consumer, or fails -- including when the initial subscribe fails.

        Args:
            tenant_id: Tenant UUID

        Yields:
            The ``data`` field of each received message (the JSON document
            produced by ``publish_event``), exactly as the Redis client
            delivers it.

        Raises:
            Exception: Any error raised while subscribing or listening is
                logged as ``sse_subscription_error`` and re-raised.
        """
        redis_client = await self.redis
        channel = f"{self.prefix}:{tenant_id}"
        logger.info("sse_subscription_started", channel=channel)

        pubsub = redis_client.pubsub()
        # Tracks whether there is a subscription to undo during cleanup.
        subscribed = False
        try:
            # Subscribe inside the try so a failure here still reaches the
            # cleanup below instead of leaking the pub/sub connection.
            await pubsub.subscribe(channel)
            subscribed = True

            async for message in pubsub.listen():
                if message["type"] == "message":
                    yield message["data"]
        except Exception as e:
            logger.error("sse_subscription_error", error=str(e), channel=channel)
            raise
        finally:
            try:
                if subscribed:
                    await pubsub.unsubscribe(channel)
            finally:
                # Must run even if unsubscribe fails (e.g. the connection
                # already dropped), otherwise the connection is never released.
                await pubsub.close()
                logger.info("sse_subscription_closed", channel=channel)