# shared/monitoring/metrics.py
"""
Metrics collection for microservices
"""

import logging
import time
from functools import wraps
from typing import Any, Dict, List, Optional

from prometheus_client import Counter, Histogram, Gauge, start_http_server
from prometheus_client import generate_latest

logger = logging.getLogger(__name__)

# Prometheus metrics shared by every collector in this process. They are
# created at import time in prometheus_client's default registry.
REQUEST_COUNT = Counter(
    'http_requests_total',
    'Total HTTP requests',
    ['method', 'endpoint', 'status_code', 'service']
)

REQUEST_DURATION = Histogram(
    'http_request_duration_seconds',
    'HTTP request duration in seconds',
    ['method', 'endpoint', 'service']
)

ACTIVE_CONNECTIONS = Gauge(
    'active_connections',
    'Active database connections',
    ['service']
)

TRAINING_JOBS = Counter(
    'training_jobs_total',
    'Total training jobs',
    ['status', 'service']
)

FORECASTS_GENERATED = Counter(
    'forecasts_generated_total',
    'Total forecasts generated',
    ['service']
)


class MetricsCollector:
    """Metrics collector for microservices.

    Records samples on the module-level Prometheus metrics, always tagging
    them with this collector's ``service`` label, and lets callers register
    additional named Counter / Histogram metrics.

    NOTE: custom metrics are created in prometheus_client's default
    registry. ``_counters`` / ``_histograms`` only de-duplicate within one
    instance; registering the same metric name from two instances in the
    same process makes prometheus_client raise ``ValueError``.
    """

    def __init__(self, service_name: str):
        """Create a collector whose samples carry ``service=service_name``."""
        self.service_name = service_name
        # Wall-clock time at which this collector was created.
        self.start_time = time.time()
        # Custom metrics registered through this instance, keyed by name.
        self._counters: Dict[str, Counter] = {}
        self._histograms: Dict[str, Histogram] = {}

    @staticmethod
    def _label_names_with_service(labels: Optional[List[str]]) -> List[str]:
        """Return a new list of label names that always contains 'service'.

        The caller's list is copied, never modified in place.
        """
        names = list(labels) if labels is not None else []
        if 'service' not in names:
            names.append('service')
        return names

    def _label_values_with_service(
        self, labels: Optional[Dict[str, str]]
    ) -> Dict[str, str]:
        """Return a new label-value dict with 'service' filled in if absent.

        An explicit 'service' value supplied by the caller is kept; the
        caller's dict is copied, never modified in place.
        """
        values = dict(labels) if labels is not None else {}
        values.setdefault('service', self.service_name)
        return values

    def start_metrics_server(self, port: int = 8080):
        """Start the Prometheus HTTP exposition server on ``port``.

        Best effort: a failure to start (e.g. the port is already in use)
        is logged, not raised.
        """
        try:
            start_http_server(port)
            logger.info("Metrics server started on port %s", port)
        except Exception as e:
            logger.error("Failed to start metrics server: %s", e)

    def record_request(self, method: str, endpoint: str, status_code: int, duration: float):
        """Increment the request counter and observe ``duration`` (seconds)."""
        REQUEST_COUNT.labels(
            method=method,
            endpoint=endpoint,
            status_code=status_code,
            service=self.service_name
        ).inc()

        REQUEST_DURATION.labels(
            method=method,
            endpoint=endpoint,
            service=self.service_name
        ).observe(duration)

    def record_training_job(self, status: str):
        """Count one training job with the given ``status`` label."""
        TRAINING_JOBS.labels(
            status=status,
            service=self.service_name
        ).inc()

    def record_forecast_generated(self):
        """Count one generated forecast."""
        FORECASTS_GENERATED.labels(
            service=self.service_name
        ).inc()

    def set_active_connections(self, count: int):
        """Set the active database connections gauge to ``count``."""
        ACTIVE_CONNECTIONS.labels(
            service=self.service_name
        ).set(count)

    def register_counter(self, name: str, documentation: str,
                         labels: Optional[List[str]] = None):
        """Register a custom Counter metric and return it.

        Args:
            name: Metric name passed to ``prometheus_client.Counter``.
            documentation: Help text for the metric.
            labels: Label names; 'service' is added if missing. The list
                passed in is not modified.

        Returns:
            The Counter registered under ``name`` (the existing one, with a
            warning, if this instance already registered that name).
        """
        if name not in self._counters:
            self._counters[name] = Counter(
                name, documentation,
                labelnames=self._label_names_with_service(labels)
            )
            logger.info("Registered counter: %s", name)
        else:
            logger.warning("Counter '%s' already registered.", name)
        return self._counters[name]

    def increment_counter(self, name: str, value: int = 1,
                          labels: Optional[Dict[str, str]] = None):
        """Increment a custom Counter metric by ``value``.

        Logs an error and does nothing if ``name`` was not registered on
        this instance. ``labels`` gets 'service' filled in if absent; the
        dict passed in is not modified.
        """
        if name not in self._counters:
            logger.error("Counter '%s' not registered. Cannot increment.", name)
            return
        self._counters[name].labels(
            **self._label_values_with_service(labels)
        ).inc(value)

    def register_histogram(self, name: str, documentation: str,
                           labels: Optional[List[str]] = None,
                           buckets: tuple = Histogram.DEFAULT_BUCKETS):
        """Register a custom Histogram metric and return it.

        Args:
            name: Metric name passed to ``prometheus_client.Histogram``.
            documentation: Help text for the metric.
            labels: Label names; 'service' is added if missing. The list
                passed in is not modified.
            buckets: Bucket upper bounds forwarded to ``Histogram``.

        Returns:
            The Histogram registered under ``name`` (the existing one, with
            a warning, if this instance already registered that name).
        """
        if name not in self._histograms:
            self._histograms[name] = Histogram(
                name, documentation,
                labelnames=self._label_names_with_service(labels),
                buckets=buckets
            )
            logger.info("Registered histogram: %s", name)
        else:
            logger.warning("Histogram '%s' already registered.", name)
        return self._histograms[name]

    def observe_histogram(self, name: str, value: float,
                          labels: Optional[Dict[str, str]] = None):
        """Observe ``value`` on a custom Histogram metric.

        Logs an error and does nothing if ``name`` was not registered on
        this instance. ``labels`` gets 'service' filled in if absent; the
        dict passed in is not modified.
        """
        if name not in self._histograms:
            logger.error("Histogram '%s' not registered. Cannot observe.", name)
            return
        self._histograms[name].labels(
            **self._label_values_with_service(labels)
        ).observe(value)

    def get_metrics(self) -> str:
        """Return all metrics in the default registry in exposition format."""
        return generate_latest().decode('utf-8')


def metrics_middleware(metrics_collector: MetricsCollector):
    """Build an HTTP middleware that records request count and latency.

    The returned coroutine has the ``(request, call_next)`` shape; presumably
    it is mounted via Starlette/FastAPI ``@app.middleware("http")`` — confirm
    against the caller. It reads ``request.method``, ``request.url.path``
    and ``response.status_code``.
    """
    async def middleware(request, call_next):
        # perf_counter is monotonic, so durations are immune to clock jumps.
        start = time.perf_counter()
        # NOTE(review): the raw path is used as the 'endpoint' label; paths
        # containing IDs create unbounded label cardinality. Consider the
        # matched route template instead.
        endpoint = request.url.path
        try:
            response = await call_next(request)
        except Exception:
            # Count requests that die with an unhandled exception as 500s,
            # then let the exception propagate unchanged.
            metrics_collector.record_request(
                method=request.method,
                endpoint=endpoint,
                status_code=500,
                duration=time.perf_counter() - start
            )
            raise

        metrics_collector.record_request(
            method=request.method,
            endpoint=endpoint,
            status_code=response.status_code,
            duration=time.perf_counter() - start
        )
        return response

    return middleware