From e989e3b36277eee42e7da525219cd947a3e9818c Mon Sep 17 00:00:00 2001 From: Urtzi Alfaro Date: Fri, 18 Jul 2025 12:34:28 +0200 Subject: [PATCH] Fix shared issues --- services/auth/app/api/auth.py | 146 ++++-- services/auth/app/main.py | 184 ++++---- services/data/app/main.py | 126 ++++-- .../{sales_service.py => sales_services.py} | 0 shared/monitoring/__init__.py | 15 + shared/monitoring/decorators.py | 89 ++++ shared/monitoring/health.py | 162 +++++++ shared/monitoring/logging.py | 161 +++++-- shared/monitoring/metrics.py | 416 +++++++++--------- 9 files changed, 913 insertions(+), 386 deletions(-) rename services/data/app/services/{sales_service.py => sales_services.py} (100%) create mode 100644 shared/monitoring/decorators.py create mode 100644 shared/monitoring/health.py diff --git a/services/auth/app/api/auth.py b/services/auth/app/api/auth.py index 2cca1206..75885476 100644 --- a/services/auth/app/api/auth.py +++ b/services/auth/app/api/auth.py @@ -1,8 +1,8 @@ # ================================================================ -# services/auth/app/api/auth.py (ENHANCED VERSION) +# services/auth/app/api/auth.py - Updated with modular monitoring # ================================================================ """ -Authentication API routes - Enhanced with proper error handling and logging +Authentication API routes - Enhanced with proper metrics access """ from fastapi import APIRouter, Depends, HTTPException, status, Request @@ -16,26 +16,46 @@ from app.schemas.auth import ( ) from app.services.auth_service import AuthService from app.core.security import security_manager -from shared.monitoring.metrics import MetricsCollector +from shared.monitoring.decorators import track_execution_time, count_calls logger = logging.getLogger(__name__) router = APIRouter() -metrics = MetricsCollector("auth_service") + +def get_metrics_collector(request: Request): + """Get metrics collector from app state""" + return getattr(request.app.state, 'metrics_collector', None) @router.post("/register", response_model=UserResponse) +@track_execution_time("registration_duration_seconds", "auth-service") async def register( user_data: UserRegistration, + request: Request, db: AsyncSession = Depends(get_db) ): """Register a new user""" + metrics = get_metrics_collector(request) + try: - metrics.increment_counter("auth_registration_total") result = await AuthService.register_user(user_data, db) + + # Record successful registration + if metrics: + metrics.increment_counter("registration_total", labels={"status": "success"}) + logger.info(f"User registration successful: {user_data.email}") return result - except HTTPException: + + except HTTPException as e: + # Record failed registration + if metrics: + metrics.increment_counter("registration_total", labels={"status": "failed"}) + logger.warning(f"Registration failed for {user_data.email}: {e.detail}") raise + except Exception as e: + # Record error + if metrics: + metrics.increment_counter("registration_total", labels={"status": "error"}) logger.error(f"Registration error for {user_data.email}: {e}") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, @@ -43,25 +63,39 @@ async def register( ) @router.post("/login", response_model=TokenResponse) +@track_execution_time("login_duration_seconds", "auth-service") async def login( login_data: UserLogin, request: Request, db: AsyncSession = Depends(get_db) ): """User login""" + metrics = get_metrics_collector(request) + try: ip_address = request.client.host user_agent = request.headers.get("user-agent", "") result = await AuthService.login_user(login_data, db, ip_address, user_agent) - metrics.increment_counter("auth_login_success_total") + + # Record successful login + if metrics: + metrics.increment_counter("login_success_total") + + logger.info(f"Login successful for {login_data.email}") return result + except HTTPException as e: - metrics.increment_counter("auth_login_failure_total") + # Record failed login + if metrics: + metrics.increment_counter("login_failure_total", labels={"reason": "auth_failed"}) logger.warning(f"Login failed for {login_data.email}: {e.detail}") raise + except Exception as e: - metrics.increment_counter("auth_login_failure_total") + # Record login error + if metrics: + metrics.increment_counter("login_failure_total", labels={"reason": "error"}) logger.error(f"Login error for {login_data.email}: {e}") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, @@ -69,16 +103,34 @@ async def login( ) @router.post("/refresh", response_model=TokenResponse) +@track_execution_time("token_refresh_duration_seconds", "auth-service") async def refresh_token( refresh_data: RefreshTokenRequest, + request: Request, db: AsyncSession = Depends(get_db) ): """Refresh access token""" + metrics = get_metrics_collector(request) + try: - return await AuthService.refresh_token(refresh_data.refresh_token, db) - except HTTPException: + result = await AuthService.refresh_token(refresh_data.refresh_token, db) + + # Record successful refresh + if metrics: + metrics.increment_counter("token_refresh_total", labels={"status": "success"}) + + return result + + except HTTPException as e: + # Record failed refresh + if metrics: + metrics.increment_counter("token_refresh_total", labels={"status": "failed"}) raise + except Exception as e: + # Record refresh error + if metrics: + metrics.increment_counter("token_refresh_total", labels={"status": "error"}) logger.error(f"Token refresh error: {e}") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, @@ -91,27 +143,37 @@ async def verify_token( db: AsyncSession = Depends(get_db) ): """Verify access token""" + metrics = get_metrics_collector(request) + try: auth_header = request.headers.get("Authorization") if not auth_header or not auth_header.startswith("Bearer "): + if metrics: + metrics.increment_counter("token_verify_total", labels={"status": "no_token"}) raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, - detail="Authorization header required" + detail="Missing or invalid authorization header" ) token = auth_header.split(" ")[1] - token_data = await AuthService.verify_token(token) + payload = await AuthService.verify_token(token, db) - return { - "valid": True, - "user_id": token_data.get("user_id"), - "email": token_data.get("email"), - "role": token_data.get("role"), - "tenant_id": token_data.get("tenant_id") - } - except HTTPException: + # Record successful verification + if metrics: + metrics.increment_counter("token_verify_total", labels={"status": "success"}) + + return {"valid": True, "user_id": payload["sub"]} + + except HTTPException as e: + # Record failed verification + if metrics: + metrics.increment_counter("token_verify_total", labels={"status": "failed"}) raise + except Exception as e: + # Record verification error + if metrics: + metrics.increment_counter("token_verify_total", labels={"status": "error"}) logger.error(f"Token verification error: {e}") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, @@ -120,27 +182,43 @@ async def verify_token( @router.post("/logout") async def logout( - refresh_data: RefreshTokenRequest, request: Request, db: AsyncSession = Depends(get_db) ): - """Logout user""" + """User logout""" + metrics = get_metrics_collector(request) + try: - # Get user from token auth_header = request.headers.get("Authorization") - if auth_header and auth_header.startswith("Bearer "): - token = auth_header.split(" ")[1] - token_data = await AuthService.verify_token(token) - user_id = token_data.get("user_id") - - if user_id: - success = await AuthService.logout_user(user_id, refresh_data.refresh_token, db) - return {"success": success} + if not auth_header or not auth_header.startswith("Bearer "): + if metrics: + metrics.increment_counter("logout_total", labels={"status": "no_token"}) + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Missing or invalid authorization header" + ) + + token = auth_header.split(" ")[1] + await AuthService.logout_user(token, db) + + # Record successful logout + if metrics: + metrics.increment_counter("logout_total", labels={"status": "success"}) + + return {"message": "Logged out successfully"} + + except HTTPException as e: + # Record failed logout + if metrics: + metrics.increment_counter("logout_total", labels={"status": "failed"}) + raise - return {"success": False, "message": "Invalid token"} except Exception as e: + # Record logout error + if metrics: + metrics.increment_counter("logout_total", labels={"status": "error"}) logger.error(f"Logout error: {e}") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Logout failed" - ) + ) \ No newline at end of file diff --git a/services/auth/app/main.py b/services/auth/app/main.py index 38086335..e58efd3c 100644 --- a/services/auth/app/main.py +++ b/services/auth/app/main.py @@ -1,9 +1,5 @@ -# ================================================================ -# services/auth/app/main.py (ENHANCED VERSION) -# ================================================================ """ -Authentication Service Main Application -Enhanced version with proper lifecycle management and microservices integration +Authentication Service Main Application - Fixed middleware issue """ import logging @@ -16,58 +12,108 @@ from app.core.config import settings from app.core.database import engine, create_tables from app.api import auth, users from app.services.messaging import setup_messaging, cleanup_messaging -from shared.monitoring.logging import setup_logging -from shared.monitoring.metrics import MetricsCollector +from shared.monitoring import setup_logging, HealthChecker +from shared.monitoring.metrics import setup_metrics_early -# Setup logging +# Setup logging first setup_logging("auth-service", settings.LOG_LEVEL) logger = logging.getLogger(__name__) -# Initialize metrics -metrics = MetricsCollector("auth_service") +# Global variables for lifespan access +metrics_collector = None +health_checker = None -@asynccontextmanager -async def lifespan(app: FastAPI): - """Application lifespan events""" - # Startup - logger.info("Starting Authentication Service...") - - # Create database tables - await create_tables() - logger.info("Database tables created") - - # Setup messaging - await setup_messaging() - logger.info("Messaging setup complete") - - # Register metrics - metrics.register_counter("auth_requests_total", "Total authentication requests") - metrics.register_counter("auth_login_success_total", "Successful logins") - metrics.register_counter("auth_login_failure_total", "Failed logins") - metrics.register_counter("auth_registration_total", "User registrations") - metrics.register_histogram("auth_request_duration_seconds", "Request duration") - - logger.info("Authentication Service started successfully") - - yield - - # Shutdown - logger.info("Shutting down Authentication Service...") - await cleanup_messaging() - await engine.dispose() - logger.info("Authentication Service shutdown complete") - -# Create FastAPI app +# Create FastAPI app FIRST app = FastAPI( title="Authentication Service", description="Handles user authentication and authorization for bakery forecasting platform", version="1.0.0", docs_url="/docs", - redoc_url="/redoc", - lifespan=lifespan + redoc_url="/redoc" ) -# CORS middleware +# Setup metrics BEFORE any middleware and BEFORE lifespan +# This must happen before the app starts +metrics_collector = setup_metrics_early(app, "auth-service") + +@asynccontextmanager +async def lifespan(app: FastAPI): + """Application lifespan events - NO MIDDLEWARE ADDED HERE""" + global health_checker + + # Startup + logger.info("Starting Authentication Service...") + + try: + # Create database tables + await create_tables() + logger.info("Database tables created") + + # Setup messaging + await setup_messaging() + logger.info("Messaging setup complete") + + # Register custom metrics (metrics_collector already exists) + metrics_collector.register_counter("registration_total", "Total user registrations") + metrics_collector.register_counter("login_success_total", "Successful logins") + metrics_collector.register_counter("login_failure_total", "Failed logins") + metrics_collector.register_counter("token_refresh_total", "Token refresh requests") + metrics_collector.register_counter("token_verify_total", "Token verification requests") + metrics_collector.register_counter("logout_total", "User logout requests") + metrics_collector.register_counter("errors_total", "Total errors") + metrics_collector.register_histogram("registration_duration_seconds", "Registration request duration") + metrics_collector.register_histogram("login_duration_seconds", "Login request duration") + metrics_collector.register_histogram("token_refresh_duration_seconds", "Token refresh duration") + + # Setup health checker + health_checker = HealthChecker("auth-service") + + # Add database health check + async def check_database(): + try: + from app.core.database import get_db + async for db in get_db(): + await db.execute("SELECT 1") + return True + except Exception as e: + return f"Database error: {e}" + + health_checker.add_check("database", check_database, timeout=5.0, critical=True) + + # Add messaging health check + def check_messaging(): + try: + # Add your messaging health check logic here + return True + except Exception as e: + return f"Messaging error: {e}" + + health_checker.add_check("messaging", check_messaging, timeout=3.0, critical=False) + + # Store health checker in app state + app.state.health_checker = health_checker + + logger.info("Authentication Service started successfully") + + except Exception as e: + logger.error(f"Failed to start Authentication Service: {e}") + raise + + yield + + # Shutdown + logger.info("Shutting down Authentication Service...") + try: + await cleanup_messaging() + await engine.dispose() + logger.info("Authentication Service shutdown complete") + except Exception as e: + logger.error(f"Error during shutdown: {e}") + +# Set lifespan AFTER metrics setup +app.router.lifespan_context = lifespan + +# CORS middleware (added after metrics setup) app.add_middleware( CORSMiddleware, allow_origins=["*"], # Configure properly for production @@ -80,44 +126,30 @@ app.add_middleware( app.include_router(auth.router, prefix="/api/v1/auth", tags=["authentication"]) app.include_router(users.router, prefix="/api/v1/users", tags=["users"]) -# Health check endpoint +# Health check endpoint with comprehensive checks @app.get("/health") async def health_check(): - """Health check endpoint""" - return { - "service": "auth-service", - "status": "healthy", - "version": "1.0.0" - } - -# Metrics endpoint -@app.get("/metrics") -async def get_metrics(): - """Prometheus metrics endpoint""" - return metrics.get_metrics() + """Comprehensive health check endpoint""" + if health_checker: + return await health_checker.check_health() + else: + return { + "service": "auth-service", + "status": "healthy", + "version": "1.0.0" + } # Exception handlers @app.exception_handler(Exception) async def global_exception_handler(request: Request, exc: Exception): - """Global exception handler""" + """Global exception handler with metrics""" logger.error(f"Unhandled exception: {exc}", exc_info=True) + + # Record error metric if available + if metrics_collector: + metrics_collector.increment_counter("errors_total", labels={"type": "unhandled"}) + return JSONResponse( status_code=500, content={"detail": "Internal server error"} - ) - -# Request middleware for metrics -@app.middleware("http") -async def metrics_middleware(request: Request, call_next): - """Middleware to collect metrics""" - import time - start_time = time.time() - - response = await call_next(request) - - # Record metrics - duration = time.time() - start_time - metrics.observe_histogram("auth_request_duration_seconds", duration) - metrics.increment_counter("auth_requests_total") - - return response + ) \ No newline at end of file diff --git a/services/data/app/main.py b/services/data/app/main.py index 73dbea8a..aa9c18e7 100644 --- a/services/data/app/main.py +++ b/services/data/app/main.py @@ -1,6 +1,8 @@ +# ================================================================ +# services/data/app/main.py - FIXED VERSION +# ================================================================ """ -Data Service Main Application -Handles external API integrations (weather, traffic, events) +Data Service Main Application - Fixed middleware issue """ import logging @@ -8,45 +10,99 @@ from contextlib import asynccontextmanager from fastapi import FastAPI, Request from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import JSONResponse -import structlog from app.core.config import settings from app.core.database import init_db from app.api.sales import router as sales_router from app.api.weather import router as weather_router from app.api.traffic import router as traffic_router -from shared.monitoring.metrics import setup_metrics -from shared.monitoring.logging import setup_logging +from shared.monitoring import setup_logging, HealthChecker +from shared.monitoring.metrics import setup_metrics_early -# Setup logging +# Setup logging first setup_logging("data-service", settings.LOG_LEVEL) -logger = structlog.get_logger() +logger = logging.getLogger(__name__) -@asynccontextmanager -async def lifespan(app: FastAPI): - """Application lifespan events""" - # Startup - logger.info("Starting Data Service") - await init_db() - yield - # Shutdown - logger.info("Shutting down Data Service") +# Global variables for lifespan access +metrics_collector = None +health_checker = None -# Create FastAPI app +# Create FastAPI app FIRST app = FastAPI( title="Bakery Data Service", description="External data integration service for weather, traffic, and sales data", - version="1.0.0", - lifespan=lifespan + version="1.0.0" ) -# Setup metrics -setup_metrics(app) +# Setup metrics BEFORE any middleware and BEFORE lifespan +metrics_collector = setup_metrics_early(app, "data-service") -# CORS middleware +@asynccontextmanager +async def lifespan(app: FastAPI): + """Application lifespan events - NO MIDDLEWARE ADDED HERE""" + global health_checker + + # Startup + logger.info("Starting Data Service...") + + try: + # Initialize database + await init_db() + logger.info("Database initialized") + + # Register custom metrics (metrics_collector already exists) + metrics_collector.register_counter("sales_records_created_total", "Total sales records created") + metrics_collector.register_counter("sales_queries_total", "Sales record queries") + metrics_collector.register_counter("weather_api_calls_total", "Weather API calls") + metrics_collector.register_counter("weather_api_success_total", "Successful weather API calls") + metrics_collector.register_counter("weather_api_failures_total", "Failed weather API calls") + metrics_collector.register_counter("traffic_api_calls_total", "Traffic API calls") + metrics_collector.register_counter("import_jobs_total", "Data import jobs") + metrics_collector.register_counter("template_downloads_total", "Template downloads") + metrics_collector.register_counter("errors_total", "Total errors") + metrics_collector.register_histogram("sales_create_duration_seconds", "Sales record creation duration") + metrics_collector.register_histogram("sales_list_duration_seconds", "Sales record list duration") + metrics_collector.register_histogram("import_duration_seconds", "Data import duration") + metrics_collector.register_histogram("weather_current_duration_seconds", "Current weather API duration") + metrics_collector.register_histogram("weather_forecast_duration_seconds", "Weather forecast API duration") + metrics_collector.register_histogram("external_api_duration_seconds", "External API call duration") + + # Setup health checker + health_checker = HealthChecker("data-service") + + # Add database health check + async def check_database(): + try: + from app.core.database import get_db + async for db in get_db(): + await db.execute("SELECT 1") + return True + except Exception as e: + return f"Database error: {e}" + + health_checker.add_check("database", check_database, timeout=5.0, critical=True) + + # Store health checker in app state + app.state.health_checker = health_checker + + logger.info("Data Service started successfully") + + except Exception as e: + logger.error(f"Failed to start Data Service: {e}") + raise + + yield + + # Shutdown + logger.info("Shutting down Data Service...") + +# Set lifespan AFTER metrics setup +app.router.lifespan_context = lifespan + +# CORS middleware (added after metrics setup) app.add_middleware( CORSMiddleware, - allow_origins=settings.CORS_ORIGINS, + allow_origins=getattr(settings, 'CORS_ORIGINS', ["*"]), allow_credentials=True, allow_methods=["*"], allow_headers=["*"], @@ -57,16 +113,30 @@ app.include_router(sales_router, prefix="/api/v1/sales", tags=["sales"]) app.include_router(weather_router, prefix="/api/v1/weather", tags=["weather"]) app.include_router(traffic_router, prefix="/api/v1/traffic", tags=["traffic"]) +# Health check endpoint @app.get("/health") async def health_check(): - """Health check endpoint""" - return {"status": "healthy", "service": "data-service"} + """Comprehensive health check endpoint""" + if health_checker: + return await health_checker.check_health() + else: + return { + "service": "data-service", + "status": "healthy", + "version": "1.0.0" + } +# Exception handlers @app.exception_handler(Exception) async def global_exception_handler(request: Request, exc: Exception): - """Global exception handler""" - logger.error("Unhandled exception", exc_info=exc, path=request.url.path) + """Global exception handler with metrics""" + logger.error(f"Unhandled exception: {exc}", exc_info=True) + + # Record error metric if available + if metrics_collector: + metrics_collector.increment_counter("errors_total", labels={"type": "unhandled"}) + return JSONResponse( status_code=500, content={"detail": "Internal server error"} - ) + ) \ No newline at end of file diff --git a/services/data/app/services/sales_service.py b/services/data/app/services/sales_services.py similarity index 100% rename from services/data/app/services/sales_service.py rename to services/data/app/services/sales_services.py diff --git a/shared/monitoring/__init__.py b/shared/monitoring/__init__.py index e69de29b..7128a9f0 100644 --- a/shared/monitoring/__init__.py +++ b/shared/monitoring/__init__.py @@ -0,0 +1,15 @@ +""" +Shared monitoring package for microservices +""" + +from .logging import setup_logging +from .metrics import setup_metrics_early, get_metrics_collector, MetricsCollector +from .health import HealthChecker + +__all__ = [ + 'setup_logging', + 'setup_metrics_early', + 'get_metrics_collector', + 'MetricsCollector', + 'HealthChecker' +] \ No newline at end of file diff --git a/shared/monitoring/decorators.py b/shared/monitoring/decorators.py new file mode 100644 index 00000000..7a9f3975 --- /dev/null +++ b/shared/monitoring/decorators.py @@ -0,0 +1,89 @@ +# ================================================================ +# shared/monitoring/decorators.py +# ================================================================ +""" +Decorators for monitoring and metrics +""" + +import time +import logging +import functools +from typing import Callable, Any, Optional +from .metrics import get_metrics_collector + +logger = logging.getLogger(__name__) + +def track_execution_time(metric_name: str, service_name: str, + labels: Optional[dict] = None): + """Decorator to track function execution time""" + def decorator(func: Callable) -> Callable: + @functools.wraps(func) + async def async_wrapper(*args, **kwargs) -> Any: + start_time = time.time() + try: + result = await func(*args, **kwargs) + duration = time.time() - start_time + + metrics_collector = get_metrics_collector(service_name) + if metrics_collector: + metrics_collector.observe_histogram(metric_name, duration, labels) + + return result + except Exception as e: + duration = time.time() - start_time + logger.error(f"Function {func.__name__} failed after {duration:.2f}s: {e}") + raise + + @functools.wraps(func) + def sync_wrapper(*args, **kwargs) -> Any: + start_time = time.time() + try: + result = func(*args, **kwargs) + duration = time.time() - start_time + + metrics_collector = get_metrics_collector(service_name) + if metrics_collector: + metrics_collector.observe_histogram(metric_name, duration, labels) + + return result + except Exception as e: + duration = time.time() - start_time + logger.error(f"Function {func.__name__} failed after {duration:.2f}s: {e}") + raise + + # Return appropriate wrapper based on function type + import asyncio + if asyncio.iscoroutinefunction(func): + return async_wrapper + else: + return sync_wrapper + + return decorator + + +def count_calls(metric_name: str, service_name: str, + labels: Optional[dict] = None): + """Decorator to count function calls""" + def decorator(func: Callable) -> Callable: + @functools.wraps(func) + async def async_wrapper(*args, **kwargs) -> Any: + metrics_collector = get_metrics_collector(service_name) + if metrics_collector: + metrics_collector.increment_counter(metric_name, labels=labels) + return await func(*args, **kwargs) + + @functools.wraps(func) + def sync_wrapper(*args, **kwargs) -> Any: + metrics_collector = get_metrics_collector(service_name) + if metrics_collector: + metrics_collector.increment_counter(metric_name, labels=labels) + return func(*args, **kwargs) + + # Return appropriate wrapper based on function type + import asyncio + if asyncio.iscoroutinefunction(func): + return async_wrapper + else: + return sync_wrapper + + return decorator \ No newline at end of file diff --git a/shared/monitoring/health.py b/shared/monitoring/health.py new file mode 100644 index 00000000..2a89ce95 --- /dev/null +++ b/shared/monitoring/health.py @@ -0,0 +1,162 @@ +# ================================================================ +# shared/monitoring/health.py +# ================================================================ +""" +Health check utilities for microservices +""" + +import asyncio +import logging +import time +from typing import Dict, List, Callable, Any, Optional +from dataclasses import dataclass +from enum import Enum + +logger = logging.getLogger(__name__) + +class HealthStatus(Enum): + HEALTHY = "healthy" + DEGRADED = "degraded" + UNHEALTHY = "unhealthy" + +@dataclass +class HealthCheck: + name: str + check_function: Callable[[], Any] + timeout: float = 5.0 + critical: bool = True + +@dataclass +class HealthResult: + name: str + status: HealthStatus + message: str + duration: float + timestamp: float + +class HealthChecker: + """Health checker for microservices""" + + def __init__(self, service_name: str): + self.service_name = service_name + self.checks: List[HealthCheck] = [] + self.start_time = time.time() + + def add_check(self, name: str, check_function: Callable, timeout: float = 5.0, + critical: bool = True) -> None: + """Add a health check""" + self.checks.append(HealthCheck(name, check_function, timeout, critical)) + + async def run_check(self, check: HealthCheck) -> HealthResult: + """Run a single health check""" + start_time = time.time() + + try: + # Run the check with timeout + result = await asyncio.wait_for( + asyncio.create_task(self._execute_check(check.check_function)), + timeout=check.timeout + ) + + duration = time.time() - start_time + + if result is True or (isinstance(result, dict) and result.get('healthy', False)): + return HealthResult( + name=check.name, + status=HealthStatus.HEALTHY, + message="OK", + duration=duration, + timestamp=time.time() + ) + else: + message = str(result) if result else "Check failed" + return HealthResult( + name=check.name, + status=HealthStatus.UNHEALTHY, + message=message, + duration=duration, + timestamp=time.time() + ) + + except asyncio.TimeoutError: + duration = time.time() - start_time + return HealthResult( + name=check.name, + status=HealthStatus.UNHEALTHY, + message=f"Timeout after {check.timeout}s", + duration=duration, + timestamp=time.time() + ) + except Exception as e: + duration = time.time() - start_time + return HealthResult( + name=check.name, + status=HealthStatus.UNHEALTHY, + message=f"Error: {str(e)}", + duration=duration, + timestamp=time.time() + ) + + async def _execute_check(self, check_function: Callable) -> Any: + """Execute a check function (handles both sync and async)""" + if asyncio.iscoroutinefunction(check_function): + return await check_function() + else: + return check_function() + + async def check_health(self) -> Dict[str, Any]: + """Run all health checks and return status""" + if not self.checks: + return { + "service": self.service_name, + "status": HealthStatus.HEALTHY.value, + "uptime": time.time() - self.start_time, + "timestamp": time.time(), + "checks": {} + } + + # Run all checks concurrently + results = await asyncio.gather( + *[self.run_check(check) for check in self.checks], + return_exceptions=True + ) + + # Process results + check_results = {} + overall_status = HealthStatus.HEALTHY + + for i, result in enumerate(results): + check = self.checks[i] + + if isinstance(result, Exception): + check_result = HealthResult( + name=check.name, + status=HealthStatus.UNHEALTHY, + message=f"Exception: {str(result)}", + duration=0.0, + timestamp=time.time() + ) + else: + check_result = result + + check_results[check.name] = { + "status": check_result.status.value, + "message": check_result.message, + "duration": check_result.duration, + "timestamp": check_result.timestamp + } + + # Determine overall status + if check.critical and check_result.status == HealthStatus.UNHEALTHY: + overall_status = HealthStatus.UNHEALTHY + elif check_result.status == HealthStatus.DEGRADED and overall_status == HealthStatus.HEALTHY: + overall_status = HealthStatus.DEGRADED + + return { + "service": self.service_name, + "status": overall_status.value, + "uptime": time.time() - self.start_time, + "timestamp": time.time(), + "checks": check_results + } + diff --git a/shared/monitoring/logging.py b/shared/monitoring/logging.py index 0fde234d..f8850e0a 100644 --- a/shared/monitoring/logging.py +++ b/shared/monitoring/logging.py @@ -1,3 +1,6 @@ +# ================================================================ +# shared/monitoring/logging.py +# ================================================================ """ Centralized logging configuration for microservices """ @@ -5,53 +8,109 @@ Centralized logging configuration for microservices import logging import logging.config import os +import sys from typing import Dict, Any -def setup_logging(service_name: str, log_level: str = "INFO") -> None: - """Set up logging configuration for a microservice""" +def setup_logging(service_name: str, log_level: str = "INFO", + enable_json: bool = False, enable_file: bool = True) -> None: + """ + Set up logging configuration for a microservice with improved error handling. - config: Dict[str, Any] = { - "version": 1, - "disable_existing_loggers": False, - "formatters": { - "standard": { - "format": "%(asctime)s [%(levelname)s] %(name)s: %(message)s" - }, - "detailed": { - "format": "%(asctime)s [%(levelname)s] %(name)s [%(filename)s:%(lineno)d] %(message)s" - }, - "json": { - "()": "pythonjsonlogger.jsonlogger.JsonFormatter", - "format": "%(asctime)s %(name)s %(levelname)s %(message)s" - } + Args: + service_name: Name of the service for log identification + log_level: Logging level (DEBUG, INFO, WARNING, ERROR, CRITICAL) + enable_json: Whether to use JSON formatting + enable_file: Whether to enable file logging + """ + + # Create logs directory if it doesn't exist and file logging is enabled + log_dir = "/var/log" + if enable_file: + try: + os.makedirs(log_dir, exist_ok=True) + except PermissionError: + # Fallback to local directory if can't write to /var/log + log_dir = "./logs" + os.makedirs(log_dir, exist_ok=True) + print(f"Warning: Could not write to /var/log, using {log_dir}") + + # Define formatters + formatters = { + "standard": { + "format": "%(asctime)s [%(levelname)s] %(name)s: %(message)s", + "datefmt": "%Y-%m-%d %H:%M:%S" }, - "handlers": { - "console": { - "class": "logging.StreamHandler", - "level": log_level, - "formatter": "standard", - "stream": "ext://sys.stdout" - }, - "file": { - "class": "logging.FileHandler", - "level": log_level, - "formatter": "detailed", - "filename": f"/var/log/{service_name}.log", - "mode": "a" - }, - "logstash": { + "detailed": { + "format": "%(asctime)s [%(levelname)s] %(name)s [%(filename)s:%(lineno)d] %(funcName)s(): %(message)s", + "datefmt": "%Y-%m-%d %H:%M:%S" + } + } + + # Add JSON formatter if requested and available + if enable_json: + try: + import pythonjsonlogger.jsonlogger + formatters["json"] = { + "()": "pythonjsonlogger.jsonlogger.JsonFormatter", + "format": "%(asctime)s %(name)s %(levelname)s %(message)s %(filename)s %(lineno)d" + } + except ImportError: + print("Warning: pythonjsonlogger not available, falling back to standard formatting") + enable_json = False + + # Define handlers + handlers = { + "console": { + "class": "logging.StreamHandler", + "level": log_level, + "formatter": "json" if enable_json else "standard", + "stream": "ext://sys.stdout" + } + } + + # Add file handler if enabled + if enable_file: + handlers["file"] = { + "class": "logging.FileHandler", + "level": log_level, + "formatter": "detailed", + "filename": f"{log_dir}/{service_name}.log", + "mode": "a", + "encoding": "utf-8" + } + + # Add logstash handler if in production + logstash_host = os.getenv("LOGSTASH_HOST") + if logstash_host and os.getenv("ENVIRONMENT") == "production": + try: + handlers["logstash"] = { "class": "logstash.TCPLogstashHandler", - "host": os.getenv("LOGSTASH_HOST", "localhost"), + "host": logstash_host, "port": int(os.getenv("LOGSTASH_PORT", "5000")), "version": 1, "message_type": "logstash", "fqdn": False, "tags": [service_name] } - }, + except Exception as e: + print(f"Warning: Could not setup logstash handler: {e}") + + # Define root logger configuration + root_handlers = ["console"] + if enable_file: + root_handlers.append("file") + if "logstash" in handlers: + root_handlers.append("logstash") + + # Complete logging configuration + config: Dict[str, Any] = { + "version": 1, + "disable_existing_loggers": False, + "formatters": formatters, + "handlers": handlers, "loggers": { - "": { - "handlers": ["console", "file"], + "": { # Root logger + "handlers": root_handlers, "level": log_level, "propagate": False }, @@ -64,14 +123,32 @@ def setup_logging(service_name: str, log_level: str = "INFO") -> None: "handlers": ["console"], "level": log_level, "propagate": False + }, + "sqlalchemy": { + "handlers": ["console"], + "level": "WARNING", # Reduce SQL logging noise + "propagate": False + }, + "httpx": { + "handlers": ["console"], + "level": "WARNING", # Reduce HTTP client logging + "propagate": False } } } - # Add logstash handler if in production - if os.getenv("ENVIRONMENT") == "production": - config["loggers"][""]["handlers"].append("logstash") - - logging.config.dictConfig(config) - logger = logging.getLogger(__name__) - logger.info(f"Logging configured for {service_name}") \ No newline at end of file + try: + logging.config.dictConfig(config) + logger = logging.getLogger(__name__) + logger.info(f"Logging configured for {service_name} at level {log_level}") + except Exception as e: + # Fallback to basic logging if configuration fails + logging.basicConfig( + level=getattr(logging, log_level.upper()), + format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", + handlers=[logging.StreamHandler(sys.stdout)] + ) + logger = logging.getLogger(__name__) + logger.error(f"Failed to configure advanced logging for {service_name}: {e}") + logger.info(f"Using basic logging configuration for {service_name}") + diff --git a/shared/monitoring/metrics.py b/shared/monitoring/metrics.py index 9da19f70..ab861909 100644 --- a/shared/monitoring/metrics.py +++ b/shared/monitoring/metrics.py @@ -1,147 +1,150 @@ -# shared/monitoring/metrics.py +# ================================================================ +# shared/monitoring/metrics.py - FIXED VERSION +# ================================================================ """ -Metrics collection for microservices +Centralized metrics collection for microservices - Fixed middleware issue """ import time import logging -from typing import Dict, Any, List # Added List import -from prometheus_client import Counter, Histogram, Gauge, start_http_server -from functools import wraps -from prometheus_client import generate_latest # Moved this import here for consistency -from fastapi import Request +from typing import Dict, Any, List, Optional +from prometheus_client import Counter, Histogram, Gauge, start_http_server, generate_latest +from fastapi import Request, Response +from threading import Lock logger = logging.getLogger(__name__) -# Prometheus metrics -REQUEST_COUNT = Counter( +# Global registry for metrics collectors +_metrics_registry: Dict[str, 'MetricsCollector'] = {} +_registry_lock = Lock() + +# Default Prometheus metrics +DEFAULT_REQUEST_COUNT = Counter( 'http_requests_total', 'Total HTTP requests', ['method', 'endpoint', 'status_code', 'service'] ) -REQUEST_DURATION = Histogram( +DEFAULT_REQUEST_DURATION = Histogram( 'http_request_duration_seconds', 'HTTP request duration in seconds', ['method', 'endpoint', 'service'] ) -ACTIVE_CONNECTIONS = Gauge( +DEFAULT_ACTIVE_CONNECTIONS = Gauge( 'active_connections', 'Active database connections', ['service'] ) -TRAINING_JOBS = Counter( - 'training_jobs_total', - 'Total training jobs', - ['status', 'service'] -) - -FORECASTS_GENERATED = Counter( - 'forecasts_generated_total', - 'Total forecasts generated', - ['service'] -) - class MetricsCollector: - """Metrics collector for microservices""" + """Thread-safe metrics collector for microservices""" def __init__(self, service_name: str): self.service_name = service_name self.start_time = time.time() - # Initialize dictionaries to hold custom counters and histograms self._counters: Dict[str, Counter] = {} self._histograms: Dict[str, Histogram] = {} + self._gauges: Dict[str, Gauge] = {} + self._lock = Lock() + + # Register in global registry + with _registry_lock: + _metrics_registry[service_name] = self def start_metrics_server(self, port: int = 8080): """Start Prometheus metrics server""" try: start_http_server(port) - logger.info(f"Metrics server started on port {port}") + logger.info(f"Metrics server started on port {port} for {self.service_name}") except Exception as e: - logger.error(f"Failed to start metrics server: {e}") + logger.error(f"Failed to start metrics server for {self.service_name}: {e}") - def record_request(self, method: str, endpoint: str, status_code: int, duration: float): - """Record HTTP request metrics""" - REQUEST_COUNT.labels( - method=method, - endpoint=endpoint, - status_code=status_code, - service=self.service_name - ).inc() - - REQUEST_DURATION.labels( - method=method, - endpoint=endpoint, - service=self.service_name - ).observe(duration) - - def record_training_job(self, status: str): - """Record training job metrics""" - TRAINING_JOBS.labels( - status=status, - service=self.service_name - ).inc() - - def record_forecast_generated(self): - """Record forecast generation metrics""" - FORECASTS_GENERATED.labels( - service=self.service_name - ).inc() - - def set_active_connections(self, count: int): - """Set active database connections""" - ACTIVE_CONNECTIONS.labels( - service=self.service_name - ).set(count) - - def register_counter(self, name: str, documentation: str, labels: List[str] = None): + def register_counter(self, name: str, documentation: str, labels: List[str] = None) -> Counter: """Register a custom Counter metric.""" - if name not in self._counters: + with self._lock: + if name in self._counters: + logger.warning(f"Counter '{name}' already registered for {self.service_name}") + return self._counters[name] + if labels is None: labels = ['service'] elif 'service' not in labels: labels.append('service') - # Pass labelnames as a keyword argument - self._counters[name] = Counter(name, documentation, labelnames=labels) - logger.info(f"Registered counter: {name}") - else: - logger.warning(f"Counter '{name}' already registered.") - return self._counters[name] # Return the counter for direct use if needed + + try: + counter = Counter(f"{self.service_name.replace('-', '_')}_{name}", documentation, labelnames=labels) + self._counters[name] = counter + logger.info(f"Registered counter: {name} for {self.service_name}") + return counter + except Exception as e: + logger.error(f"Failed to register counter {name} for {self.service_name}: {e}") + raise + + def register_histogram(self, name: str, documentation: str, labels: List[str] = None, + buckets: tuple = Histogram.DEFAULT_BUCKETS) -> Histogram: + """Register a custom Histogram metric.""" + with self._lock: + if name in self._histograms: + logger.warning(f"Histogram '{name}' already registered for {self.service_name}") + return self._histograms[name] + + if labels is None: + labels = ['service'] + elif 'service' not in labels: + labels.append('service') + + try: + histogram = Histogram(f"{self.service_name.replace('-', '_')}_{name}", documentation, + labelnames=labels, buckets=buckets) + self._histograms[name] = histogram + logger.info(f"Registered histogram: {name} for {self.service_name}") + return histogram + except Exception as e: + logger.error(f"Failed to register histogram {name} for {self.service_name}: {e}") + raise + + def register_gauge(self, name: str, documentation: str, labels: List[str] = None) -> Gauge: + """Register a custom Gauge metric.""" + with self._lock: + if name in self._gauges: + logger.warning(f"Gauge '{name}' already registered for {self.service_name}") + return self._gauges[name] + + if labels is None: + labels = ['service'] + elif 'service' not in labels: + labels.append('service') + + try: + gauge = Gauge(f"{self.service_name.replace('-', '_')}_{name}", documentation, labelnames=labels) + self._gauges[name] = gauge + logger.info(f"Registered gauge: {name} for {self.service_name}") + return gauge + except Exception as e: + logger.error(f"Failed to register gauge {name} for {self.service_name}: {e}") + raise def increment_counter(self, name: str, value: int = 1, labels: Dict[str, str] = None): - """Increment a custom Counter metric.""" + """Increment a counter metric.""" if name not in self._counters: - logger.error(f"Counter '{name}' not registered. Cannot increment.") + logger.error(f"Counter '{name}' not registered for {self.service_name}. Cannot increment.") return - # Ensure the 'service' label is always present if labels is None: labels = {'service': self.service_name} elif 'service' not in labels: labels['service'] = self.service_name - self._counters[name].labels(**labels).inc(value) - - def register_histogram(self, name: str, documentation: str, labels: List[str] = None, buckets: tuple = Histogram.DEFAULT_BUCKETS): - """Register a custom Histogram metric.""" - if name not in self._histograms: - if labels is None: - labels = ['service'] - elif 'service' not in labels: - labels.append('service') - # Pass labelnames and buckets as keyword arguments - self._histograms[name] = Histogram(name, documentation, labelnames=labels, buckets=buckets) - logger.info(f"Registered histogram: {name}") - else: - logger.warning(f"Histogram '{name}' already registered.") - return self._histograms[name] # Return the histogram for direct use if needed + try: + self._counters[name].labels(**labels).inc(value) + except Exception as e: + logger.error(f"Failed to increment counter {name} for {self.service_name}: {e}") def observe_histogram(self, name: str, value: float, labels: Dict[str, str] = None): - """Observe a custom Histogram metric.""" + """Observe a histogram metric.""" if name not in self._histograms: - logger.error(f"Histogram '{name}' not registered. Cannot observe.") + logger.error(f"Histogram '{name}' not registered for {self.service_name}. Cannot observe.") return if labels is None: @@ -149,145 +152,146 @@ class MetricsCollector: elif 'service' not in labels: labels['service'] = self.service_name - self._histograms[name].labels(**labels).observe(value) + try: + self._histograms[name].labels(**labels).observe(value) + except Exception as e: + logger.error(f"Failed to observe histogram {name} for {self.service_name}: {e}") + + def set_gauge(self, name: str, value: float, labels: Dict[str, str] = None): + """Set a gauge metric.""" + if name not in self._gauges: + logger.error(f"Gauge '{name}' not registered for {self.service_name}. Cannot set.") + return + + if labels is None: + labels = {'service': self.service_name} + elif 'service' not in labels: + labels['service'] = self.service_name + + try: + self._gauges[name].labels(**labels).set(value) + except Exception as e: + logger.error(f"Failed to set gauge {name} for {self.service_name}: {e}") + + def record_request(self, method: str, endpoint: str, status_code: int, duration: float): + """Record HTTP request metrics using default metrics.""" + try: + DEFAULT_REQUEST_COUNT.labels( + method=method, + endpoint=endpoint, + status_code=status_code, + service=self.service_name + ).inc() + + DEFAULT_REQUEST_DURATION.labels( + method=method, + endpoint=endpoint, + service=self.service_name + ).observe(duration) + except Exception as e: + logger.error(f"Failed to record request metrics for {self.service_name}: {e}") + + def set_active_connections(self, count: int): + """Set active database connections using default gauge.""" + try: + DEFAULT_ACTIVE_CONNECTIONS.labels(service=self.service_name).set(count) + except Exception as e: + logger.error(f"Failed to set active connections for {self.service_name}: {e}") def get_metrics(self) -> str: """Return Prometheus metrics in exposition format.""" - return generate_latest().decode('utf-8') + try: + return generate_latest().decode('utf-8') + except Exception as e: + logger.error(f"Failed to generate metrics for {self.service_name}: {e}") + return "" -def metrics_middleware(metrics_collector: MetricsCollector): - """Middleware to collect metrics""" - - async def middleware(request, call_next): - start_time = time.time() - - response = await call_next(request) - - duration = time.time() - start_time - - # Use the specific record_request for HTTP requests - metrics_collector.record_request( - method=request.method, - endpoint=request.url.path, - status_code=response.status_code, - duration=duration - ) - - return response - - return middleware +def get_metrics_collector(service_name: str) -> Optional[MetricsCollector]: + """Get metrics collector by service name from global registry.""" + with _registry_lock: + return _metrics_registry.get(service_name) -def setup_metrics(app): +def create_metrics_collector(service_name: str) -> MetricsCollector: """ - Setup metrics collection for FastAPI app - - Args: - app: FastAPI application instance - - Returns: - MetricsCollector: Configured metrics collector + Create metrics collector without adding middleware. + This should be called BEFORE app startup, not during lifespan. """ + # Get existing or create new + existing = get_metrics_collector(service_name) + if existing: + return existing - # Get service name from app title or default - service_name = getattr(app, 'title', 'unknown-service').lower().replace(' ', '-') - - # Create metrics collector for this service - metrics_collector = MetricsCollector(service_name) - - # Add metrics middleware to collect HTTP request metrics + return MetricsCollector(service_name) + + +def add_metrics_middleware(app, metrics_collector: MetricsCollector): + """ + Add metrics middleware to app. Must be called BEFORE app startup. + """ @app.middleware("http") - async def collect_metrics_middleware(request: Request, call_next): + async def metrics_middleware(request: Request, call_next): start_time = time.time() - # Process the request - response = await call_next(request) - - # Calculate duration - duration = time.time() - start_time - - # Record metrics - metrics_collector.record_request( - method=request.method, - endpoint=request.url.path, - status_code=response.status_code, - duration=duration - ) - - return response - - # Add metrics endpoint if it doesn't exist - @app.get("/metrics") - async def prometheus_metrics(): - """Prometheus metrics endpoint""" - from prometheus_client import generate_latest - return Response( - content=generate_latest(), - media_type="text/plain; version=0.0.4; charset=utf-8" - ) - - # Store metrics collector in app state for later access - app.state.metrics_collector = metrics_collector - - logger.info(f"Metrics collection setup completed for service: {service_name}") + try: + response = await call_next(request) + duration = time.time() - start_time + + # Record request metrics + metrics_collector.record_request( + method=request.method, + endpoint=request.url.path, + status_code=response.status_code, + duration=duration + ) + + return response + except Exception as e: + duration = time.time() - start_time + # Record failed request + metrics_collector.record_request( + method=request.method, + endpoint=request.url.path, + status_code=500, + duration=duration + ) + raise return metrics_collector -# Alternative simplified setup function for services that don't need complex metrics -def setup_basic_metrics(app, service_name: str = None): +def add_metrics_endpoint(app, metrics_collector: MetricsCollector): + """Add metrics endpoint to app""" + @app.get("/metrics") + async def prometheus_metrics(): + """Prometheus metrics endpoint""" + return Response( + content=metrics_collector.get_metrics(), + media_type="text/plain; version=0.0.4; charset=utf-8" + ) + + +def setup_metrics_early(app, service_name: str = None) -> MetricsCollector: """ - Setup basic metrics collection without complex dependencies - - Args: - app: FastAPI application instance - service_name: Optional service name override - - Returns: - Simple metrics dict + Setup metrics collection BEFORE app startup. + This must be called before adding any middleware or starting the app. """ if service_name is None: - service_name = getattr(app, 'title', 'unknown-service').lower().replace(' ', '-') + service_name = getattr(app, 'title', 'unknown-service').lower().replace(' ', '-').replace('.', '_') - # Simple in-memory metrics - metrics_data = { - "requests_total": 0, - "requests_by_method": {}, - "requests_by_status": {}, - "service_name": service_name, - "start_time": time.time() - } + # Create metrics collector + metrics_collector = create_metrics_collector(service_name) - @app.middleware("http") - async def simple_metrics_middleware(request: Request, call_next): - # Increment total requests - metrics_data["requests_total"] += 1 - - # Track by method - method = request.method - metrics_data["requests_by_method"][method] = metrics_data["requests_by_method"].get(method, 0) + 1 - - # Process request - response = await call_next(request) - - # Track by status code - status = str(response.status_code) - metrics_data["requests_by_status"][status] = metrics_data["requests_by_status"].get(status, 0) + 1 - - return response + # Add middleware (must be before app starts) + add_metrics_middleware(app, metrics_collector) - @app.get("/metrics") - async def simple_metrics(): - """Simple metrics endpoint""" - uptime = time.time() - metrics_data["start_time"] - return { - **metrics_data, - "uptime_seconds": round(uptime, 2) - } + # Add metrics endpoint + add_metrics_endpoint(app, metrics_collector) - app.state.simple_metrics = metrics_data + # Store in app state for access from routes + app.state.metrics_collector = metrics_collector - logger.info(f"Basic metrics setup completed for service: {service_name}") - - return metrics_data \ No newline at end of file + logger.info(f"Metrics setup completed for service: {service_name}") + return metrics_collector +