# File: bakery-ia/shared/monitoring/metrics_exporter.py
# 2026-01-09 07:26:11 +01:00
# 251 lines, 8.9 KiB, Python

"""
OpenTelemetry Metrics Integration for SigNoz
Exports metrics to SigNoz via OpenTelemetry Collector in addition to Prometheus
"""
import os
import structlog
from typing import Optional
from opentelemetry import metrics
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter
from opentelemetry.sdk.resources import Resource, SERVICE_NAME, SERVICE_VERSION
# Module-level structlog logger; setup_otel_metrics reports its outcome
# (disabled / configured / failed) through it.
logger = structlog.get_logger()
# Default collector address for the OTLP/gRPC exporter this module imports.
# 4317 is the standard OTLP gRPC port; 4318 is the OTLP/HTTP port and does not
# accept gRPC traffic.
_DEFAULT_OTLP_GRPC_ENDPOINT = "http://signoz-otel-collector.bakery-ia:4317"

# URL path used by OTLP/HTTP only; it has no meaning for a gRPC target.
_OTLP_HTTP_METRICS_PATH = "/v1/metrics"


def _resolve_otlp_grpc_endpoint(otel_endpoint: Optional[str]) -> str:
    """Return the collector endpoint to hand to the OTLP/gRPC exporter."""
    # Empty strings are treated as "not configured" so that a blank env var
    # does not shadow the next fallback.
    endpoint = (
        otel_endpoint
        or os.getenv("OTEL_EXPORTER_OTLP_ENDPOINT")
        or os.getenv("OTEL_COLLECTOR_ENDPOINT")
        or _DEFAULT_OTLP_GRPC_ENDPOINT
    )
    endpoint = endpoint.strip().rstrip("/")
    # Tolerate configuration written for OTLP/HTTP: a gRPC target must not
    # carry the HTTP signal path (it would corrupt a scheme-less "host:port").
    if endpoint.endswith(_OTLP_HTTP_METRICS_PATH):
        endpoint = endpoint[: -len(_OTLP_HTTP_METRICS_PATH)].rstrip("/")
    return endpoint or _DEFAULT_OTLP_GRPC_ENDPOINT


def setup_otel_metrics(
    service_name: str,
    service_version: str = "1.0.0",
    otel_endpoint: Optional[str] = None,
    export_interval_millis: int = 60000,  # Export every 60 seconds
) -> Optional[MeterProvider]:
    """
    Setup OpenTelemetry metrics to export to SigNoz over OTLP/gRPC.

    Builds a MeterProvider with a periodic OTLP exporter and installs it as
    the global meter provider. This is the push half of the dual-export
    strategy; the Prometheus /metrics exposition is not configured here.

    Args:
        service_name: Name of the service (e.g., "auth-service")
        service_version: Version of the service
        otel_endpoint: OpenTelemetry collector gRPC endpoint. When omitted,
            OTEL_EXPORTER_OTLP_ENDPOINT, then OTEL_COLLECTOR_ENDPOINT, then a
            built-in default are used. A trailing "/v1/metrics" is removed.
        export_interval_millis: How often to push metrics (default 60s)

    Returns:
        MeterProvider instance if successful. None when ENABLE_OTEL_METRICS
        is set to anything other than "true", or when setup raises.

    Example:
        from shared.monitoring.metrics_exporter import setup_otel_metrics

        # Setup during service initialization
        meter_provider = setup_otel_metrics("auth-service", "1.0.0")
        if meter_provider is not None:
            meter = meter_provider.get_meter(__name__)
            request_counter = meter.create_counter(
                "http.server.requests",
                description="Total HTTP requests",
                unit="1"
            )
            request_counter.add(1, {"method": "GET", "status": "200"})
    """
    # Export is enabled unless the variable is explicitly set to a non-"true" value
    enable_otel_metrics = os.getenv("ENABLE_OTEL_METRICS", "true").strip().lower() == "true"
    if not enable_otel_metrics:
        logger.info(
            "OpenTelemetry metrics export disabled",
            service=service_name,
            reason="ENABLE_OTEL_METRICS not set to 'true'"
        )
        return None

    otel_endpoint = _resolve_otlp_grpc_endpoint(otel_endpoint)

    try:
        # Resource attributes identify this service instance on every metric
        resource = Resource(attributes={
            SERVICE_NAME: service_name,
            SERVICE_VERSION: service_version,
            "deployment.environment": os.getenv("ENVIRONMENT", "development"),
            "k8s.namespace.name": os.getenv("K8S_NAMESPACE", "bakery-ia"),
            "k8s.pod.name": os.getenv("HOSTNAME", "unknown"),
        })

        # Configure OTLP exporter for metrics
        otlp_exporter = OTLPMetricExporter(
            endpoint=otel_endpoint,
            timeout=10
        )

        # Reader that pushes collected metrics every export_interval_millis
        metric_reader = PeriodicExportingMetricReader(
            exporter=otlp_exporter,
            export_interval_millis=export_interval_millis
        )

        meter_provider = MeterProvider(
            resource=resource,
            metric_readers=[metric_reader]
        )

        # Set global meter provider
        metrics.set_meter_provider(meter_provider)

        logger.info(
            "OpenTelemetry metrics export configured",
            service=service_name,
            otel_endpoint=otel_endpoint,
            export_interval_seconds=export_interval_millis / 1000
        )
        return meter_provider
    except Exception as e:
        # Best-effort boundary: telemetry setup must never take the service
        # down, but keep the traceback so the failure can be diagnosed.
        logger.error(
            "Failed to setup OpenTelemetry metrics export",
            service=service_name,
            error=str(e),
            reason="Will continue with Prometheus-only metrics",
            exc_info=True
        )
        return None
class OTelMetricsCollector:
    """
    Wrapper for OpenTelemetry metrics that provides a similar interface
    to the Prometheus MetricsCollector.

    Instruments are created on the meter obtained from the given provider and
    cached by their short name. Every exported instrument name is prefixed
    with the service name (dashes replaced by underscores), and every
    recording carries a "service" attribute unless the caller supplies one.
    """

    def __init__(self, service_name: str, meter_provider: "MeterProvider"):
        self.service_name = service_name
        self.meter_provider = meter_provider
        self.meter = meter_provider.get_meter(__name__)
        # Created instruments / gauge state, keyed by the short (unprefixed) name
        self._counters = {}
        self._histograms = {}
        self._gauges = {}

    def _instrument_name(self, name: str) -> str:
        """Return the exported instrument name: "<service_with_underscores>_<name>"."""
        return f"{self.service_name.replace('-', '_')}_{name}"

    def _with_service(self, attributes: Optional[dict]) -> dict:
        """Return a new attribute dict that includes "service".

        A caller-supplied "service" value wins. The caller's dict is never
        modified.
        """
        return {"service": self.service_name, **(attributes or {})}

    def create_counter(self, name: str, description: str = "", unit: str = "1"):
        """Create or get an OpenTelemetry Counter"""
        if name not in self._counters:
            self._counters[name] = self.meter.create_counter(
                name=self._instrument_name(name),
                description=description,
                unit=unit
            )
        return self._counters[name]

    def create_histogram(self, name: str, description: str = "", unit: str = "1"):
        """Create or get an OpenTelemetry Histogram"""
        if name not in self._histograms:
            self._histograms[name] = self.meter.create_histogram(
                name=self._instrument_name(name),
                description=description,
                unit=unit
            )
        return self._histograms[name]

    def create_gauge(self, name: str, description: str = "", unit: str = "1"):
        """
        Create or get an OpenTelemetry observable gauge.

        Gauges in OTEL are read through a callback, so this registers one that
        reports the most recent value passed to set_gauge(). Nothing is
        reported until set_gauge() has been called at least once. Only the
        latest (value, attributes) pair is kept, i.e. one series per gauge.

        Returns:
            The gauge's state dict (keys: name, description, unit, value,
            attributes, latest).
        """
        if name not in self._gauges:
            state = {
                "name": self._instrument_name(name),
                "description": description,
                "unit": unit,
                "value": 0,
                "attributes": {},
                # (value, attributes) published by set_gauge in one assignment
                # so the callback never pairs a new value with old attributes.
                "latest": None,
            }

            def _observe(_options, _state=state):
                latest = _state.get("latest")
                if latest is None:
                    return []
                value, attributes = latest
                return [metrics.Observation(value, attributes)]

            self.meter.create_observable_gauge(
                name=state["name"],
                callbacks=[_observe],
                unit=unit,
                description=description
            )
            # Stored only after registration succeeded
            self._gauges[name] = state
        return self._gauges[name]

    def increment_counter(self, name: str, value: int = 1, attributes: Optional[dict] = None):
        """Increment a counter with optional attributes.

        Does nothing if no counter named `name` was created.
        """
        if name in self._counters:
            self._counters[name].add(value, self._with_service(attributes))

    def observe_histogram(self, name: str, value: float, attributes: Optional[dict] = None):
        """Record a histogram observation with optional attributes.

        Does nothing if no histogram named `name` was created.
        """
        if name in self._histograms:
            self._histograms[name].record(value, self._with_service(attributes))

    def set_gauge(self, name: str, value: float, attributes: Optional[dict] = None):
        """Set a gauge value (stores for next callback).

        Does nothing if no gauge named `name` was created.
        """
        if name in self._gauges:
            merged = self._with_service(attributes)
            state = self._gauges[name]
            state["value"] = value
            state["attributes"] = merged
            state["latest"] = (value, merged)
def create_dual_metrics_collector(service_name: str, service_version: str = "1.0.0"):
    """
    Build the pair of collectors a service uses to emit metrics.

    Two collectors are constructed, in this order:
    1. A Prometheus MetricsCollector for the service.
    2. An OTelMetricsCollector bound to the provider returned by
       setup_otel_metrics(service_name, service_version).

    Returns a tuple: (prometheus_collector, otel_collector)
    otel_collector is None whenever setup_otel_metrics returns no provider,
    so check it before use.

    Example:
        from shared.monitoring.metrics_exporter import create_dual_metrics_collector

        prom_collector, otel_collector = create_dual_metrics_collector("auth-service")

        # Prometheus counter
        prom_collector.register_counter("requests_total", "Total requests")
        prom_collector.increment_counter("requests_total", labels={"status": "200"})

        # OpenTelemetry counter (pushed to SigNoz)
        if otel_collector is not None:
            counter = otel_collector.create_counter("requests_total", "Total requests")
            counter.add(1, {"status": "200"})
    """
    # Imported here rather than at module top, as in the original
    from shared.monitoring.metrics import MetricsCollector

    # Prometheus side is built first, before any OpenTelemetry setup runs
    prometheus_side = MetricsCollector(service_name)

    provider = setup_otel_metrics(service_name, service_version)
    otel_side = OTelMetricsCollector(service_name, provider) if provider else None

    return prometheus_side, otel_side