# services/notification/app/api/sse_routes.py """ SSE routes for real-time alert and recommendation streaming """ import asyncio import json from datetime import datetime from typing import Optional from fastapi import APIRouter, Request, Depends, HTTPException, BackgroundTasks from sse_starlette.sse import EventSourceResponse import structlog from shared.auth.decorators import get_current_user router = APIRouter(prefix="/sse", tags=["sse"]) logger = structlog.get_logger() @router.get("/alerts/stream/{tenant_id}") async def stream_alerts( tenant_id: str, request: Request, background_tasks: BackgroundTasks, token: Optional[str] = None ): """ SSE endpoint for real-time alert and recommendation streaming Supports both alerts and recommendations through unified stream """ # Validate token and get user (skip for now to test connection) # TODO: Add proper token validation in production current_user = None if token: try: # In a real implementation, validate the JWT token here # For now, skip validation to test the connection pass except Exception: raise HTTPException(401, "Invalid token") # Skip tenant access validation for testing # TODO: Add tenant access validation in production # Get SSE service from app state sse_service = getattr(request.app.state, 'sse_service', None) if not sse_service: raise HTTPException(500, "SSE service not available") async def event_generator(): """Generate SSE events for the client""" client_queue = asyncio.Queue(maxsize=100) # Limit queue size try: # Register client await sse_service.add_client(tenant_id, client_queue) logger.info("SSE client connected", tenant_id=tenant_id, user_id=getattr(current_user, 'id', 'unknown')) # Stream events while True: # Check if client disconnected if await request.is_disconnected(): logger.info("SSE client disconnected", tenant_id=tenant_id) break try: # Wait for events with timeout for keepalive event = await asyncio.wait_for( client_queue.get(), timeout=30.0 ) yield event except asyncio.TimeoutError: # Send keepalive ping yield { "event": "ping", "data": json.dumps({ "timestamp": datetime.utcnow().isoformat(), "status": "keepalive" }), "id": f"ping_{int(datetime.now().timestamp())}" } except Exception as e: logger.error("Error in SSE event generator", tenant_id=tenant_id, error=str(e)) break except Exception as e: logger.error("SSE connection error", tenant_id=tenant_id, error=str(e)) finally: # Clean up on disconnect try: await sse_service.remove_client(tenant_id, client_queue) logger.info("SSE client cleanup completed", tenant_id=tenant_id) except Exception as e: logger.error("Error cleaning up SSE client", tenant_id=tenant_id, error=str(e)) return EventSourceResponse( event_generator(), media_type="text/event-stream", headers={ "Cache-Control": "no-cache", "Connection": "keep-alive", "X-Accel-Buffering": "no", # Disable nginx buffering } ) @router.post("/items/{item_id}/acknowledge") async def acknowledge_item( item_id: str, current_user = Depends(get_current_user) ): """Acknowledge an alert or recommendation""" try: # This would update the database # For now, just return success logger.info("Item acknowledged", item_id=item_id, user_id=getattr(current_user, 'id', 'unknown')) return { "status": "success", "item_id": item_id, "acknowledged_by": getattr(current_user, 'id', 'unknown'), "acknowledged_at": datetime.utcnow().isoformat() } except Exception as e: logger.error("Failed to acknowledge item", item_id=item_id, error=str(e)) raise HTTPException(500, "Failed to acknowledge item") @router.post("/items/{item_id}/resolve") async def resolve_item( item_id: str, current_user = Depends(get_current_user) ): """Resolve an alert or recommendation""" try: # This would update the database # For now, just return success logger.info("Item resolved", item_id=item_id, user_id=getattr(current_user, 'id', 'unknown')) return { "status": "success", "item_id": item_id, "resolved_by": getattr(current_user, 'id', 'unknown'), "resolved_at": datetime.utcnow().isoformat() } except Exception as e: logger.error("Failed to resolve item", item_id=item_id, error=str(e)) raise HTTPException(500, "Failed to resolve item") @router.get("/status/{tenant_id}") async def get_sse_status( tenant_id: str, current_user = Depends(get_current_user) ): """Get SSE connection status for a tenant""" # Verify user has access to this tenant if not hasattr(current_user, 'has_access_to_tenant') or not current_user.has_access_to_tenant(tenant_id): raise HTTPException(403, "Access denied to this tenant") try: # Get SSE service from app state sse_service = getattr(request.app.state, 'sse_service', None) if not sse_service: return {"status": "unavailable", "message": "SSE service not initialized"} metrics = sse_service.get_metrics() tenant_connections = len(sse_service.active_connections.get(tenant_id, set())) return { "status": "available", "tenant_id": tenant_id, "connections": tenant_connections, "total_connections": metrics["total_connections"], "active_tenants": metrics["active_tenants"] } except Exception as e: logger.error("Failed to get SSE status", tenant_id=tenant_id, error=str(e)) raise HTTPException(500, "Failed to get SSE status")