509 lines
18 KiB
Python
Executable File
509 lines
18 KiB
Python
Executable File
"""
|
|
Standardized FastAPI Service Base
|
|
|
|
Provides a unified approach for creating FastAPI microservices with common patterns:
|
|
- Logging setup
|
|
- Metrics initialization
|
|
- Health checks
|
|
- Database initialization
|
|
- CORS middleware
|
|
- Exception handlers
|
|
- Lifespan management
|
|
"""
|
|
|
|
import os
|
|
import structlog
|
|
from typing import Optional, List, Dict, Callable, Any, TYPE_CHECKING
|
|
from contextlib import asynccontextmanager
|
|
from fastapi import FastAPI, Request
|
|
from fastapi.middleware.cors import CORSMiddleware
|
|
from fastapi.responses import JSONResponse
|
|
from fastapi.routing import APIRouter
|
|
|
|
from shared.monitoring import (
|
|
setup_logging,
|
|
setup_telemetry
|
|
)
|
|
from shared.monitoring.health_checks import setup_fastapi_health_checks
|
|
from shared.database.base import DatabaseManager
|
|
|
|
if TYPE_CHECKING:
|
|
import uvicorn
|
|
|
|
|
|
class BaseFastAPIService:
|
|
"""
|
|
Base class for FastAPI microservices with standardized patterns
|
|
"""
|
|
|
|
def __init__(
    self,
    service_name: str,
    app_name: str,
    description: str,
    version: str = "1.0.0",
    log_level: str = "INFO",
    cors_origins: Optional[List[str]] = None,
    api_prefix: str = "/api/v1",
    database_manager: Optional[DatabaseManager] = None,
    expected_tables: Optional[List[str]] = None,
    custom_health_checks: Optional[Dict[str, Callable[[], Any]]] = None,
    redis_enabled: bool = False,
    redis_url: Optional[str] = None,
    redis_db: int = 0,
    redis_max_connections: int = 50,
    enable_metrics: bool = True,
    enable_health_checks: bool = True,
    enable_cors: bool = True,
    enable_exception_handlers: bool = True,
    enable_messaging: bool = False,
    enable_tracing: bool = True,
    custom_metrics: Optional[Dict[str, Dict[str, Any]]] = None,
    alert_service_class: Optional[type] = None
):
    """
    Store the service configuration and initialize logging.

    No FastAPI application is built here; call ``create_app()`` for that.

    Args:
        service_name: Identifier passed to logging, telemetry and health-check
            setup, and used in log messages.
        app_name: Used as the FastAPI ``title``.
        description: Used as the FastAPI ``description``.
        version: Service version reported to FastAPI, telemetry and health checks.
        log_level: Forwarded to ``setup_logging``.
        cors_origins: Allowed CORS origins; falls back to ``["*"]`` when falsy.
        api_prefix: Prefix for the OpenAPI/docs/redoc URLs and the default
            prefix used by ``add_router``.
        database_manager: When set, its connection is tested on startup and
            closed on shutdown.
        expected_tables: Forwarded to the health-check setup.
        custom_health_checks: Forwarded to the health-check setup; falls back
            to an empty dict when falsy.
        redis_enabled: When true, ``_initialize_redis`` is invoked with the
            three ``redis_*`` arguments below.
        redis_url: Passed to ``_initialize_redis``.
        redis_db: Passed to ``_initialize_redis``.
        redis_max_connections: Passed to ``_initialize_redis``.
        enable_metrics: Forwarded to ``setup_telemetry`` as ``enable_metrics``.
        enable_health_checks: Install health-check endpoints in ``create_app``.
        enable_cors: Install the CORS middleware in ``create_app``.
        enable_exception_handlers: Install the standard exception handlers.
        enable_messaging: Run the messaging setup/cleanup hooks in the lifespan.
        enable_tracing: Forwarded to ``setup_telemetry`` as ``enable_traces``.
        custom_metrics: Metric definitions registered during startup; falls
            back to an empty dict when falsy.
        alert_service_class: When set, ``_initialize_alert_service`` is called
            during startup.

    Raises:
        AttributeError: If ``redis_enabled`` is true but the concrete class
            does not provide an ``_initialize_redis`` method.
    """
    self.service_name = service_name
    self.app_name = app_name
    self.description = description
    self.version = version
    self.log_level = log_level
    self.cors_origins = cors_origins or ["*"]
    self.api_prefix = api_prefix
    self.database_manager = database_manager
    self.expected_tables = expected_tables
    self.custom_health_checks = custom_health_checks or {}
    self.enable_metrics = enable_metrics
    self.enable_health_checks = enable_health_checks
    self.enable_cors = enable_cors
    self.enable_exception_handlers = enable_exception_handlers
    self.enable_messaging = enable_messaging
    self.enable_tracing = enable_tracing
    self.custom_metrics = custom_metrics or {}
    self.alert_service_class = alert_service_class

    # Initialize logging before anything else so later steps can log.
    setup_logging(service_name, log_level)
    self.logger = structlog.get_logger()

    # Redis state; the client is expected to be populated by _initialize_redis.
    self.redis_enabled = redis_enabled
    self.redis_client = None
    self.redis_initialized = False

    if redis_enabled:
        # This base class does not define _initialize_redis itself, so look it
        # up explicitly and fail with an actionable message (same exception
        # type as the implicit attribute lookup would raise).
        initialize_redis = getattr(self, "_initialize_redis", None)
        if initialize_redis is None:
            raise AttributeError(
                f"{type(self).__name__} was created with redis_enabled=True but "
                "does not define _initialize_redis(redis_url, redis_db, "
                "redis_max_connections)"
            )
        initialize_redis(redis_url, redis_db, redis_max_connections)

    # Populated later, during create_app() / the lifespan.
    self.app: Optional[FastAPI] = None
    self.health_manager = None
    self.alert_service = None
    self.telemetry_providers = None  # Return value of setup_telemetry()
|
|
|
|
def create_app(self, **fastapi_kwargs) -> FastAPI:
    """
    Build the FastAPI application and attach the standard components.

    Args:
        **fastapi_kwargs: Extra ``FastAPI(...)`` keyword arguments; they take
            precedence over the defaults derived from this service's settings.

    Returns:
        The configured application, also stored on ``self.app``.
    """
    prefix = self.api_prefix

    # Defaults first, caller overrides second.
    app_settings = dict(
        title=self.app_name,
        description=self.description,
        version=self.version,
        openapi_url=f"{prefix}/openapi.json",
        docs_url=f"{prefix}/docs",
        redoc_url=f"{prefix}/redoc",
    )
    app_settings.update(fastapi_kwargs)

    self.app = FastAPI(**app_settings)

    # One call wires up telemetry according to the flags below. A failure is
    # logged and otherwise ignored so the service can still start.
    # NOTE(review): the original comments state that log export and system
    # metrics are additionally gated by the OTEL_LOGS_EXPORTER and
    # ENABLE_SYSTEM_METRICS environment variables - confirm in shared.monitoring.
    try:
        self.telemetry_providers = setup_telemetry(
            app=self.app,
            service_name=self.service_name,
            service_version=self.version,
            enable_traces=self.enable_tracing,
            enable_metrics=self.enable_metrics,
            enable_logs=True,
            enable_system_metrics=True,
        )
    except Exception as exc:
        self.logger.warning("Failed to setup telemetry", error=str(exc))

    # Install the startup/shutdown lifecycle on the app's router.
    self.app.router.lifespan_context = self._create_lifespan()

    # Optional components, each behind its own feature flag.
    if self.enable_cors:
        self._setup_cors()
    if self.enable_exception_handlers:
        self._setup_exception_handlers()
    if self.enable_health_checks:
        self._setup_health_checks()

    # The root endpoint is always registered.
    self._setup_root_endpoint()

    return self.app
|
|
|
|
def _create_lifespan(self):
    """
    Build the async lifespan context manager for the application.

    Startup order: database initialization (if a manager is configured),
    messaging setup (if enabled), alert service initialization (if a class
    is configured), custom metric registration, the ``on_startup`` hook, and
    finally ``app.state.ready = True``.

    Shutdown order: the ``on_shutdown`` hook, alert service cleanup,
    messaging cleanup, database cleanup. Shutdown also runs when startup
    fails. Each shutdown step is isolated so that one failing step is
    logged and does not prevent the remaining cleanup from running.

    Returns:
        An async context manager factory taking the FastAPI app.
    """

    async def run_shutdown_step(step_name, action):
        # Best-effort cleanup: log the failure and keep going.
        try:
            await action()
        except Exception as e:
            self.logger.error(
                f"Shutdown error for {self.service_name}",
                error=str(e),
                step=step_name,
            )

    @asynccontextmanager
    async def lifespan(app: FastAPI):
        self.logger.info(f"Starting {self.service_name}", version=self.version)

        try:
            # Only the startup phase is reported as a startup failure; errors
            # raised while the app is serving propagate without that label.
            try:
                if self.database_manager:
                    await self._initialize_database()

                if self.enable_messaging:
                    await self._setup_messaging()

                if self.alert_service_class:
                    await self._initialize_alert_service(app)

                if self.custom_metrics:
                    self.register_custom_metrics(self.custom_metrics)

                # Custom startup logic
                await self.on_startup(app)

                # Mark service as ready
                app.state.ready = True

                self.logger.info(f"{self.service_name} started successfully")
            except Exception as e:
                self.logger.error(f"Startup failed for {self.service_name}", error=str(e))
                raise

            yield
        finally:
            self.logger.info(f"Shutting down {self.service_name}")

            await run_shutdown_step("on_shutdown", lambda: self.on_shutdown(app))

            if self.alert_service:
                await run_shutdown_step("alert_service", self._cleanup_alert_service)

            if self.enable_messaging:
                await run_shutdown_step("messaging", self._cleanup_messaging)

            if self.database_manager:
                await run_shutdown_step("database", self._cleanup_database)

    return lifespan
|
|
|
|
async def on_startup(self, app: FastAPI):
    """
    Hook for service-specific startup work; the default does nothing.

    The lifespan awaits this after the database, messaging, alert service
    and custom metrics steps, and before ``app.state.ready`` is set.
    """
    return None
|
|
|
|
async def on_shutdown(self, app: FastAPI):
    """
    Hook for service-specific shutdown work; the default does nothing.

    The lifespan awaits this first during shutdown, before the alert
    service, messaging and database cleanup steps.
    """
    return None
|
|
|
|
async def _initialize_database(self):
|
|
"""Initialize database connection and tables"""
|
|
try:
|
|
# Test connection
|
|
if await self.database_manager.test_connection():
|
|
self.logger.info("Database connection established")
|
|
|
|
# Handle automatic table initialization
|
|
await self._handle_database_tables()
|
|
|
|
self.logger.info("Database initialized successfully")
|
|
else:
|
|
raise Exception("Database connection test failed")
|
|
except Exception as e:
|
|
self.logger.error("Database initialization failed", error=str(e))
|
|
raise
|
|
|
|
async def _handle_database_tables(self):
    """
    Verify that the database is ready for this service.

    The service never runs migrations itself: the initializer is always
    called with ``verify_only=True``, leaving schema changes to a separate
    migration job. Any verification failure is logged and re-raised so the
    service does not start against an unprepared database.
    """
    try:
        # Imported lazily to avoid a circular import at module load time.
        from shared.database.init_manager import initialize_service_database

        # Service key: the service name without "-service" and underscores.
        db_service_key = self.service_name.replace("-service", "").replace("_", "")

        verification = await initialize_service_database(
            database_manager=self.database_manager,
            service_name=db_service_key,
            verify_only=True,
        )

        self.logger.info("Database verification completed", result=verification)
    except Exception as exc:
        self.logger.error("Database verification failed", error=str(exc))
        # Fail fast: propagate so startup is aborted.
        raise
|
|
|
|
async def _cleanup_database(self):
    """Close the database connections; failures are logged, not raised."""
    manager = self.database_manager
    try:
        await manager.close_connections()
        self.logger.info("Database connections closed")
    except Exception as exc:
        self.logger.error("Database cleanup error", error=str(exc))
|
|
|
|
async def _setup_messaging(self):
    """Messaging setup hook; a no-op here, overridden by services that need it."""
    return None
|
|
|
|
async def _cleanup_messaging(self):
    """Messaging cleanup hook; a no-op here, overridden by services that need it."""
    return None
|
|
|
|
async def _initialize_alert_service(self, app: FastAPI):
    """Alert service setup hook; a no-op here, overridden by services that need it."""
    return None
|
|
|
|
async def _cleanup_alert_service(self):
    """Alert service cleanup hook; a no-op here, overridden by services that need it."""
    return None
|
|
|
|
def _setup_cors(self):
    """
    Install the CORS middleware on the application.

    All methods and headers are allowed and credentials are enabled for the
    origins in ``self.cors_origins``. Does nothing if the app has not been
    created yet.
    """
    if self.app is None:
        return

    # FastAPI's CORS documentation states that allow_origins must not be
    # ["*"] when allow_credentials is True. The combination is kept for
    # backward compatibility but surfaced so it gets configured explicitly.
    if "*" in self.cors_origins:
        self.logger.warning(
            "CORS allows credentials with a wildcard origin; "
            "configure explicit cors_origins",
            service=self.service_name,
        )

    self.app.add_middleware(
        CORSMiddleware,
        allow_origins=self.cors_origins,
        allow_credentials=True,
        allow_methods=["*"],
        allow_headers=["*"],
    )
|
|
|
|
def _setup_exception_handlers(self):
    """
    Register the standard exception handlers on the application.

    ``ValueError`` is answered with HTTP 400; any other exception is logged
    at critical level and answered with HTTP 500. Does nothing if the app
    has not been created yet.
    """
    if self.app is None:
        return

    @self.app.exception_handler(ValueError)
    async def value_error_handler(request: Request, exc: ValueError):
        """Translate a ValueError into a 400 validation error response."""
        return JSONResponse(
            status_code=400,
            content={
                "error": "Validation Error",
                "detail": str(exc),
                "type": "value_error"
            }
        )

    @self.app.exception_handler(Exception)
    async def general_exception_handler(request: Request, exc: Exception):
        """Log an unhandled exception and answer with a 500 response."""
        # The structured log below is the single record of the failure; the
        # former stdout debug print duplicated it and has been removed.
        self.logger.critical(
            "Unhandled exception",
            error=str(exc),
            path=request.url.path,
            method=request.method,
            exc_info=True
        )

        # NOTE(review): the response echoes str(exc) to the client, which can
        # expose internal details. Kept as-is because clients may rely on it;
        # consider returning a generic message instead.
        return JSONResponse(
            status_code=500,
            content={
                "error": "Internal Server Error",
                "detail": f"An unexpected error occurred: {str(exc)}",
                "type": "internal_error"
            }
        )
|
|
|
|
def _setup_health_checks(self):
    """Install health-check endpoints and keep the returned manager on ``self.health_manager``."""
    health_check_options = {
        "app": self.app,
        "service_name": self.service_name,
        "version": self.version,
        "database_manager": self.database_manager,
        "expected_tables": self.expected_tables,
        "custom_checks": self.custom_health_checks,
    }
    self.health_manager = setup_fastapi_health_checks(**health_check_options)
|
|
|
|
def _setup_root_endpoint(self):
    """
    Register ``GET /`` returning basic service information.

    Does nothing if the app has not been created yet.
    """
    if self.app is None:
        return

    @self.app.get("/")
    async def root():
        """Describe the running service and point to its docs and health URLs."""
        service_summary = {
            "service": self.service_name,
            "version": self.version,
            "description": self.description,
            "status": "running",
            "docs_url": f"{self.api_prefix}/docs",
            "health_url": "/health",
        }
        return service_summary
|
|
|
|
def add_router(self, router: APIRouter, **kwargs: Any):
    """
    Include a router on the application, defaulting its prefix to ``api_prefix``.

    Args:
        router: The router to include.
        **kwargs: Forwarded to ``include_router``; an explicit ``prefix``
            overrides the service's ``api_prefix``.

    If the application has not been created yet, the router cannot be
    attached; this is logged as a warning rather than silently ignored.
    """
    if self.app is None:
        self.logger.warning(
            "Router not added: create_app() has not been called",
            service=self.service_name,
        )
        return

    kwargs.setdefault("prefix", self.api_prefix)
    self.app.include_router(router, **kwargs)
|
|
|
|
def register_custom_metrics(self, metrics_config: Dict[str, Dict[str, Any]]) -> Dict[str, Any]:
    """
    Register custom OTEL metrics for the service.

    Use this for metrics beyond those set up by ``setup_telemetry()``.

    Args:
        metrics_config: Dict with metric name as key and config as value.
            Recognized config keys are ``type`` (``"counter"``,
            ``"histogram"`` or ``"gauge"``; default ``"counter"``),
            ``description`` and ``unit`` (default ``"1"``).
            Example: {
                "user_registrations": {
                    "type": "counter",
                    "description": "Total user registrations",
                    "unit": "registrations"
                }
            }

    Returns:
        Dict mapping each successfully registered metric name to the created
        instrument, so callers can record values on it. Empty when no meter
        provider is available. Unsupported types and failed registrations
        are logged and omitted.
    """
    registered: Dict[str, Any] = {}

    providers = self.telemetry_providers
    if not providers or not providers.meter_provider:
        self.logger.warning("OTEL meter provider not available - metrics not registered")
        return registered

    from opentelemetry.metrics import get_meter
    meter = get_meter(self.service_name)

    # "gauge" is backed by an up-down counter, as in the original mapping.
    factories = {
        "counter": meter.create_counter,
        "histogram": meter.create_histogram,
        "gauge": meter.create_up_down_counter,
    }

    for metric_name, config in metrics_config.items():
        metric_type = config.get("type", "counter")
        description = config.get("description", f"{metric_name} metric")
        unit = config.get("unit", "1")

        try:
            factory = factories.get(metric_type)
            if factory is None:
                self.logger.warning(f"Unsupported metric type: {metric_type}")
                continue

            registered[metric_name] = factory(metric_name, description=description, unit=unit)
            self.logger.info(f"Registered custom {metric_type}: {metric_name}")
        except Exception as e:
            self.logger.error(f"Failed to register metric {metric_name}", error=str(e))

    return registered
|
|
|
|
def run_development_server(self, host: str = "0.0.0.0", port: int = 8000, reload: Optional[bool] = None):
    """
    Start a uvicorn server for local development.

    Args:
        host: Interface to bind to.
        port: Port to listen on.
        reload: Enable auto-reload; when ``None`` it is taken from the
            ``RELOAD`` environment variable (true only for "true",
            case-insensitive).

    The application is referenced by import string as ``<module>:app``,
    where ``<module>`` is the module defining this instance's class, so
    that module must expose a variable named ``app``.
    """
    import uvicorn

    if reload is None:
        reload_flag = os.getenv("RELOAD", "false")
        reload = reload_flag.lower() == "true"

    app_import_path = f"{self.__module__}:app"

    uvicorn.run(
        app_import_path,
        host=host,
        port=port,
        reload=reload,
        log_level="info",
    )
|
|
|
|
|
|
class StandardFastAPIService(BaseFastAPIService):
    """
    General-purpose service built on BaseFastAPIService.

    Adds an optional ``/info`` endpoint and a hook for advertising the
    features a service supports.
    """

    def __init__(self, **kwargs: Any):
        """Forward all keyword arguments unchanged to BaseFastAPIService."""
        super().__init__(**kwargs)

    def setup_standard_endpoints(self):
        """
        Register ``GET <api_prefix>/info`` on the application.

        Does nothing if the app has not been created yet.
        """
        if self.app is None:
            return

        info_path = f"{self.api_prefix}/info"

        @self.app.get(info_path)
        async def service_info():
            """Return service metadata, environment and feature list."""
            environment = os.getenv("ENVIRONMENT", "development")
            return {
                "service": self.service_name,
                "version": self.version,
                "description": self.description,
                "api_version": "v1",
                "environment": environment,
                "features": self.get_service_features(),
            }

    def get_service_features(self) -> List[str]:
        """
        Return the feature names reported by the ``/info`` endpoint.

        Override to advertise service-specific features.
        """
        default_features = ["health_checks", "metrics", "standardized_api"]
        return default_features
|
|
|
|
|
|
class MessageProcessorService(BaseFastAPIService):
    """
    Base for services whose main job is processing messages in the background.

    Disables CORS by default and runs message-processing setup/cleanup hooks
    around the regular startup and shutdown hooks.
    """

    def __init__(self, **kwargs: Any):
        """Apply message-processor defaults, then defer to BaseFastAPIService."""
        # Only fill in values the caller did not provide explicitly.
        processor_defaults = (
            ("enable_cors", False),
            ("api_prefix", "/api/v1"),
        )
        for option, default in processor_defaults:
            kwargs.setdefault(option, default)
        super().__init__(**kwargs)

    async def on_startup(self, app: FastAPI):
        """Run the base startup hook, then set up message processing."""
        await super().on_startup(app)
        await self.setup_message_processing()

    async def on_shutdown(self, app: FastAPI):
        """Tear down message processing, then run the base shutdown hook."""
        await self.cleanup_message_processing()
        await super().on_shutdown(app)

    async def setup_message_processing(self):
        """Hook for starting message processing; the default does nothing."""
        return None

    async def cleanup_message_processing(self):
        """Hook for stopping message processing; the default does nothing."""
        return None