Files
bakery-ia/shared/monitoring/metrics_exporter.py

305 lines
10 KiB
Python
Raw Permalink Normal View History

"""
OpenTelemetry Metrics Integration for SigNoz

Exports metrics to SigNoz via the OpenTelemetry Collector using the OTLP
gRPC protocol (default) or the OTLP HTTP protocol.
"""
import os
import structlog
from typing import Optional
from opentelemetry import metrics
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
2026-01-09 23:14:12 +01:00
from opentelemetry.sdk.resources import Resource
# Import both gRPC and HTTP exporters
try:
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter as GrpcMetricExporter
GRPC_AVAILABLE = True
except ImportError:
GRPC_AVAILABLE = False
GrpcMetricExporter = None
try:
from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter as HttpMetricExporter
HTTP_AVAILABLE = True
except ImportError:
HTTP_AVAILABLE = False
HttpMetricExporter = None
from .otel_config import OTelConfig
logger = structlog.get_logger()
def setup_otel_metrics(
    service_name: str,
    service_version: str = "1.0.0",
    otel_endpoint: Optional[str] = None,
    export_interval_millis: int = 60000,  # Export every 60 seconds
    protocol: Optional[str] = None  # "grpc" or "http", defaults to grpc
) -> Optional[MeterProvider]:
    """
    Setup OpenTelemetry metrics to export to SigNoz.

    Supports both gRPC (recommended, port 4317) and HTTP (port 4318) protocols.
    When the requested exporter package is not installed, the other protocol
    is used instead; when neither is installed, setup is aborted with an
    explicit error log rather than failing inside the exporter constructor.

    Args:
        service_name: Name of the service (e.g., "auth-service")
        service_version: Version of the service
        otel_endpoint: Optional override for OTLP endpoint
        export_interval_millis: How often to push metrics in milliseconds (default 60s)
        protocol: Protocol to use ("grpc" or "http", case-insensitive).
            When None, the value comes from OTelConfig.get_protocol("metrics").

    Returns:
        MeterProvider instance if successful, None otherwise (export disabled,
        invalid protocol, no exporter installed, or any setup failure).

    Example:
        from shared.monitoring.metrics_exporter import setup_otel_metrics

        # Setup with gRPC (default)
        meter_provider = setup_otel_metrics("auth-service", "1.0.0")

        # Or with HTTP
        meter_provider = setup_otel_metrics("auth-service", "1.0.0", protocol="http")

        # Create meters for your metrics
        meter = meter_provider.get_meter(__name__)
        request_counter = meter.create_counter(
            "http.server.requests",
            description="Total HTTP requests",
            unit="1"
        )

        # Record metrics
        request_counter.add(1, {"method": "GET", "status": "200"})
    """
    # Check if metrics export is enabled
    if not OTelConfig.is_enabled("metrics"):
        logger.info(
            "OpenTelemetry metrics export disabled",
            service=service_name,
            reason="ENABLE_OTEL_METRICS not set to 'true'"
        )
        return None

    # Determine protocol to use
    if protocol is None:
        protocol = OTelConfig.get_protocol("metrics")
    # Accept "GRPC", " http " etc.; non-string values fall through to the
    # validation below unchanged.
    if isinstance(protocol, str):
        protocol = protocol.strip().lower()

    if protocol not in ("grpc", "http"):
        logger.error(
            "Invalid protocol specified",
            service=service_name,
            protocol=protocol
        )
        return None

    # Validate that an exporter for the protocol is actually installed, and
    # only fall back when the alternative exporter exists.
    exporter_available = {"grpc": GRPC_AVAILABLE, "http": HTTP_AVAILABLE}
    if not exporter_available[protocol]:
        fallback = "http" if protocol == "grpc" else "grpc"
        if not exporter_available[fallback]:
            logger.error(
                "No OTLP metric exporter available (neither gRPC nor HTTP is installed)",
                service=service_name,
                protocol=protocol
            )
            return None
        if protocol == "grpc":
            logger.warning(
                "gRPC exporter not available, falling back to HTTP",
                service=service_name
            )
        else:
            logger.warning(
                "HTTP exporter not available, falling back to gRPC",
                service=service_name
            )
        protocol = fallback

    try:
        # Determine which endpoint to use
        if otel_endpoint:
            # User provided override, normalised for the chosen protocol
            if protocol == "grpc":
                endpoint = OTelConfig._clean_grpc_endpoint(otel_endpoint)
            else:
                endpoint = OTelConfig._ensure_http_endpoint(otel_endpoint, "/v1/metrics")
        else:
            # Use the endpoint from the centralized config
            endpoints = OTelConfig.get_endpoints()
            if protocol == "grpc":
                endpoint = endpoints.metrics_grpc
            else:
                endpoint = endpoints.metrics_http

        # Get resource attributes
        resource_attrs = OTelConfig.get_resource_attributes(service_name, service_version)
        resource = Resource(attributes=resource_attrs)

        # Configure OTLP exporter based on protocol
        if protocol == "grpc":
            otlp_exporter = GrpcMetricExporter(
                endpoint=endpoint,
                # Plaintext channel to the collector. For production with TLS,
                # pass insecure=False and configure credentials.
                insecure=True,
                timeout=10
            )
        else:  # http
            otlp_exporter = HttpMetricExporter(
                endpoint=endpoint,
                timeout=10
            )

        # Create periodic metric reader
        metric_reader = PeriodicExportingMetricReader(
            exporter=otlp_exporter,
            export_interval_millis=export_interval_millis
        )

        # Configure meter provider
        meter_provider = MeterProvider(
            resource=resource,
            metric_readers=[metric_reader]
        )

        # Set global meter provider
        metrics.set_meter_provider(meter_provider)

        logger.info(
            "OpenTelemetry metrics export configured successfully",
            service=service_name,
            endpoint=endpoint,
            protocol=protocol,
            export_interval_seconds=export_interval_millis / 1000
        )
        return meter_provider
    except Exception as e:
        # Boundary handler: metrics setup must never crash service startup.
        logger.error(
            "Failed to setup OpenTelemetry metrics export",
            service=service_name,
            error=str(e),
            error_type=type(e).__name__,
            protocol=protocol
        )
        return None
class OTelMetricsCollector:
    """
    Wrapper for OpenTelemetry metrics that provides a similar interface
    to the Prometheus MetricsCollector.

    This allows services to emit metrics that go to both Prometheus and SigNoz.

    Instruments are cached by the caller-supplied name; the name registered
    with OpenTelemetry is prefixed with the service name (dashes replaced by
    underscores). Recording against a name that was never created is a no-op.
    """

    def __init__(self, service_name: str, meter_provider: "MeterProvider"):
        """
        Args:
            service_name: Service name used as metric-name prefix and as the
                default "service" attribute.
            meter_provider: Provider from which the meter is obtained.
        """
        self.service_name = service_name
        self.meter_provider = meter_provider
        self.meter = meter_provider.get_meter(__name__)
        # Created instruments / gauge state, keyed by the unprefixed name
        self._counters = {}
        self._histograms = {}
        self._gauges = {}

    def _qualified_name(self, name: str) -> str:
        """Return the instrument name prefixed with the normalised service name."""
        return f"{self.service_name.replace('-', '_')}_{name}"

    def _with_service(self, attributes: Optional[dict]) -> dict:
        """
        Return a copy of ``attributes`` that carries a "service" entry.

        A copy is made so the caller's dict is never mutated; an existing
        "service" value supplied by the caller is kept.
        """
        merged = dict(attributes) if attributes else {}
        merged.setdefault("service", self.service_name)
        return merged

    def create_counter(self, name: str, description: str = "", unit: str = "1"):
        """Create or get an OpenTelemetry Counter"""
        if name not in self._counters:
            self._counters[name] = self.meter.create_counter(
                name=self._qualified_name(name),
                description=description,
                unit=unit
            )
        return self._counters[name]

    def create_histogram(self, name: str, description: str = "", unit: str = "1"):
        """Create or get an OpenTelemetry Histogram"""
        if name not in self._histograms:
            self._histograms[name] = self.meter.create_histogram(
                name=self._qualified_name(name),
                description=description,
                unit=unit
            )
        return self._histograms[name]

    def create_gauge(self, name: str, description: str = "", unit: str = "1"):
        """
        Create or get an OpenTelemetry observable gauge.

        Gauges in OTEL are reported through a callback, so an observable
        gauge is registered whose callback reports the value most recently
        stored by ``set_gauge`` (0 until the first call).

        Returns:
            The mutable state dict for the gauge with keys "name",
            "description", "unit", "value" and "attributes".
        """
        if name not in self._gauges:
            state = {
                "name": self._qualified_name(name),
                "description": description,
                "unit": unit,
                "value": 0,
                "attributes": {}
            }

            def _observe(_options=None, _state=state):
                # Invoked by the metric reader on each collection cycle.
                attributes = _state["attributes"] or {"service": self.service_name}
                return [metrics.Observation(_state["value"], attributes)]

            # Register first so a failed registration leaves no cached state.
            self.meter.create_observable_gauge(
                name=state["name"],
                callbacks=[_observe],
                unit=unit,
                description=description
            )
            self._gauges[name] = state
        return self._gauges[name]

    def increment_counter(self, name: str, value: int = 1, attributes: Optional[dict] = None):
        """Increment a counter with optional attributes"""
        counter = self._counters.get(name)
        if counter is not None:
            counter.add(value, self._with_service(attributes))

    def observe_histogram(self, name: str, value: float, attributes: Optional[dict] = None):
        """Record a histogram observation with optional attributes"""
        histogram = self._histograms.get(name)
        if histogram is not None:
            histogram.record(value, self._with_service(attributes))

    def set_gauge(self, name: str, value: float, attributes: Optional[dict] = None):
        """Set a gauge value (stores for next callback)"""
        state = self._gauges.get(name)
        if state is not None:
            # Single update call so the callback sees value and attributes together.
            state.update(value=value, attributes=self._with_service(attributes))
def create_dual_metrics_collector(service_name: str, service_version: str = "1.0.0"):
    """
    Build the pair of collectors used to publish metrics to Prometheus and SigNoz.

    Two independent collection strategies are wired up:
    1. Prometheus client library (for /metrics endpoint scraping)
    2. OpenTelemetry metrics (for OTLP push to SigNoz)

    Returns a tuple: (prometheus_collector, otel_collector).
    The second element is None when setup_otel_metrics() does not return a
    meter provider. Both collectors can be used independently or together.

    Example:
        from shared.monitoring.metrics_exporter import create_dual_metrics_collector

        prom_collector, otel_collector = create_dual_metrics_collector("auth-service")

        # Prometheus counter
        prom_collector.register_counter("requests_total", "Total requests")
        prom_collector.increment_counter("requests_total", labels={"status": "200"})

        # OpenTelemetry counter (pushed to SigNoz)
        counter = otel_collector.create_counter("requests_total", "Total requests")
        counter.add(1, {"status": "200"})
    """
    # Imported here rather than at module level, as in the original.
    from shared.monitoring.metrics import MetricsCollector

    # Prometheus side is always created, and created first.
    prometheus_side = MetricsCollector(service_name)

    # OpenTelemetry side exists only when a meter provider could be set up.
    provider = setup_otel_metrics(service_name, service_version)
    otel_side = OTelMetricsCollector(service_name, provider) if provider else None

    return prometheus_side, otel_side