Files
bakery-ia/services/notification/app/api/sse_routes.py
2025-09-21 17:35:36 +02:00

199 lines
6.9 KiB
Python

# services/notification/app/api/sse_routes.py
"""
SSE routes for real-time alert and recommendation streaming
"""
import asyncio
import json
from datetime import datetime
from typing import Optional
from fastapi import APIRouter, Request, Depends, HTTPException, BackgroundTasks
from sse_starlette.sse import EventSourceResponse
import structlog
from shared.auth.decorators import get_current_user
router = APIRouter(prefix="/sse", tags=["sse"])
logger = structlog.get_logger()
@router.get("/alerts/stream/{tenant_id}")
async def stream_alerts(
tenant_id: str,
request: Request,
background_tasks: BackgroundTasks,
token: Optional[str] = None
):
"""
SSE endpoint for real-time alert and recommendation streaming
Supports both alerts and recommendations through unified stream
"""
# Validate token and get user (skip for now to test connection)
# TODO: Add proper token validation in production
current_user = None
if token:
try:
# In a real implementation, validate the JWT token here
# For now, skip validation to test the connection
pass
except Exception:
raise HTTPException(401, "Invalid token")
# Skip tenant access validation for testing
# TODO: Add tenant access validation in production
# Get SSE service from app state
sse_service = getattr(request.app.state, 'sse_service', None)
if not sse_service:
raise HTTPException(500, "SSE service not available")
async def event_generator():
"""Generate SSE events for the client"""
client_queue = asyncio.Queue(maxsize=100) # Limit queue size
try:
# Register client
await sse_service.add_client(tenant_id, client_queue)
logger.info("SSE client connected",
tenant_id=tenant_id,
user_id=getattr(current_user, 'id', 'unknown'))
# Stream events
while True:
# Check if client disconnected
if await request.is_disconnected():
logger.info("SSE client disconnected", tenant_id=tenant_id)
break
try:
# Wait for events with timeout for keepalive
event = await asyncio.wait_for(
client_queue.get(),
timeout=30.0
)
yield event
except asyncio.TimeoutError:
# Send keepalive ping
yield {
"event": "ping",
"data": json.dumps({
"timestamp": datetime.utcnow().isoformat(),
"status": "keepalive"
}),
"id": f"ping_{int(datetime.now().timestamp())}"
}
except Exception as e:
logger.error("Error in SSE event generator",
tenant_id=tenant_id,
error=str(e))
break
except Exception as e:
logger.error("SSE connection error",
tenant_id=tenant_id,
error=str(e))
finally:
# Clean up on disconnect
try:
await sse_service.remove_client(tenant_id, client_queue)
logger.info("SSE client cleanup completed", tenant_id=tenant_id)
except Exception as e:
logger.error("Error cleaning up SSE client",
tenant_id=tenant_id,
error=str(e))
return EventSourceResponse(
event_generator(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no", # Disable nginx buffering
}
)
@router.post("/items/{item_id}/acknowledge")
async def acknowledge_item(
item_id: str,
current_user = Depends(get_current_user)
):
"""Acknowledge an alert or recommendation"""
try:
# This would update the database
# For now, just return success
logger.info("Item acknowledged",
item_id=item_id,
user_id=getattr(current_user, 'id', 'unknown'))
return {
"status": "success",
"item_id": item_id,
"acknowledged_by": getattr(current_user, 'id', 'unknown'),
"acknowledged_at": datetime.utcnow().isoformat()
}
except Exception as e:
logger.error("Failed to acknowledge item", item_id=item_id, error=str(e))
raise HTTPException(500, "Failed to acknowledge item")
@router.post("/items/{item_id}/resolve")
async def resolve_item(
item_id: str,
current_user = Depends(get_current_user)
):
"""Resolve an alert or recommendation"""
try:
# This would update the database
# For now, just return success
logger.info("Item resolved",
item_id=item_id,
user_id=getattr(current_user, 'id', 'unknown'))
return {
"status": "success",
"item_id": item_id,
"resolved_by": getattr(current_user, 'id', 'unknown'),
"resolved_at": datetime.utcnow().isoformat()
}
except Exception as e:
logger.error("Failed to resolve item", item_id=item_id, error=str(e))
raise HTTPException(500, "Failed to resolve item")
@router.get("/status/{tenant_id}")
async def get_sse_status(
tenant_id: str,
current_user = Depends(get_current_user)
):
"""Get SSE connection status for a tenant"""
# Verify user has access to this tenant
if not hasattr(current_user, 'has_access_to_tenant') or not current_user.has_access_to_tenant(tenant_id):
raise HTTPException(403, "Access denied to this tenant")
try:
# Get SSE service from app state
sse_service = getattr(request.app.state, 'sse_service', None)
if not sse_service:
return {"status": "unavailable", "message": "SSE service not initialized"}
metrics = sse_service.get_metrics()
tenant_connections = len(sse_service.active_connections.get(tenant_id, set()))
return {
"status": "available",
"tenant_id": tenant_id,
"connections": tenant_connections,
"total_connections": metrics["total_connections"],
"active_tenants": metrics["active_tenants"]
}
except Exception as e:
logger.error("Failed to get SSE status", tenant_id=tenant_id, error=str(e))
raise HTTPException(500, "Failed to get SSE status")