440 lines
16 KiB
Python
Executable File
440 lines
16 KiB
Python
Executable File
"""
|
|
Enhanced Health Check System for Microservices
|
|
|
|
Provides unified health check endpoints and database verification based on
|
|
the comprehensive implementation from the training service.
|
|
"""
|
|
|
|
from typing import Dict, Any, List, Optional, Callable
|
|
from contextlib import asynccontextmanager
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
from sqlalchemy import text, inspect
|
|
from fastapi import HTTPException
|
|
from fastapi.responses import JSONResponse
|
|
import structlog
|
|
import time
|
|
import datetime
|
|
|
|
from ..database.base import DatabaseManager
|
|
from ..database.exceptions import DatabaseError, HealthCheckError
|
|
|
|
logger = structlog.get_logger()
|
|
|
|
|
|
class HealthCheckManager:
    """
    Unified health check manager for microservices.

    Provides the logic behind the standardized health check endpoints:
    - /health - Basic service health
    - /health/ready - Kubernetes readiness probe with comprehensive checks
    - /health/live - Kubernetes liveness probe
    - /health/database - Detailed database health information
    """

    def __init__(
        self,
        service_name: str,
        version: str = "1.0.0",
        database_manager: Optional["DatabaseManager"] = None,
        expected_tables: Optional[List[str]] = None,
        custom_checks: Optional[Dict[str, Callable]] = None
    ):
        """
        Args:
            service_name: Name reported in health responses and log events.
            version: Version string reported by the basic health check.
            database_manager: Optional database manager; when omitted, all
                database checks are skipped.
            expected_tables: Table names that must be queryable for the
                database to count as healthy.
            custom_checks: Mapping of check name to a zero-argument callable.
                The callable may be synchronous or return an awaitable; its
                (truthy/falsy) result is used as the check outcome.
        """
        self.service_name = service_name
        self.version = version
        self.database_manager = database_manager
        self.expected_tables = expected_tables or []
        self.custom_checks = custom_checks or {}
        self.ready_state = False

    def set_ready(self, ready: bool = True):
        """Set service ready state and log the transition."""
        self.ready_state = ready
        logger.info("Service ready state changed",
                    service=self.service_name, ready=ready)

    def _resolve_ready(self, app_state=None) -> bool:
        """Return `app_state.ready` when that attribute exists, else the manager's own flag."""
        if app_state and hasattr(app_state, 'ready'):
            return app_state.ready
        return self.ready_state

    @staticmethod
    async def _run_custom_check(check_func: Callable) -> Any:
        """Call a custom check and await its result only if it is awaitable."""
        # Local import: the module-level name `inspect` is bound to SQLAlchemy's
        # `inspect`, not the standard-library module.
        from inspect import isawaitable

        outcome = check_func()
        if isawaitable(outcome):
            outcome = await outcome
        return outcome

    async def basic_health_check(self, app_state=None) -> Dict[str, Any]:
        """
        Basic health check endpoint (/health).

        Returns:
            Dict with "status" ("healthy" when ready, otherwise "starting"),
            "service", "version" and a naive-UTC ISO-8601 "timestamp".
        """
        ready = self._resolve_ready(app_state)

        # Same naive-UTC output as the deprecated `datetime.utcnow()`.
        now_utc = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)

        return {
            "status": "healthy" if ready else "starting",
            "service": self.service_name,
            "version": self.version,
            "timestamp": now_utc.isoformat()
        }

    async def readiness_check(self, app_state=None) -> Dict[str, Any]:
        """
        Kubernetes readiness probe endpoint (/health/ready).

        Combines the application ready flag, the database checks (when a
        database manager is configured) and every custom check.

        Returns:
            Dict with "status", "checks" and, when a database manager is
            configured, a "database" details section.

        Raises:
            HTTPException: 503 when any check fails or the probe itself
                errors; `detail` carries the response payload.
        """
        try:
            checks = {
                "application": self._resolve_ready(app_state)
            }

            database_details = {}

            # Database connectivity and table verification
            if self.database_manager:
                db_health = await self._get_comprehensive_db_health()
                checks["database_connectivity"] = db_health["connectivity"]
                checks["database_tables"] = db_health["tables_exist"]

                database_details = {
                    "status": db_health["status"],
                    "tables_verified": db_health["tables_verified"],
                    "missing_tables": db_health["missing_tables"],
                    "errors": db_health["errors"]
                }

            # Execute custom checks; one failing check must not abort the probe
            for check_name, check_func in self.custom_checks.items():
                try:
                    checks[check_name] = await self._run_custom_check(check_func)
                except Exception as e:
                    checks[check_name] = False
                    logger.error(f"Custom check '{check_name}' failed", error=str(e))

            # Service is ready only if all checks pass
            all_ready = all(checks.values())
            if self.database_manager:
                all_ready = all_ready and database_details.get("status") == "healthy"

            response_data = {
                "status": "ready" if all_ready else "not ready",
                "checks": checks
            }

            if database_details:
                response_data["database"] = database_details

            if not all_ready:
                raise HTTPException(status_code=503, detail=response_data)
            return response_data

        except HTTPException:
            # Already carries the intended status code and payload.
            raise
        except Exception as e:
            logger.error("Readiness check failed", error=str(e))
            raise HTTPException(
                status_code=503,
                detail={
                    "status": "not ready",
                    "error": f"Health check failed: {str(e)}"
                }
            )

    async def liveness_check(self) -> Dict[str, Any]:
        """Kubernetes liveness probe endpoint (/health/live)."""
        return {"status": "alive"}

    async def database_health_check(self) -> Dict[str, Any]:
        """
        Detailed database health endpoint (/health/database).

        Returns:
            The comprehensive database health dict when its status is
            "healthy".

        Raises:
            HTTPException: 404 when no database manager is configured;
                503 when the database is unhealthy or the check errors.
        """
        if not self.database_manager:
            raise HTTPException(
                status_code=404,
                detail={"error": "Database health check not available"}
            )

        try:
            db_health = await self._get_comprehensive_db_health()

            if db_health["status"] != "healthy":
                raise HTTPException(status_code=503, detail=db_health)
            return db_health

        except HTTPException:
            raise
        except Exception as e:
            logger.error("Database health check failed", error=str(e))
            raise HTTPException(
                status_code=503,
                detail={
                    "status": "unhealthy",
                    "error": f"Health check failed: {str(e)}"
                }
            )

    async def _get_comprehensive_db_health(self) -> Dict[str, Any]:
        """
        Comprehensive database health check with table verification.
        Based on training service implementation.

        Never raises: any exception is recorded in "errors" and flips
        "status" to "unhealthy".

        Returns:
            Dict with "status", "connectivity", "tables_exist",
            "tables_verified", "missing_tables", "errors",
            "connection_info" and "response_time_ms". Once connectivity
            succeeds it also carries the "migration_*" keys produced by
            `_check_migration_status`.
        """
        health_status = {
            "status": "healthy",
            "connectivity": False,
            "tables_exist": False,
            "tables_verified": [],
            "missing_tables": [],
            "errors": [],
            "connection_info": {},
            "response_time_ms": 0
        }

        if not self.database_manager:
            health_status["status"] = "unhealthy"
            health_status["errors"].append("Database manager not configured")
            return health_status

        try:
            # Test basic connectivity with timing. A monotonic clock keeps the
            # measurement immune to wall-clock adjustments.
            start_time = time.perf_counter()
            health_status["connectivity"] = await self.database_manager.test_connection()
            response_time = (time.perf_counter() - start_time) * 1000
            health_status["response_time_ms"] = round(response_time, 2)

            if not health_status["connectivity"]:
                health_status["status"] = "unhealthy"
                health_status["errors"].append("Database connectivity failed")
                return health_status

            # Get connection pool information
            health_status["connection_info"] = await self.database_manager.get_connection_info()

            # Check migration status (informational: it does not change "status")
            migration_status = await self._check_migration_status()
            health_status.update(migration_status)

            # Test table existence if expected tables are configured
            if self.expected_tables:
                tables_verified = await self._verify_tables_exist()
                health_status["tables_exist"] = tables_verified

                if tables_verified:
                    health_status["tables_verified"] = self.expected_tables.copy()
                else:
                    health_status["status"] = "unhealthy"
                    health_status["errors"].append("Required tables missing or inaccessible")

                    # Identify which specific tables are missing
                    await self._identify_missing_tables(health_status)
            else:
                # If no expected tables configured, just mark as verified
                health_status["tables_exist"] = True

            logger.debug("Comprehensive database health check completed",
                         service=self.service_name,
                         status=health_status["status"],
                         connectivity=health_status["connectivity"],
                         tables_exist=health_status["tables_exist"])

        except Exception as e:
            health_status["status"] = "unhealthy"
            health_status["errors"].append(f"Health check failed: {str(e)}")
            logger.error("Comprehensive database health check failed",
                         service=self.service_name, error=str(e))

        return health_status

    async def _probe_table(self, session, table_name: str) -> bool:
        """
        Return True when `SELECT 1 FROM <table_name>` succeeds on `session`.

        On failure the session is rolled back so that later probes on the
        same session are not rejected just because the surrounding
        transaction was aborted by this one.
        Assumes `session` exposes an awaitable `rollback()` (as SQLAlchemy's
        AsyncSession does) - TODO confirm against DatabaseManager.get_session.
        """
        try:
            # NOTE: the table name is interpolated into the SQL text, so
            # `expected_tables` must only ever hold trusted, developer-supplied
            # identifiers.
            await session.execute(text(f"SELECT 1 FROM {table_name} LIMIT 1"))
        except Exception:
            try:
                await session.rollback()
            except Exception as rollback_error:
                logger.warning("Rollback after failed table probe failed",
                               table=table_name, error=str(rollback_error))
            return False
        return True

    async def _verify_tables_exist(self) -> bool:
        """Verify that all expected tables exist and are accessible."""
        try:
            async with self.database_manager.get_session() as session:
                for table_name in self.expected_tables:
                    if not await self._probe_table(session, table_name):
                        return False
                return True
        except Exception as e:
            logger.error("Table verification failed", error=str(e))
            return False

    async def _identify_missing_tables(self, health_status: Dict[str, Any]):
        """
        Identify which specific tables are missing.

        Appends every queryable table to health_status["tables_verified"]
        and every failing one to health_status["missing_tables"]. A
        session-level failure is recorded in health_status["errors"].
        """
        try:
            async with self.database_manager.get_session() as session:
                for table_name in self.expected_tables:
                    if await self._probe_table(session, table_name):
                        health_status["tables_verified"].append(table_name)
                    else:
                        health_status["missing_tables"].append(table_name)
        except Exception as e:
            health_status["errors"].append(f"Error checking individual tables: {str(e)}")

    async def _check_migration_status(self) -> Dict[str, Any]:
        """
        Check database migration status.

        Reads `version_num` from the `alembic_version` table.

        Returns:
            Dict with "migration_version", "migration_status" ("healthy",
            "no_version", "error" or the initial "unknown") and
            "migration_errors". Never raises.
        """
        migration_info = {
            "migration_version": None,
            "migration_status": "unknown",
            "migration_errors": []
        }

        try:
            async with self.database_manager.get_session() as session:
                # The query fails (and is reported as "error") when the
                # alembic_version table does not exist.
                result = await session.execute(
                    text("SELECT version_num FROM alembic_version LIMIT 1")
                )
                version = result.scalar()

                if version:
                    migration_info["migration_version"] = version
                    migration_info["migration_status"] = "healthy"
                    logger.debug(f"Migration version found: {version}", service=self.service_name)
                else:
                    migration_info["migration_status"] = "no_version"
                    migration_info["migration_errors"].append("No migration version found in alembic_version table")

        except Exception as e:
            migration_info["migration_status"] = "error"
            migration_info["migration_errors"].append(f"Migration check failed: {str(e)}")
            logger.error("Migration status check failed", service=self.service_name, error=str(e))

        return migration_info
|
|
|
|
|
|
class FastAPIHealthChecker:
    """
    Adapter that exposes a HealthCheckManager through FastAPI routes.

    Holds the manager and registers one GET route per health endpoint.
    """

    def __init__(self, health_manager: HealthCheckManager):
        self.health_manager = health_manager

    def setup_health_routes(self, app):
        """Register /health, /health/ready, /health/live and /health/database on `app`."""

        def _to_json_response(http_error):
            # Turn the manager's HTTPException into a plain JSON body with the
            # same status code, so the payload is returned as-is.
            return JSONResponse(
                status_code=http_error.status_code,
                content=http_error.detail,
            )

        # Handler names and docstrings are kept verbatim: FastAPI uses them
        # when generating the OpenAPI schema.

        @app.get("/health")
        async def health_check():
            """Basic health check endpoint"""
            payload = await self.health_manager.basic_health_check(app.state)
            return payload

        @app.get("/health/ready")
        async def readiness_check():
            """Kubernetes readiness probe endpoint"""
            try:
                payload = await self.health_manager.readiness_check(app.state)
            except HTTPException as e:
                return _to_json_response(e)
            return payload

        @app.get("/health/live")
        async def liveness_check():
            """Kubernetes liveness probe endpoint"""
            payload = await self.health_manager.liveness_check()
            return payload

        @app.get("/health/database")
        async def database_health_check():
            """Detailed database health endpoint"""
            try:
                payload = await self.health_manager.database_health_check()
            except HTTPException as e:
                return _to_json_response(e)
            return payload
|
|
|
|
|
|
# Convenience functions for easy integration
|
|
|
|
async def check_database_health(db_manager: DatabaseManager) -> Dict[str, Any]:
    """
    Enhanced database health check with migration status.

    Runs `SELECT 1` as a connectivity probe, then reads `version_num` from
    the `alembic_version` table.

    Args:
        db_manager: DatabaseManager instance

    Returns:
        Dict containing database health status including migration version.
        On success: "database" = "healthy", "migration_version",
        "connectivity" = True. On failure: "database" = "unhealthy",
        "error", "migration_version" = None, and "connectivity" reflecting
        whether the `SELECT 1` probe itself had succeeded before the error.
        Never raises.
    """
    # Tracked separately so that a failing migration lookup is not reported
    # as a connectivity failure.
    connected = False
    try:
        async with db_manager.get_session() as session:
            # Basic connectivity test
            await session.execute(text("SELECT 1"))
            connected = True

            # Get migration status
            migration_status = await session.execute(text("SELECT version_num FROM alembic_version"))
            version = migration_status.scalar()

            return {
                "database": "healthy",
                "migration_version": version,
                "connectivity": True
            }
    except Exception as e:
        logger.error("Database health check failed", error=str(e))
        return {
            "database": "unhealthy",
            "error": str(e),
            "connectivity": connected,
            "migration_version": None
        }
|
|
|
|
|
|
def create_health_manager(
    service_name: str,
    version: str = "1.0.0",
    database_manager: Optional[DatabaseManager] = None,
    expected_tables: Optional[List[str]] = None,
    custom_checks: Optional[Dict[str, Callable]] = None
) -> HealthCheckManager:
    """Build a HealthCheckManager, forwarding every argument unchanged."""
    manager_options = {
        "service_name": service_name,
        "version": version,
        "database_manager": database_manager,
        "expected_tables": expected_tables,
        "custom_checks": custom_checks,
    }
    return HealthCheckManager(**manager_options)
|
|
|
|
|
|
def setup_fastapi_health_checks(
    app,
    service_name: str,
    version: str = "1.0.0",
    database_manager: Optional[DatabaseManager] = None,
    expected_tables: Optional[List[str]] = None,
    custom_checks: Optional[Dict[str, Callable]] = None
) -> HealthCheckManager:
    """
    Create a health manager and register its routes on a FastAPI app.

    Args:
        app: FastAPI application instance
        service_name: Name of the service
        version: Service version
        database_manager: Database manager instance
        expected_tables: List of tables that should exist
        custom_checks: Dict of custom check functions

    Returns:
        The HealthCheckManager that backs the registered routes, so the
        caller can configure it further.
    """
    manager = create_health_manager(
        service_name=service_name,
        version=version,
        database_manager=database_manager,
        expected_tables=expected_tables,
        custom_checks=custom_checks,
    )

    # Route registration happens as a side effect on `app`.
    FastAPIHealthChecker(manager).setup_health_routes(app)

    return manager
|
|
|
|
|