# ================================================================ # shared/monitoring/metrics.py - FIXED VERSION # ================================================================ """ Centralized metrics collection for microservices - Fixed middleware issue """ import time import logging from typing import Dict, Any, List, Optional from prometheus_client import Counter, Histogram, Gauge, start_http_server, generate_latest from fastapi import Request, Response from threading import Lock logger = logging.getLogger(__name__) # Global registry for metrics collectors _metrics_registry: Dict[str, 'MetricsCollector'] = {} _registry_lock = Lock() # Default Prometheus metrics DEFAULT_REQUEST_COUNT = Counter( 'http_requests_total', 'Total HTTP requests', ['method', 'endpoint', 'status_code', 'service'] ) DEFAULT_REQUEST_DURATION = Histogram( 'http_request_duration_seconds', 'HTTP request duration in seconds', ['method', 'endpoint', 'service'] ) DEFAULT_ACTIVE_CONNECTIONS = Gauge( 'active_connections', 'Active database connections', ['service'] ) class MetricsCollector: """Thread-safe metrics collector for microservices""" def __init__(self, service_name: str): self.service_name = service_name self.start_time = time.time() self._counters: Dict[str, Counter] = {} self._histograms: Dict[str, Histogram] = {} self._gauges: Dict[str, Gauge] = {} self._lock = Lock() # Register in global registry with _registry_lock: _metrics_registry[service_name] = self def start_metrics_server(self, port: int = 8080): """Start Prometheus metrics server""" try: start_http_server(port) logger.info(f"Metrics server started on port {port} for {self.service_name}") except Exception as e: logger.error(f"Failed to start metrics server for {self.service_name}: {e}") def register_counter(self, name: str, documentation: str, labels: List[str] = None) -> Counter: """Register a custom Counter metric.""" with self._lock: if name in self._counters: logger.warning(f"Counter '{name}' already registered for {self.service_name}") return self._counters[name] if labels is None: labels = ['service'] elif 'service' not in labels: labels.append('service') try: counter = Counter(f"{self.service_name.replace('-', '_')}_{name}", documentation, labelnames=labels) self._counters[name] = counter logger.info(f"Registered counter: {name} for {self.service_name}") return counter except Exception as e: logger.error(f"Failed to register counter {name} for {self.service_name}: {e}") raise def register_histogram(self, name: str, documentation: str, labels: List[str] = None, buckets: tuple = Histogram.DEFAULT_BUCKETS) -> Histogram: """Register a custom Histogram metric.""" with self._lock: if name in self._histograms: logger.warning(f"Histogram '{name}' already registered for {self.service_name}") return self._histograms[name] if labels is None: labels = ['service'] elif 'service' not in labels: labels.append('service') try: histogram = Histogram(f"{self.service_name.replace('-', '_')}_{name}", documentation, labelnames=labels, buckets=buckets) self._histograms[name] = histogram logger.info(f"Registered histogram: {name} for {self.service_name}") return histogram except ValueError as e: if "Duplicated timeseries" in str(e): # Metric already exists in global registry, try to find it from prometheus_client import REGISTRY metric_name = f"{self.service_name.replace('-', '_')}_{name}" for collector in REGISTRY._collector_to_names.keys(): if hasattr(collector, '_name') and collector._name == metric_name: self._histograms[name] = collector logger.warning(f"Reusing existing histogram: {name} for {self.service_name}") return collector # If we can't find it, create a new name with suffix import time suffix = str(int(time.time() * 1000))[-6:] # Last 6 digits of timestamp histogram = Histogram(f"{self.service_name.replace('-', '_')}_{name}_{suffix}", documentation, labelnames=labels, buckets=buckets) self._histograms[name] = histogram logger.warning(f"Created histogram with suffix: {name}_{suffix} for {self.service_name}") return histogram else: logger.error(f"Failed to register histogram {name} for {self.service_name}: {e}") raise except Exception as e: logger.error(f"Failed to register histogram {name} for {self.service_name}: {e}") raise def register_gauge(self, name: str, documentation: str, labels: List[str] = None) -> Gauge: """Register a custom Gauge metric.""" with self._lock: if name in self._gauges: logger.warning(f"Gauge '{name}' already registered for {self.service_name}") return self._gauges[name] if labels is None: labels = ['service'] elif 'service' not in labels: labels.append('service') try: gauge = Gauge(f"{self.service_name.replace('-', '_')}_{name}", documentation, labelnames=labels) self._gauges[name] = gauge logger.info(f"Registered gauge: {name} for {self.service_name}") return gauge except Exception as e: logger.error(f"Failed to register gauge {name} for {self.service_name}: {e}") raise def increment_counter(self, name: str, value: int = 1, labels: Dict[str, str] = None): """Increment a counter metric.""" if name not in self._counters: logger.error(f"Counter '{name}' not registered for {self.service_name}. Cannot increment.") return if labels is None: labels = {'service': self.service_name} elif 'service' not in labels: labels['service'] = self.service_name try: self._counters[name].labels(**labels).inc(value) except Exception as e: logger.error(f"Failed to increment counter {name} for {self.service_name}: {e}") def observe_histogram(self, name: str, value: float, labels: Dict[str, str] = None): """Observe a histogram metric.""" if name not in self._histograms: logger.error(f"Histogram '{name}' not registered for {self.service_name}. Cannot observe.") return if labels is None: labels = {'service': self.service_name} elif 'service' not in labels: labels['service'] = self.service_name try: self._histograms[name].labels(**labels).observe(value) except Exception as e: logger.error(f"Failed to observe histogram {name} for {self.service_name}: {e}") def set_gauge(self, name: str, value: float, labels: Dict[str, str] = None): """Set a gauge metric.""" if name not in self._gauges: logger.error(f"Gauge '{name}' not registered for {self.service_name}. Cannot set.") return if labels is None: labels = {'service': self.service_name} elif 'service' not in labels: labels['service'] = self.service_name try: self._gauges[name].labels(**labels).set(value) except Exception as e: logger.error(f"Failed to set gauge {name} for {self.service_name}: {e}") def record_request(self, method: str, endpoint: str, status_code: int, duration: float): """Record HTTP request metrics using default metrics.""" try: DEFAULT_REQUEST_COUNT.labels( method=method, endpoint=endpoint, status_code=status_code, service=self.service_name ).inc() DEFAULT_REQUEST_DURATION.labels( method=method, endpoint=endpoint, service=self.service_name ).observe(duration) except Exception as e: logger.error(f"Failed to record request metrics for {self.service_name}: {e}") def set_active_connections(self, count: int): """Set active database connections using default gauge.""" try: DEFAULT_ACTIVE_CONNECTIONS.labels(service=self.service_name).set(count) except Exception as e: logger.error(f"Failed to set active connections for {self.service_name}: {e}") def get_metrics(self) -> str: """Return Prometheus metrics in exposition format.""" try: return generate_latest().decode('utf-8') except Exception as e: logger.error(f"Failed to generate metrics for {self.service_name}: {e}") return "" def get_metrics_collector(service_name: str) -> Optional[MetricsCollector]: """Get metrics collector by service name from global registry.""" with _registry_lock: return _metrics_registry.get(service_name) def create_metrics_collector(service_name: str) -> MetricsCollector: """ Create metrics collector without adding middleware. This should be called BEFORE app startup, not during lifespan. """ # Get existing or create new existing = get_metrics_collector(service_name) if existing: return existing return MetricsCollector(service_name) def add_metrics_middleware(app, metrics_collector: MetricsCollector): """ Add metrics middleware to app. Must be called BEFORE app startup. """ @app.middleware("http") async def metrics_middleware(request: Request, call_next): start_time = time.time() try: response = await call_next(request) duration = time.time() - start_time # Record request metrics metrics_collector.record_request( method=request.method, endpoint=request.url.path, status_code=response.status_code, duration=duration ) return response except Exception as e: duration = time.time() - start_time # Record failed request metrics_collector.record_request( method=request.method, endpoint=request.url.path, status_code=500, duration=duration ) raise return metrics_collector def add_metrics_endpoint(app, metrics_collector: MetricsCollector): """Add metrics endpoint to app""" @app.get("/metrics") async def prometheus_metrics(): """Prometheus metrics endpoint""" return Response( content=metrics_collector.get_metrics(), media_type="text/plain; version=0.0.4; charset=utf-8" ) def setup_metrics_early(app, service_name: str = None) -> MetricsCollector: """ Setup metrics collection BEFORE app startup. This must be called before adding any middleware or starting the app. """ if service_name is None: service_name = getattr(app, 'title', 'unknown-service').lower().replace(' ', '-').replace('.', '_') # Create metrics collector metrics_collector = create_metrics_collector(service_name) # Add middleware (must be before app starts) add_metrics_middleware(app, metrics_collector) # Add metrics endpoint add_metrics_endpoint(app, metrics_collector) # Store in app state for access from routes app.state.metrics_collector = metrics_collector logger.info(f"Metrics setup completed for service: {service_name}") return metrics_collector # Additional helper function for endpoint tracking def track_endpoint_metrics(endpoint_name: str = None, service_name: str = None): """Decorator for tracking endpoint metrics - Fixed for async functions""" def decorator(func): import asyncio from functools import wraps @wraps(func) async def async_wrapper(*args, **kwargs): # For now, just pass through - metrics are handled by middleware return await func(*args, **kwargs) @wraps(func) def sync_wrapper(*args, **kwargs): # For now, just pass through - metrics are handled by middleware return func(*args, **kwargs) # Return appropriate wrapper based on function type if asyncio.iscoroutinefunction(func): return async_wrapper else: return sync_wrapper return decorator