189 lines
6.6 KiB
Python
189 lines
6.6 KiB
Python
|
|
# services/notification/app/api/sse_routes.py
|
||
|
|
"""
|
||
|
|
SSE routes for real-time alert and recommendation streaming
|
||
|
|
"""
|
||
|
|
|
||
|
|
import asyncio
|
||
|
|
import json
|
||
|
|
from datetime import datetime
|
||
|
|
from typing import Optional
|
||
|
|
from fastapi import APIRouter, Request, Depends, HTTPException, BackgroundTasks
|
||
|
|
from sse_starlette.sse import EventSourceResponse
|
||
|
|
import structlog
|
||
|
|
|
||
|
|
from shared.auth.decorators import get_current_user
|
||
|
|
|
||
|
|
router = APIRouter(prefix="/sse", tags=["sse"])
|
||
|
|
logger = structlog.get_logger()
|
||
|
|
|
||
|
|
@router.get("/alerts/stream/{tenant_id}")
|
||
|
|
async def stream_alerts(
|
||
|
|
tenant_id: str,
|
||
|
|
request: Request,
|
||
|
|
background_tasks: BackgroundTasks,
|
||
|
|
current_user = Depends(get_current_user)
|
||
|
|
):
|
||
|
|
"""
|
||
|
|
SSE endpoint for real-time alert and recommendation streaming
|
||
|
|
Supports both alerts and recommendations through unified stream
|
||
|
|
"""
|
||
|
|
|
||
|
|
# Verify user has access to this tenant
|
||
|
|
if not hasattr(current_user, 'has_access_to_tenant') or not current_user.has_access_to_tenant(tenant_id):
|
||
|
|
raise HTTPException(403, "Access denied to this tenant")
|
||
|
|
|
||
|
|
# Get SSE service from app state
|
||
|
|
sse_service = getattr(request.app.state, 'sse_service', None)
|
||
|
|
if not sse_service:
|
||
|
|
raise HTTPException(500, "SSE service not available")
|
||
|
|
|
||
|
|
async def event_generator():
|
||
|
|
"""Generate SSE events for the client"""
|
||
|
|
client_queue = asyncio.Queue(maxsize=100) # Limit queue size
|
||
|
|
|
||
|
|
try:
|
||
|
|
# Register client
|
||
|
|
await sse_service.add_client(tenant_id, client_queue)
|
||
|
|
|
||
|
|
logger.info("SSE client connected",
|
||
|
|
tenant_id=tenant_id,
|
||
|
|
user_id=getattr(current_user, 'id', 'unknown'))
|
||
|
|
|
||
|
|
# Stream events
|
||
|
|
while True:
|
||
|
|
# Check if client disconnected
|
||
|
|
if await request.is_disconnected():
|
||
|
|
logger.info("SSE client disconnected", tenant_id=tenant_id)
|
||
|
|
break
|
||
|
|
|
||
|
|
try:
|
||
|
|
# Wait for events with timeout for keepalive
|
||
|
|
event = await asyncio.wait_for(
|
||
|
|
client_queue.get(),
|
||
|
|
timeout=30.0
|
||
|
|
)
|
||
|
|
|
||
|
|
yield event
|
||
|
|
|
||
|
|
except asyncio.TimeoutError:
|
||
|
|
# Send keepalive ping
|
||
|
|
yield {
|
||
|
|
"event": "ping",
|
||
|
|
"data": json.dumps({
|
||
|
|
"timestamp": datetime.utcnow().isoformat(),
|
||
|
|
"status": "keepalive"
|
||
|
|
}),
|
||
|
|
"id": f"ping_{int(datetime.now().timestamp())}"
|
||
|
|
}
|
||
|
|
|
||
|
|
except Exception as e:
|
||
|
|
logger.error("Error in SSE event generator",
|
||
|
|
tenant_id=tenant_id,
|
||
|
|
error=str(e))
|
||
|
|
break
|
||
|
|
|
||
|
|
except Exception as e:
|
||
|
|
logger.error("SSE connection error",
|
||
|
|
tenant_id=tenant_id,
|
||
|
|
error=str(e))
|
||
|
|
finally:
|
||
|
|
# Clean up on disconnect
|
||
|
|
try:
|
||
|
|
await sse_service.remove_client(tenant_id, client_queue)
|
||
|
|
logger.info("SSE client cleanup completed", tenant_id=tenant_id)
|
||
|
|
except Exception as e:
|
||
|
|
logger.error("Error cleaning up SSE client",
|
||
|
|
tenant_id=tenant_id,
|
||
|
|
error=str(e))
|
||
|
|
|
||
|
|
return EventSourceResponse(
|
||
|
|
event_generator(),
|
||
|
|
media_type="text/event-stream",
|
||
|
|
headers={
|
||
|
|
"Cache-Control": "no-cache",
|
||
|
|
"Connection": "keep-alive",
|
||
|
|
"X-Accel-Buffering": "no", # Disable nginx buffering
|
||
|
|
}
|
||
|
|
)
|
||
|
|
|
||
|
|
@router.post("/items/{item_id}/acknowledge")
|
||
|
|
async def acknowledge_item(
|
||
|
|
item_id: str,
|
||
|
|
current_user = Depends(get_current_user)
|
||
|
|
):
|
||
|
|
"""Acknowledge an alert or recommendation"""
|
||
|
|
try:
|
||
|
|
# This would update the database
|
||
|
|
# For now, just return success
|
||
|
|
|
||
|
|
logger.info("Item acknowledged",
|
||
|
|
item_id=item_id,
|
||
|
|
user_id=getattr(current_user, 'id', 'unknown'))
|
||
|
|
|
||
|
|
return {
|
||
|
|
"status": "success",
|
||
|
|
"item_id": item_id,
|
||
|
|
"acknowledged_by": getattr(current_user, 'id', 'unknown'),
|
||
|
|
"acknowledged_at": datetime.utcnow().isoformat()
|
||
|
|
}
|
||
|
|
|
||
|
|
except Exception as e:
|
||
|
|
logger.error("Failed to acknowledge item", item_id=item_id, error=str(e))
|
||
|
|
raise HTTPException(500, "Failed to acknowledge item")
|
||
|
|
|
||
|
|
@router.post("/items/{item_id}/resolve")
|
||
|
|
async def resolve_item(
|
||
|
|
item_id: str,
|
||
|
|
current_user = Depends(get_current_user)
|
||
|
|
):
|
||
|
|
"""Resolve an alert or recommendation"""
|
||
|
|
try:
|
||
|
|
# This would update the database
|
||
|
|
# For now, just return success
|
||
|
|
|
||
|
|
logger.info("Item resolved",
|
||
|
|
item_id=item_id,
|
||
|
|
user_id=getattr(current_user, 'id', 'unknown'))
|
||
|
|
|
||
|
|
return {
|
||
|
|
"status": "success",
|
||
|
|
"item_id": item_id,
|
||
|
|
"resolved_by": getattr(current_user, 'id', 'unknown'),
|
||
|
|
"resolved_at": datetime.utcnow().isoformat()
|
||
|
|
}
|
||
|
|
|
||
|
|
except Exception as e:
|
||
|
|
logger.error("Failed to resolve item", item_id=item_id, error=str(e))
|
||
|
|
raise HTTPException(500, "Failed to resolve item")
|
||
|
|
|
||
|
|
@router.get("/status/{tenant_id}")
|
||
|
|
async def get_sse_status(
|
||
|
|
tenant_id: str,
|
||
|
|
current_user = Depends(get_current_user)
|
||
|
|
):
|
||
|
|
"""Get SSE connection status for a tenant"""
|
||
|
|
|
||
|
|
# Verify user has access to this tenant
|
||
|
|
if not hasattr(current_user, 'has_access_to_tenant') or not current_user.has_access_to_tenant(tenant_id):
|
||
|
|
raise HTTPException(403, "Access denied to this tenant")
|
||
|
|
|
||
|
|
try:
|
||
|
|
# Get SSE service from app state
|
||
|
|
sse_service = getattr(request.app.state, 'sse_service', None)
|
||
|
|
if not sse_service:
|
||
|
|
return {"status": "unavailable", "message": "SSE service not initialized"}
|
||
|
|
|
||
|
|
metrics = sse_service.get_metrics()
|
||
|
|
tenant_connections = len(sse_service.active_connections.get(tenant_id, set()))
|
||
|
|
|
||
|
|
return {
|
||
|
|
"status": "available",
|
||
|
|
"tenant_id": tenant_id,
|
||
|
|
"connections": tenant_connections,
|
||
|
|
"total_connections": metrics["total_connections"],
|
||
|
|
"active_tenants": metrics["active_tenants"]
|
||
|
|
}
|
||
|
|
|
||
|
|
except Exception as e:
|
||
|
|
logger.error("Failed to get SSE status", tenant_id=tenant_id, error=str(e))
|
||
|
|
raise HTTPException(500, "Failed to get SSE status")
|