# bakery-ia/shared/monitoring/metrics.py
"""
OpenTelemetry Metrics Collection for Microservices
Replaces Prometheus with native OpenTelemetry metrics export to SigNoz
"""
import time
import logging
import structlog
from typing import Dict, Any, Optional
from opentelemetry import metrics
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter
from opentelemetry.sdk.resources import Resource, SERVICE_NAME, SERVICE_VERSION
from fastapi import Request, Response
from threading import Lock
import os
logger = structlog.get_logger()
# Global registry for metrics collectors
_metrics_registry: Dict[str, 'MetricsCollector'] = {}
_registry_lock = Lock()
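
# Illustrative sketch only (not called anywhere in this module): one way the OTLP
# exporter classes imported above could be wired into a MeterProvider that pushes
# metrics to SigNoz. The helper name, endpoint default, insecure flag, and export
# interval are assumptions; adjust them to match your collector deployment.
def build_otlp_meter_provider(
    service_name: str,
    service_version: str = "1.0.0",
    endpoint: Optional[str] = None,
    export_interval_millis: int = 15000,
) -> MeterProvider:
    """Sketch: build a MeterProvider exporting via OTLP/gRPC (assumptions noted above)."""
    resource = Resource.create({
        SERVICE_NAME: service_name,
        SERVICE_VERSION: service_version,
    })
    exporter = OTLPMetricExporter(
        # Falls back to the standard OTel env var, then a local collector address.
        endpoint=endpoint or os.getenv("OTEL_EXPORTER_OTLP_ENDPOINT", "http://localhost:4317"),
        insecure=True,  # assumption: plaintext gRPC inside the cluster
    )
    reader = PeriodicExportingMetricReader(exporter, export_interval_millis=export_interval_millis)
    return MeterProvider(resource=resource, metric_readers=[reader])
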
class MetricsCollector:
"""
OpenTelemetry-based metrics collector for microservices.
Exports metrics directly to SigNoz via OTLP (no Prometheus).
"""
def __init__(
self,
service_name: str,
service_version: str = "1.0.0",
meter_provider: Optional[MeterProvider] = None
):
self.service_name = service_name
self.service_version = service_version
self.start_time = time.time()
# Use provided meter provider or get global
if meter_provider:
self.meter = meter_provider.get_meter(__name__)
else:
self.meter = metrics.get_meter(__name__)
# Store created instruments
self._counters: Dict[str, Any] = {}
self._histograms: Dict[str, Any] = {}
self._up_down_counters: Dict[str, Any] = {}
self._lock = Lock()
# Register in global registry
with _registry_lock:
_metrics_registry[service_name] = self
# Create default HTTP metrics
self._setup_default_metrics()
logger.info(
"OpenTelemetry metrics collector initialized",
service=service_name
)
def _setup_default_metrics(self):
"""Setup default HTTP metrics"""
self._counters["http_requests_total"] = self.meter.create_counter(
name=f"{self.service_name.replace('-', '_')}_http_requests_total",
description="Total HTTP requests",
unit="requests"
)
self._histograms["http_request_duration"] = self.meter.create_histogram(
name=f"{self.service_name.replace('-', '_')}_http_request_duration_seconds",
description="HTTP request duration in seconds",
unit="s"
)
self._up_down_counters["active_requests"] = self.meter.create_up_down_counter(
name=f"{self.service_name.replace('-', '_')}_active_requests",
description="Number of active HTTP requests",
unit="requests"
)
    def register_counter(self, name: str, documentation: str, labels: Optional[list] = None) -> Any:
        """Register a custom Counter metric.

        `labels` is accepted for backward compatibility but ignored; attributes
        are supplied when the counter is incremented.
        """
with self._lock:
if name in self._counters:
logger.warning(f"Counter '{name}' already registered for {self.service_name}")
return self._counters[name]
try:
counter = self.meter.create_counter(
name=f"{self.service_name.replace('-', '_')}_{name}",
description=documentation,
unit="1"
)
self._counters[name] = counter
logger.info(f"Registered counter: {name} for {self.service_name}")
return counter
except Exception as e:
logger.error(f"Failed to register counter {name} for {self.service_name}: {e}")
raise
    def register_histogram(
        self,
        name: str,
        documentation: str,
        labels: Optional[list] = None,
        buckets: Optional[tuple] = None
    ) -> Any:
        """Register a custom Histogram metric.

        `labels` and `buckets` are accepted for backward compatibility with the
        previous Prometheus-based API but ignored; attributes are supplied at
        record time and bucket boundaries come from the SDK's default aggregation.
        """
with self._lock:
if name in self._histograms:
logger.warning(f"Histogram '{name}' already registered for {self.service_name}")
return self._histograms[name]
try:
histogram = self.meter.create_histogram(
name=f"{self.service_name.replace('-', '_')}_{name}",
description=documentation,
unit="1"
)
self._histograms[name] = histogram
logger.info(f"Registered histogram: {name} for {self.service_name}")
return histogram
except Exception as e:
logger.error(f"Failed to register histogram {name} for {self.service_name}: {e}")
raise
    def register_gauge(self, name: str, documentation: str, labels: Optional[list] = None) -> Any:
        """Register a custom Gauge metric, emulated with an UpDownCounter.

        `labels` is accepted for backward compatibility but ignored; attributes
        are supplied when the gauge is set.
        """
with self._lock:
if name in self._up_down_counters:
logger.warning(f"Gauge '{name}' already registered for {self.service_name}")
return self._up_down_counters[name]
try:
gauge = self.meter.create_up_down_counter(
name=f"{self.service_name.replace('-', '_')}_{name}",
description=documentation,
unit="1"
)
self._up_down_counters[name] = gauge
logger.info(f"Registered gauge: {name} for {self.service_name}")
return gauge
except Exception as e:
logger.error(f"Failed to register gauge {name} for {self.service_name}: {e}")
raise
def increment_counter(self, name: str, value: int = 1, labels: Dict[str, str] = None):
"""Increment a counter metric"""
if name not in self._counters:
logger.error(f"Counter '{name}' not registered for {self.service_name}")
return
if labels is None:
labels = {"service": self.service_name}
elif "service" not in labels:
labels["service"] = self.service_name
try:
self._counters[name].add(value, labels)
except Exception as e:
logger.error(f"Failed to increment counter {name} for {self.service_name}: {e}")
def observe_histogram(self, name: str, value: float, labels: Dict[str, str] = None):
"""Observe a histogram metric"""
if name not in self._histograms:
logger.error(f"Histogram '{name}' not registered for {self.service_name}")
return
if labels is None:
labels = {"service": self.service_name}
elif "service" not in labels:
labels["service"] = self.service_name
try:
self._histograms[name].record(value, labels)
except Exception as e:
logger.error(f"Failed to observe histogram {name} for {self.service_name}: {e}")
def set_gauge(self, name: str, value: float, labels: Dict[str, str] = None):
"""Set a gauge metric (using add for UpDownCounter)"""
if name not in self._up_down_counters:
logger.error(f"Gauge '{name}' not registered for {self.service_name}")
return
if labels is None:
labels = {"service": self.service_name}
elif "service" not in labels:
labels["service"] = self.service_name
try:
            # UpDownCounters only accept deltas, so emulate "set" by remembering the
            # last value per label set and adding the difference. E.g. set_gauge(10)
            # then set_gauge(7) emits add(+10) then add(-3), so the cumulative sum
            # always equals the most recently set value.
key = f"{name}_{str(sorted(labels.items()))}"
if not hasattr(self, '_gauge_values'):
self._gauge_values = {}
old_value = self._gauge_values.get(key, 0)
delta = value - old_value
self._gauge_values[key] = value
self._up_down_counters[name].add(delta, labels)
except Exception as e:
logger.error(f"Failed to set gauge {name} for {self.service_name}: {e}")
def record_request(self, method: str, endpoint: str, status_code: int, duration: float):
"""Record HTTP request metrics"""
try:
attributes = {
"service": self.service_name,
"http.method": method,
"http.route": endpoint,
"http.status_code": str(status_code)
}
self._counters["http_requests_total"].add(1, attributes)
self._histograms["http_request_duration"].record(duration, attributes)
except Exception as e:
logger.error(f"Failed to record request metrics for {self.service_name}: {e}")
def increment_active_requests(self):
"""Increment active request counter"""
try:
self._up_down_counters["active_requests"].add(1, {"service": self.service_name})
except Exception as e:
logger.error(f"Failed to increment active requests: {e}")
def decrement_active_requests(self):
"""Decrement active request counter"""
try:
self._up_down_counters["active_requests"].add(-1, {"service": self.service_name})
except Exception as e:
logger.error(f"Failed to decrement active requests: {e}")
def set_active_connections(self, count: int):
"""Set active database connections"""
self.set_gauge("active_connections", count)
def get_metrics_collector(service_name: str) -> Optional[MetricsCollector]:
"""Get metrics collector by service name from global registry"""
with _registry_lock:
return _metrics_registry.get(service_name)
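
# Sketch of registry-based access from another module: a worker that did not
# create the collector can still record metrics if the owning service registered
# one at startup. The helper name and the "job_duration_seconds" histogram are
# illustrative assumptions, not part of the existing codebase.
def record_job_duration_example(service_name: str, seconds: float) -> None:
    """Illustrative helper: record a duration on an already-registered histogram."""
    collector = get_metrics_collector(service_name)
    if collector is None:
        logger.warning("No metrics collector registered", service=service_name)
        return
    # Assumes the service registered a "job_duration_seconds" histogram at startup.
    collector.observe_histogram("job_duration_seconds", seconds)
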
def create_metrics_collector(
service_name: str,
service_version: str = "1.0.0",
meter_provider: Optional[MeterProvider] = None
) -> MetricsCollector:
"""
Create metrics collector.
This should be called BEFORE app startup, not during lifespan.
"""
# Get existing or create new
existing = get_metrics_collector(service_name)
if existing:
return existing
return MetricsCollector(service_name, service_version, meter_provider)
def add_metrics_middleware(app, metrics_collector: MetricsCollector):
"""
Add metrics middleware to app. Must be called BEFORE app startup.
"""
@app.middleware("http")
async def metrics_middleware(request: Request, call_next):
# Increment active requests
metrics_collector.increment_active_requests()
start_time = time.time()
try:
response = await call_next(request)
duration = time.time() - start_time
# Record request metrics
metrics_collector.record_request(
method=request.method,
endpoint=request.url.path,
status_code=response.status_code,
duration=duration
)
# Decrement active requests
metrics_collector.decrement_active_requests()
return response
except Exception as e:
duration = time.time() - start_time
# Record failed request
metrics_collector.record_request(
method=request.method,
endpoint=request.url.path,
status_code=500,
duration=duration
)
# Decrement active requests
metrics_collector.decrement_active_requests()
raise
return metrics_collector
def setup_metrics_early(
app,
    service_name: Optional[str] = None,
service_version: str = "1.0.0",
meter_provider: Optional[MeterProvider] = None
) -> MetricsCollector:
"""
Setup metrics collection BEFORE app startup.
This must be called before adding any middleware or starting the app.
    Note: no Prometheus endpoint is created; all metrics go to SigNoz via OTLP.
"""
if service_name is None:
service_name = getattr(app, 'title', 'unknown-service').lower().replace(' ', '-').replace('.', '_')
# Create metrics collector
metrics_collector = create_metrics_collector(service_name, service_version, meter_provider)
# Add middleware (must be before app starts)
add_metrics_middleware(app, metrics_collector)
# Store in app state for access from routes
app.state.metrics_collector = metrics_collector
logger.info(f"OpenTelemetry metrics setup completed for service: {service_name}")
return metrics_collector
# Helper function for endpoint tracking (kept for backward compatibility)
def track_endpoint_metrics(endpoint_name: Optional[str] = None, service_name: Optional[str] = None):
    """No-op decorator kept for backward compatibility; request metrics are handled by the middleware."""
def decorator(func):
import asyncio
from functools import wraps
@wraps(func)
async def async_wrapper(*args, **kwargs):
return await func(*args, **kwargs)
@wraps(func)
def sync_wrapper(*args, **kwargs):
return func(*args, **kwargs)
# Return appropriate wrapper based on function type
if asyncio.iscoroutinefunction(func):
return async_wrapper
else:
return sync_wrapper
return decorator
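
# Minimal usage sketch, assuming a FastAPI service; the service name, route, and
# counter below are illustrative, not part of the bakery-ia codebase. It shows the
# intended order: set up the collector and middleware before startup, register
# custom instruments once, then record values from route handlers.
if __name__ == "__main__":
    from fastapi import FastAPI

    demo_app = FastAPI(title="Demo Service")
    demo_collector = setup_metrics_early(demo_app, service_name="demo-service")
    demo_collector.register_counter("orders_created_total", "Total orders created")

    @demo_app.post("/orders")
    async def create_order():
        # Default HTTP metrics are recorded by the middleware; this adds a business metric.
        demo_collector.increment_counter("orders_created_total")
        return {"status": "created"}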