"""
Server-Sent Events (SSE) API endpoint.
"""

from fastapi import APIRouter, HTTPException
from fastapi.responses import StreamingResponse
from uuid import UUID
from redis.asyncio import Redis
import structlog

from shared.redis_utils import get_redis_client
from app.services.sse_service import SSEService

logger = structlog.get_logger()

router = APIRouter()


def _format_sse_event(payload: object, event: str = "") -> str:
    """
    Serialize one payload as a single SSE event frame.

    The payload is converted with ``str()`` and split on the three SSE line
    terminators (CRLF, CR, LF). Each resulting line is emitted as its own
    ``data:`` field, so an embedded line break can neither truncate the
    payload nor start a new field or event. Other Unicode line separators
    are left untouched because the SSE format does not treat them as line
    breaks.

    Args:
        payload: Value to send; formatted with ``str()``.
        event: Optional event name. When non-empty, an ``event:`` field is
            written before the data lines.

    Returns:
        The frame text, terminated by the blank line that dispatches the
        event on the client. For a payload without line breaks this is
        exactly ``data: <payload>`` followed by two line feeds.
    """
    text = str(payload)
    data_lines = text.replace("\r\n", "\n").replace("\r", "\n").split("\n")

    fields = [f"event: {event}"] if event else []
    fields.extend(f"data: {line}" for line in data_lines)
    return "\n".join(fields) + "\n\n"


@router.get("/sse/alerts/{tenant_id}")
async def stream_alerts(tenant_id: UUID):
    """
    Stream real-time alerts via Server-Sent Events (SSE).

    Usage (frontend):
    ```javascript
    const eventSource = new EventSource('/api/v1/sse/alerts/{tenant_id}');

    eventSource.onmessage = (event) => {
        const alert = JSON.parse(event.data);
        console.log('New alert:', alert);
    };
    ```

    Response format:
    ```
    data: {"id": "...", "event_type": "...", ...}

    data: {"id": "...", "event_type": "...", ...}
    ```

    A message containing line breaks is sent as several consecutive
    `data:` lines within one event; the browser joins them back with
    line feeds. If the subscription fails mid-stream, a final event named
    `error` is sent and the stream ends.

    Raises:
        HTTPException: 500 if the Redis client or the SSE service cannot
            be set up.
    """
    tenant = str(tenant_id)

    # Only the setup calls are guarded: this is where connection problems
    # surface, and they must be logged and reported as a clean 500.
    try:
        # Get Redis client from shared utilities
        redis = await get_redis_client()
        sse_service = SSEService(redis)
    except Exception as e:
        logger.error("sse_setup_failed", error=str(e), tenant_id=tenant)
        raise HTTPException(
            status_code=500, detail="Failed to setup SSE stream"
        ) from e

    async def event_generator():
        """Generator for SSE stream"""
        try:
            async for message in sse_service.subscribe_to_tenant(tenant):
                # Format as SSE message
                yield _format_sse_event(message)
        except Exception as e:
            logger.error("sse_stream_error", error=str(e), tenant_id=tenant)
            # Send error message and close.
            # NOTE(review): this forwards the raw exception text to the
            # client; confirm that is acceptable or replace it with a
            # generic message.
            yield _format_sse_event(str(e), event="error")

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",  # Disable nginx buffering
        },
    )