bakery-ia/services/alert_processor/app/api_server.py
Claude 41d292913a feat: Add alert endpoints to alert_processor service for dashboard
Implemented missing alert endpoints that the dashboard requires for
health status and action queue functionality.

Alert Processor Service Changes:
- Created alerts_repository.py:
  * get_alerts() - Filter alerts by severity/status/resolved with pagination
  * get_alerts_summary() - Count alerts by severity and status
  * get_alert_by_id() - Get specific alert

- Created alerts.py API endpoints:
  * GET /api/v1/tenants/{tenant_id}/alerts/summary - Alert counts
  * GET /api/v1/tenants/{tenant_id}/alerts - Filtered alert list
  * GET /api/v1/tenants/{tenant_id}/alerts/{alert_id} - Single alert

- Severity mapping: "critical" (dashboard) maps to "urgent" (alert_processor)
- Status enum: active, resolved, acknowledged, ignored
- Severity enum: low, medium, high, urgent
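The repository functions listed above can be illustrated with a minimal in-memory sketch. The real alerts_repository.py presumably queries the database; the `Alert` dataclass, the parameter names (`limit`, `offset`) and the shape of the summary dictionary here are assumptions made for illustration, not the service's actual code. Only the enum values are taken from the commit message.

```python
from collections import Counter
from dataclasses import dataclass
from enum import Enum
from typing import Dict, Iterable, List, Optional


class Severity(str, Enum):
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    URGENT = "urgent"


class Status(str, Enum):
    ACTIVE = "active"
    RESOLVED = "resolved"
    ACKNOWLEDGED = "acknowledged"
    IGNORED = "ignored"


@dataclass
class Alert:
    """Hypothetical stand-in for a stored alert row."""
    id: str
    severity: Severity
    status: Status


def get_alerts(
    alerts: Iterable[Alert],
    severity: Optional[Severity] = None,
    status: Optional[Status] = None,
    limit: int = 100,
    offset: int = 0,
) -> List[Alert]:
    """Filter by severity and/or status, then apply offset/limit pagination."""
    matched = [
        a for a in alerts
        if (severity is None or a.severity == severity)
        and (status is None or a.status == status)
    ]
    return matched[offset:offset + limit]


def get_alerts_summary(alerts: Iterable[Alert]) -> Dict[str, object]:
    """Count alerts by severity and by status."""
    rows = list(alerts)  # materialize once so the input can be any iterable
    return {
        "total": len(rows),
        "by_severity": dict(Counter(a.severity.value for a in rows)),
        "by_status": dict(Counter(a.status.value for a in rows)),
    }
```

Filtering before slicing keeps page boundaries stable for a given filter, which is the usual expectation for a paginated list endpoint.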

API Server Changes:
- Registered alerts_router in api_server.py
- Exported alerts_router in __init__.py

Procurement Client Changes:
- Updated get_critical_alerts() to use /alerts path
- Updated get_alerts_summary() to use /alerts/summary path
- Added severity mapping (critical → urgent)
- Added documentation about gateway routing
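A client-side sketch of the path and severity changes described above might look like the following. The helper names, the `severity` and `limit` query parameters, and the assumption that the client's /alerts path resolves to the /api/v1/tenants/{tenant_id}/alerts prefix are all hypothetical; only the critical → urgent mapping and the two endpoint paths come from the commit message.

```python
from urllib.parse import urlencode

# Dashboard vocabulary -> alert_processor vocabulary (from the commit message).
SEVERITY_MAP = {"critical": "urgent"}


def to_processor_severity(dashboard_severity: str) -> str:
    """Translate a dashboard severity; unmapped values pass through unchanged."""
    normalized = dashboard_severity.strip().lower()
    return SEVERITY_MAP.get(normalized, normalized)


def alerts_path(tenant_id: str, summary: bool = False) -> str:
    """Build the tenant-scoped alerts path (hypothetical helper)."""
    base = f"/api/v1/tenants/{tenant_id}/alerts"
    return f"{base}/summary" if summary else base


def critical_alerts_url(tenant_id: str, limit: int = 20) -> str:
    """Path plus query string for the dashboard's 'critical' alert list.

    The query parameter names are assumptions for illustration.
    """
    query = urlencode(
        {"severity": to_processor_severity("critical"), "limit": limit}
    )
    return f"{alerts_path(tenant_id)}?{query}"
```

Keeping the mapping in one dictionary means the dashboard can keep saying "critical" while the service only ever sees its own severity values.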

This fixes the 404 errors for alert endpoints in the dashboard.
2025-11-07 22:16:16 +00:00


"""
Alert Processor API Server
Provides REST API endpoints for alert analytics
"""
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
import structlog
from app.config import AlertProcessorConfig
from app.api import analytics_router, alerts_router
from shared.database.base import create_database_manager
logger = structlog.get_logger()

# Create FastAPI app
app = FastAPI(
    title="Alert Processor API",
    description="API for alert analytics and interaction tracking",
    version="1.0.0"
)

# CORS middleware
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Include routers
app.include_router(analytics_router, tags=["analytics"])
app.include_router(alerts_router, tags=["alerts"])
# Initialize database
config = AlertProcessorConfig()
db_manager = create_database_manager(config.DATABASE_URL, "alert-processor-api")


@app.on_event("startup")
async def startup():
    """Initialize on startup"""
    logger.info("Alert Processor API starting up")
    # Create tables
    try:
        from app.models.alerts import Base
        await db_manager.create_tables(Base.metadata)
        logger.info("Database tables ensured")
    except Exception as e:
        logger.error("Failed to create tables", error=str(e))


@app.on_event("shutdown")
async def shutdown():
    """Cleanup on shutdown"""
    logger.info("Alert Processor API shutting down")
    await db_manager.close_connections()


@app.get("/health")
async def health_check():
    """Health check endpoint"""
    return {"status": "healthy", "service": "alert-processor-api"}


@app.get("/")
async def root():
    """Root endpoint"""
    return {
        "service": "Alert Processor API",
        "version": "1.0.0",
        "endpoints": {
            "health": "/health",
            "docs": "/docs",
            "analytics": "/api/v1/tenants/{tenant_id}/alerts/analytics",
            "interactions": "/api/v1/tenants/{tenant_id}/alerts/{alert_id}/interactions"
        }
    }


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8010)