""" Server-Sent Events (SSE) service using Redis pub/sub. """ from typing import AsyncGenerator import json import structlog from redis.asyncio import Redis from app.core.config import settings from app.models.events import Event from shared.redis_utils import get_redis_client logger = structlog.get_logger() class SSEService: """ Manage real-time event streaming via Redis pub/sub. Pattern: alerts:{tenant_id} """ def __init__(self, redis: Redis = None): self._redis = redis # Use private attribute to allow lazy loading self.prefix = settings.REDIS_SSE_PREFIX @property async def redis(self) -> Redis: """ Lazy load Redis client if not provided through dependency injection. Uses the shared Redis utilities for consistency. """ if self._redis is None: self._redis = await get_redis_client() return self._redis async def publish_event(self, event: Event) -> bool: """ Publish event to Redis for SSE streaming. Args: event: Event to publish Returns: True if published successfully """ try: redis_client = await self.redis # Build channel name channel = f"{self.prefix}:{event.tenant_id}" # Build message payload payload = { "id": str(event.id), "tenant_id": str(event.tenant_id), "event_class": event.event_class, "event_domain": event.event_domain, "event_type": event.event_type, "priority_score": event.priority_score, "priority_level": event.priority_level, "type_class": event.type_class, "status": event.status, "created_at": event.created_at.isoformat(), "i18n": { "title_key": event.i18n_title_key, "title_params": event.i18n_title_params, "message_key": event.i18n_message_key, "message_params": event.i18n_message_params }, "smart_actions": event.smart_actions, "entity_links": event.entity_links } # Publish to Redis await redis_client.publish(channel, json.dumps(payload)) logger.debug( "sse_event_published", channel=channel, event_type=event.event_type, event_id=str(event.id) ) return True except Exception as e: logger.error( "sse_publish_failed", error=str(e), event_id=str(event.id) ) return False async def subscribe_to_tenant( self, tenant_id: str ) -> AsyncGenerator[str, None]: """ Subscribe to tenant's alert stream. Args: tenant_id: Tenant UUID Yields: JSON-encoded event messages """ redis_client = await self.redis channel = f"{self.prefix}:{tenant_id}" logger.info("sse_subscription_started", channel=channel) # Subscribe to Redis channel pubsub = redis_client.pubsub() await pubsub.subscribe(channel) try: async for message in pubsub.listen(): if message["type"] == "message": yield message["data"] except Exception as e: logger.error("sse_subscription_error", error=str(e), channel=channel) raise finally: await pubsub.unsubscribe(channel) await pubsub.close() logger.info("sse_subscription_closed", channel=channel)