"""
OpenTelemetry Metrics Collection for Microservices

Replaces Prometheus with native OpenTelemetry metrics export to SigNoz.
"""

import inspect
import logging
import os
import time
from functools import wraps
from threading import Lock
from typing import Any, Callable, Dict, Optional

import structlog
from fastapi import Request, Response
from opentelemetry import metrics
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from opentelemetry.sdk.resources import Resource, SERVICE_NAME, SERVICE_VERSION

logger = structlog.get_logger()

# Global registry for metrics collectors, keyed by service name.
_metrics_registry: Dict[str, 'MetricsCollector'] = {}
_registry_lock = Lock()


class MetricsCollector:
    """
    OpenTelemetry-based metrics collector for microservices.

    Instruments are created on an OpenTelemetry meter; every metric name is
    prefixed with the service name (dashes replaced by underscores). No
    Prometheus endpoint is involved.
    """

    def __init__(
        self,
        service_name: str,
        service_version: str = "1.0.0",
        meter_provider: Optional[MeterProvider] = None
    ):
        """
        Create the collector, its default HTTP instruments, and register it
        in the module-level registry under ``service_name``.

        Args:
            service_name: Name used as metric prefix and registry key.
            service_version: Stored on the instance for reference.
            meter_provider: Provider to obtain the meter from; when omitted
                the globally configured provider is used.
        """
        self.service_name = service_name
        self.service_version = service_version
        self.start_time = time.time()

        # Use provided meter provider or get global
        if meter_provider is not None:
            self.meter = meter_provider.get_meter(__name__)
        else:
            self.meter = metrics.get_meter(__name__)

        # Store created instruments
        self._counters: Dict[str, Any] = {}
        self._histograms: Dict[str, Any] = {}
        self._up_down_counters: Dict[str, Any] = {}
        # Last value passed to set_gauge per (gauge, label set); needed to
        # turn an absolute value into the delta an UpDownCounter expects.
        self._gauge_values: Dict[str, float] = {}
        self._lock = Lock()

        # Create default HTTP metrics
        self._setup_default_metrics()

        # Register only once the default instruments exist, so the registry
        # never hands out a half-initialized collector.
        with _registry_lock:
            _metrics_registry[service_name] = self

        logger.info(
            "OpenTelemetry metrics collector initialized",
            service=service_name
        )

    def _metric_name(self, name: str) -> str:
        """Return ``name`` prefixed with the underscore-normalized service name."""
        return f"{self.service_name.replace('-', '_')}_{name}"

    def _with_service_label(self, labels: Optional[Dict[str, str]]) -> Dict[str, str]:
        """
        Return a copy of ``labels`` that is guaranteed to carry a "service" key.

        A copy is made so the caller's dict is never modified.
        """
        attributes = dict(labels) if labels else {}
        attributes.setdefault("service", self.service_name)
        return attributes

    def _register_instrument(
        self,
        kind: str,
        store: Dict[str, Any],
        factory: Callable[..., Any],
        name: str,
        documentation: str
    ) -> Any:
        """
        Create an instrument via ``factory`` and remember it in ``store``.

        If ``name`` is already present in ``store`` the existing instrument is
        returned and a warning is logged. Creation errors are logged and
        re-raised.
        """
        with self._lock:
            if name in store:
                logger.warning(f"{kind.capitalize()} '{name}' already registered for {self.service_name}")
                return store[name]

            try:
                instrument = factory(
                    name=self._metric_name(name),
                    description=documentation,
                    unit="1"
                )
                store[name] = instrument
                logger.info(f"Registered {kind}: {name} for {self.service_name}")
                return instrument
            except Exception as e:
                logger.error(f"Failed to register {kind} {name} for {self.service_name}: {e}")
                raise

    def _setup_default_metrics(self):
        """Setup default HTTP metrics"""
        self._counters["http_requests_total"] = self.meter.create_counter(
            name=self._metric_name("http_requests_total"),
            description="Total HTTP requests",
            unit="requests"
        )

        self._histograms["http_request_duration"] = self.meter.create_histogram(
            name=self._metric_name("http_request_duration_seconds"),
            description="HTTP request duration in seconds",
            unit="s"
        )

        self._up_down_counters["active_requests"] = self.meter.create_up_down_counter(
            name=self._metric_name("active_requests"),
            description="Number of active HTTP requests",
            unit="requests"
        )

    def register_counter(self, name: str, documentation: str, labels: Optional[list] = None) -> Any:
        """
        Register a custom Counter metric.

        ``labels`` is accepted for backward compatibility and is not used;
        attributes are supplied per measurement instead.
        """
        return self._register_instrument(
            "counter", self._counters, self.meter.create_counter, name, documentation
        )

    def register_histogram(
        self,
        name: str,
        documentation: str,
        labels: Optional[list] = None,
        buckets: Optional[tuple] = None
    ) -> Any:
        """
        Register a custom Histogram metric.

        ``labels`` and ``buckets`` are accepted for backward compatibility
        and are not used by this implementation.
        """
        return self._register_instrument(
            "histogram", self._histograms, self.meter.create_histogram, name, documentation
        )

    def register_gauge(self, name: str, documentation: str, labels: Optional[list] = None) -> Any:
        """
        Register a custom Gauge metric (using UpDownCounter).

        ``labels`` is accepted for backward compatibility and is not used.
        """
        return self._register_instrument(
            "gauge", self._up_down_counters, self.meter.create_up_down_counter, name, documentation
        )

    def increment_counter(self, name: str, value: int = 1, labels: Optional[Dict[str, str]] = None):
        """
        Increment a counter metric.

        Logs an error and does nothing if ``name`` was never registered.
        A "service" attribute is added when ``labels`` lacks one; the
        caller's dict is left untouched.
        """
        if name not in self._counters:
            logger.error(f"Counter '{name}' not registered for {self.service_name}")
            return

        attributes = self._with_service_label(labels)

        try:
            self._counters[name].add(value, attributes)
        except Exception as e:
            logger.error(f"Failed to increment counter {name} for {self.service_name}: {e}")

    def observe_histogram(self, name: str, value: float, labels: Optional[Dict[str, str]] = None):
        """
        Observe a histogram metric.

        Logs an error and does nothing if ``name`` was never registered.
        A "service" attribute is added when ``labels`` lacks one; the
        caller's dict is left untouched.
        """
        if name not in self._histograms:
            logger.error(f"Histogram '{name}' not registered for {self.service_name}")
            return

        attributes = self._with_service_label(labels)

        try:
            self._histograms[name].record(value, attributes)
        except Exception as e:
            logger.error(f"Failed to observe histogram {name} for {self.service_name}: {e}")

    def set_gauge(self, name: str, value: float, labels: Optional[Dict[str, str]] = None):
        """
        Set a gauge metric (using add for UpDownCounter).

        The collector remembers the last value set for each gauge/label-set
        combination and adds only the difference to the UpDownCounter.
        Logs an error and does nothing if ``name`` was never registered.
        """
        if name not in self._up_down_counters:
            logger.error(f"Gauge '{name}' not registered for {self.service_name}")
            return

        attributes = self._with_service_label(labels)

        try:
            key = f"{name}_{sorted(attributes.items())}"
            # The read-modify-write of the remembered value must be atomic,
            # otherwise concurrent setters compute deltas from a stale base.
            with self._lock:
                delta = value - self._gauge_values.get(key, 0)
                self._gauge_values[key] = value
            self._up_down_counters[name].add(delta, attributes)
        except Exception as e:
            logger.error(f"Failed to set gauge {name} for {self.service_name}: {e}")

    def record_request(self, method: str, endpoint: str, status_code: int, duration: float):
        """
        Record HTTP request metrics.

        Adds one to the request counter and records ``duration`` on the
        duration histogram, both tagged with method, route and status code.
        Failures are logged, never raised.
        """
        try:
            attributes = {
                "service": self.service_name,
                "http.method": method,
                "http.route": endpoint,
                "http.status_code": str(status_code)
            }

            self._counters["http_requests_total"].add(1, attributes)
            self._histograms["http_request_duration"].record(duration, attributes)
        except Exception as e:
            logger.error(f"Failed to record request metrics for {self.service_name}: {e}")

    def increment_active_requests(self):
        """Increment active request counter"""
        try:
            self._up_down_counters["active_requests"].add(1, {"service": self.service_name})
        except Exception as e:
            logger.error(f"Failed to increment active requests: {e}")

    def decrement_active_requests(self):
        """Decrement active request counter"""
        try:
            self._up_down_counters["active_requests"].add(-1, {"service": self.service_name})
        except Exception as e:
            logger.error(f"Failed to decrement active requests: {e}")

    def set_active_connections(self, count: int):
        """
        Set active database connections.

        The "active_connections" gauge is registered on first use; without
        that, the call would only log a "not registered" error.
        """
        if "active_connections" not in self._up_down_counters:
            self.register_gauge(
                "active_connections",
                "Number of active database connections"
            )
        self.set_gauge("active_connections", count)


def get_metrics_collector(service_name: str) -> Optional[MetricsCollector]:
    """Get metrics collector by service name from global registry"""
    with _registry_lock:
        return _metrics_registry.get(service_name)


def create_metrics_collector(
    service_name: str,
    service_version: str = "1.0.0",
    meter_provider: Optional[MeterProvider] = None
) -> MetricsCollector:
    """
    Create metrics collector.
    This should be called BEFORE app startup, not during lifespan.

    Returns the collector already registered for ``service_name`` if there
    is one; otherwise a new collector is created (which registers itself).
    """
    # Get existing or create new
    existing = get_metrics_collector(service_name)
    if existing is not None:
        return existing

    return MetricsCollector(service_name, service_version, meter_provider)


def add_metrics_middleware(app, metrics_collector: MetricsCollector):
    """
    Add metrics middleware to app. Must be called BEFORE app startup.

    The middleware tracks the number of in-flight requests and records a
    count and duration for every request. Requests whose handler raises are
    recorded with status code 500 and the exception is re-raised.

    Returns:
        The ``metrics_collector`` that was passed in.
    """
    @app.middleware("http")
    async def metrics_middleware(request: Request, call_next):
        metrics_collector.increment_active_requests()
        # Monotonic clock: immune to wall-clock adjustments mid-request.
        started = time.perf_counter()

        def record(status_code: int) -> None:
            metrics_collector.record_request(
                method=request.method,
                endpoint=request.url.path,
                status_code=status_code,
                duration=time.perf_counter() - started
            )

        try:
            response = await call_next(request)
        except Exception:
            # Record failed request
            record(500)
            raise
        else:
            record(response.status_code)
            return response
        finally:
            # Runs on every exit path, including task cancellation, so the
            # active-requests gauge cannot drift upwards.
            metrics_collector.decrement_active_requests()

    return metrics_collector


def track_user_activity(user_id: str, action: str, service_name: str = "unknown-service", metadata: Optional[dict] = None):
    """
    Track user activity metrics using the appropriate metrics collector.

    Increments a "user_activity_total" counter tagged with the user id, the
    action and any extra ``metadata`` entries. If no collector is registered
    for ``service_name``, a counter on the global meter is used instead.
    """
    if metadata is None:
        metadata = {}

    # Add user-specific attributes
    attributes = {
        "user.id": user_id,
        "action": action,
        **metadata
    }

    # Get the metrics collector for the specified service
    metrics_collector = get_metrics_collector(service_name)

    if metrics_collector:
        counter_name = "user_activity_total"

        # Register on first use; the membership check avoids the
        # "already registered" warning on every subsequent call.
        if counter_name not in metrics_collector._counters:
            metrics_collector.register_counter(
                name=counter_name,
                documentation="Total user activity events"
            )

        # Increment the counter with attributes
        metrics_collector.increment_counter(counter_name, value=1, labels=attributes)
    else:
        # Fallback: create a temporary counter if no collector exists
        meter = metrics.get_meter(__name__)
        user_activity_counter = meter.create_counter(
            name="user_activity_total",
            description="User activity events",
            unit="events"
        )
        user_activity_counter.add(1, attributes)


def setup_metrics_early(
    app,
    service_name: str = None,
    service_version: str = "1.0.0",
    meter_provider: Optional[MeterProvider] = None
) -> MetricsCollector:
    """
    Setup metrics collection BEFORE app startup.
    This must be called before adding any middleware or starting the app.

    When ``service_name`` is omitted it is derived from ``app.title``
    (lower-cased, spaces to dashes, dots to underscores), falling back to
    "unknown-service". The collector is also stored on
    ``app.state.metrics_collector``.

    Note: No Prometheus endpoint is created - all metrics go to SigNoz via OTLP
    """
    if service_name is None:
        # `or` also covers an app whose title attribute exists but is None/empty.
        title = getattr(app, 'title', None) or 'unknown-service'
        service_name = title.lower().replace(' ', '-').replace('.', '_')

    # Create metrics collector
    metrics_collector = create_metrics_collector(service_name, service_version, meter_provider)

    # Add middleware (must be before app starts)
    add_metrics_middleware(app, metrics_collector)

    # Store in app state for access from routes
    app.state.metrics_collector = metrics_collector

    logger.info(f"OpenTelemetry metrics setup completed for service: {service_name}")
    return metrics_collector


# Helper function for endpoint tracking (kept for backward compatibility)
def track_endpoint_metrics(endpoint_name: str = None, service_name: str = None):
    """
    Decorator for tracking endpoint metrics - metrics handled by middleware.

    The decorator is a pass-through: it wraps the function without recording
    anything. Both arguments are accepted and ignored.
    """
    def decorator(func):
        @wraps(func)
        async def async_wrapper(*args, **kwargs):
            return await func(*args, **kwargs)

        @wraps(func)
        def sync_wrapper(*args, **kwargs):
            return func(*args, **kwargs)

        # Return appropriate wrapper based on function type
        if inspect.iscoroutinefunction(func):
            return async_wrapper
        return sync_wrapper

    return decorator