# shared/monitoring/metrics.py
"""
Metrics collection for microservices

Defines the Prometheus metrics shared by all services, a per-service
``MetricsCollector`` wrapper around them, and helpers that wire request
metrics and a ``/metrics`` endpoint into a FastAPI application.
"""

import logging
import time
from functools import wraps
from typing import Any, Dict, List, Optional

from fastapi import Request, Response
from prometheus_client import Counter, Gauge, Histogram, start_http_server
from prometheus_client import generate_latest

logger = logging.getLogger(__name__)

# Prometheus metrics
REQUEST_COUNT = Counter(
    'http_requests_total',
    'Total HTTP requests',
    ['method', 'endpoint', 'status_code', 'service']
)

REQUEST_DURATION = Histogram(
    'http_request_duration_seconds',
    'HTTP request duration in seconds',
    ['method', 'endpoint', 'service']
)

ACTIVE_CONNECTIONS = Gauge(
    'active_connections',
    'Active database connections',
    ['service']
)

TRAINING_JOBS = Counter(
    'training_jobs_total',
    'Total training jobs',
    ['status', 'service']
)

FORECASTS_GENERATED = Counter(
    'forecasts_generated_total',
    'Total forecasts generated',
    ['service']
)


def _service_name_from_app(app) -> str:
    """Derive a service name from ``app.title`` (lower-cased, spaces -> dashes).

    Falls back to ``'unknown-service'`` when the attribute is missing, ``None``
    or empty, so the result is always a usable label value.
    """
    title = getattr(app, 'title', None) or 'unknown-service'
    return title.lower().replace(' ', '-')


class MetricsCollector:
    """Metrics collector for microservices.

    Every metric recorded through an instance is labelled with the
    ``service_name`` given at construction time. Custom counters and
    histograms registered through the instance are kept in per-instance
    dictionaries keyed by metric name.
    """

    def __init__(self, service_name: str):
        """Create a collector that labels all metrics with ``service_name``."""
        self.service_name = service_name
        self.start_time = time.time()
        # Custom metrics registered through register_counter / register_histogram.
        self._counters: Dict[str, Counter] = {}
        self._histograms: Dict[str, Histogram] = {}

    @staticmethod
    def _label_names(labels: Optional[List[str]]) -> List[str]:
        """Return a new list of label names that always contains 'service'.

        A copy is made so the caller's list is never mutated.
        """
        names = list(labels) if labels is not None else []
        if 'service' not in names:
            names.append('service')
        return names

    def _label_values(self, labels: Optional[Dict[str, str]]) -> Dict[str, str]:
        """Return a new label dict that always carries the 'service' label.

        An explicit 'service' value supplied by the caller is kept; otherwise
        this collector's service name is used. The caller's dict is not mutated.
        """
        values = dict(labels) if labels is not None else {}
        values.setdefault('service', self.service_name)
        return values

    def start_metrics_server(self, port: int = 8080):
        """Start Prometheus metrics server on ``port``.

        Best-effort: a failure to start is logged and not re-raised.
        """
        try:
            start_http_server(port)
            logger.info(f"Metrics server started on port {port}")
        except Exception as e:
            logger.error(f"Failed to start metrics server: {e}")

    def record_request(self, method: str, endpoint: str, status_code: int, duration: float):
        """Record HTTP request metrics.

        Increments the request counter and observes ``duration`` (seconds) in
        the request-duration histogram for this service.
        """
        REQUEST_COUNT.labels(
            method=method,
            endpoint=endpoint,
            status_code=status_code,
            service=self.service_name
        ).inc()

        REQUEST_DURATION.labels(
            method=method,
            endpoint=endpoint,
            service=self.service_name
        ).observe(duration)

    def record_training_job(self, status: str):
        """Record training job metrics for the given ``status``."""
        TRAINING_JOBS.labels(
            status=status,
            service=self.service_name
        ).inc()

    def record_forecast_generated(self):
        """Record forecast generation metrics."""
        FORECASTS_GENERATED.labels(
            service=self.service_name
        ).inc()

    def set_active_connections(self, count: int):
        """Set active database connections gauge to ``count``."""
        ACTIVE_CONNECTIONS.labels(
            service=self.service_name
        ).set(count)

    def register_counter(self, name: str, documentation: str,
                         labels: Optional[List[str]] = None) -> Counter:
        """Register a custom Counter metric.

        Args:
            name: Metric name; also the key used by ``increment_counter``.
            documentation: Help text for the metric.
            labels: Label names. 'service' is added if absent; the passed
                list itself is left untouched.

        Returns:
            The Counter registered under ``name`` (the existing one if the
            name was already registered on this collector).
        """
        if name not in self._counters:
            self._counters[name] = Counter(
                name, documentation, labelnames=self._label_names(labels)
            )
            logger.info(f"Registered counter: {name}")
        else:
            logger.warning(f"Counter '{name}' already registered.")
        return self._counters[name]

    def increment_counter(self, name: str, value: int = 1,
                          labels: Optional[Dict[str, str]] = None):
        """Increment a custom Counter metric.

        Logs an error and does nothing when ``name`` was never registered.
        The 'service' label is filled in automatically when not supplied.
        """
        if name not in self._counters:
            logger.error(f"Counter '{name}' not registered. Cannot increment.")
            return

        self._counters[name].labels(**self._label_values(labels)).inc(value)

    def register_histogram(self, name: str, documentation: str,
                           labels: Optional[List[str]] = None,
                           buckets: tuple = Histogram.DEFAULT_BUCKETS) -> Histogram:
        """Register a custom Histogram metric.

        Args:
            name: Metric name; also the key used by ``observe_histogram``.
            documentation: Help text for the metric.
            labels: Label names. 'service' is added if absent; the passed
                list itself is left untouched.
            buckets: Histogram bucket boundaries.

        Returns:
            The Histogram registered under ``name`` (the existing one if the
            name was already registered on this collector).
        """
        if name not in self._histograms:
            self._histograms[name] = Histogram(
                name, documentation,
                labelnames=self._label_names(labels),
                buckets=buckets
            )
            logger.info(f"Registered histogram: {name}")
        else:
            logger.warning(f"Histogram '{name}' already registered.")
        return self._histograms[name]

    def observe_histogram(self, name: str, value: float,
                          labels: Optional[Dict[str, str]] = None):
        """Observe a custom Histogram metric.

        Logs an error and does nothing when ``name`` was never registered.
        The 'service' label is filled in automatically when not supplied.
        """
        if name not in self._histograms:
            logger.error(f"Histogram '{name}' not registered. Cannot observe.")
            return

        self._histograms[name].labels(**self._label_values(labels)).observe(value)

    def get_metrics(self) -> str:
        """Return Prometheus metrics in exposition format."""
        return generate_latest().decode('utf-8')


def metrics_middleware(metrics_collector: MetricsCollector):
    """Build an HTTP middleware that records request metrics.

    The returned coroutine has the ``(request, call_next)`` signature. It
    times the downstream call and reports method, path, status code and
    duration through ``metrics_collector.record_request``.
    """

    async def middleware(request, call_next):
        # perf_counter is monotonic, so the duration cannot go negative or
        # jump when the wall clock is adjusted.
        started = time.perf_counter()

        def _record(status_code: int) -> None:
            metrics_collector.record_request(
                method=request.method,
                endpoint=request.url.path,
                status_code=status_code,
                duration=time.perf_counter() - started
            )

        try:
            response = await call_next(request)
        except Exception:
            # An exception escaping the downstream app would otherwise leave
            # the request unrecorded; count it as a 500 and let it propagate.
            _record(500)
            raise

        _record(response.status_code)
        return response

    return middleware


def setup_metrics(app):
    """
    Setup metrics collection for FastAPI app

    Registers the request-metrics middleware, adds a ``/metrics`` endpoint
    serving the Prometheus exposition output, and stores the collector on
    ``app.state.metrics_collector``.

    Args:
        app: FastAPI application instance

    Returns:
        MetricsCollector: Configured metrics collector
    """
    # Get service name from app title or default
    service_name = _service_name_from_app(app)

    # Create metrics collector for this service
    metrics_collector = MetricsCollector(service_name)

    # Reuse the shared middleware factory instead of duplicating its body.
    app.middleware("http")(metrics_middleware(metrics_collector))

    @app.get("/metrics")
    async def prometheus_metrics():
        """Prometheus metrics endpoint"""
        return Response(
            content=generate_latest(),
            media_type="text/plain; version=0.0.4; charset=utf-8"
        )

    # Store metrics collector in app state for later access
    app.state.metrics_collector = metrics_collector

    logger.info(f"Metrics collection setup completed for service: {service_name}")
    return metrics_collector


# Alternative simplified setup function for services that don't need complex metrics
def setup_basic_metrics(app, service_name: Optional[str] = None):
    """
    Setup basic metrics collection without complex dependencies

    Counts requests in a plain in-memory dict (total, by method, by status
    code), serves that dict plus the uptime from ``/metrics``, and stores it
    on ``app.state.simple_metrics``.

    Args:
        app: FastAPI application instance
        service_name: Optional service name override

    Returns:
        Simple metrics dict
    """
    if service_name is None:
        service_name = _service_name_from_app(app)

    # Simple in-memory metrics
    metrics_data = {
        "requests_total": 0,
        "requests_by_method": {},
        "requests_by_status": {},
        "service_name": service_name,
        "start_time": time.time()
    }

    @app.middleware("http")
    async def simple_metrics_middleware(request: Request, call_next):
        # Increment total requests
        metrics_data["requests_total"] += 1

        # Track by method
        by_method = metrics_data["requests_by_method"]
        method = request.method
        by_method[method] = by_method.get(method, 0) + 1

        # Process request
        response = await call_next(request)

        # Track by status code
        by_status = metrics_data["requests_by_status"]
        status = str(response.status_code)
        by_status[status] = by_status.get(status, 0) + 1

        return response

    @app.get("/metrics")
    async def simple_metrics():
        """Simple metrics endpoint"""
        uptime = time.time() - metrics_data["start_time"]
        return {
            **metrics_data,
            "uptime_seconds": round(uptime, 2)
        }

    app.state.simple_metrics = metrics_data

    logger.info(f"Basic metrics setup completed for service: {service_name}")
    return metrics_data