Files
bakery-ia/shared/monitoring/tracing.py
2026-01-09 23:14:12 +01:00

228 lines
6.9 KiB
Python
Executable File

"""
OpenTelemetry distributed tracing integration
Provides end-to-end request tracking across all services
"""
import os
import structlog
from typing import Optional
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.sdk.resources import Resource
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
# Core instrumentations (should always be available)
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
# Optional instrumentations (may not be installed in all services)
# Optional instrumentations: each flag records whether the matching
# instrumentor package could be imported in this service's environment.
# Only the import itself sits inside ``try`` so nothing else can be masked.
try:
    from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor
except ImportError:
    HTTPX_AVAILABLE = False
else:
    HTTPX_AVAILABLE = True

try:
    from opentelemetry.instrumentation.redis import RedisInstrumentor
except ImportError:
    REDIS_AVAILABLE = False
else:
    REDIS_AVAILABLE = True

try:
    from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor
except ImportError:
    SQLALCHEMY_AVAILABLE = False
else:
    SQLALCHEMY_AVAILABLE = True
from .otel_config import OTelConfig
# Module-level structlog logger used to report tracing setup progress and failures.
logger = structlog.get_logger()
def _instrument_optional(label: str, instrumentor_cls, tracer_provider) -> None:
    """
    Best-effort activation of a single optional auto-instrumentation.

    A failure is logged as a warning and swallowed so that one broken
    instrumentor never prevents the rest of tracing from being configured.

    Args:
        label: Human-readable library name used in the log messages
        instrumentor_cls: Instrumentor class to instantiate and activate
        tracer_provider: Provider the instrumentor should create spans with
    """
    try:
        instrumentor_cls().instrument(tracer_provider=tracer_provider)
        logger.debug(f"{label} instrumentation enabled")
    except Exception as e:
        logger.warning(f"Failed to instrument {label}: {e}")


def setup_tracing(
    app,
    service_name: str,
    service_version: str = "1.0.0",
    otel_endpoint: Optional[str] = None
) -> Optional[TracerProvider]:
    """
    Setup OpenTelemetry distributed tracing for a FastAPI service.

    Automatically instruments:
    - FastAPI endpoints
    - HTTPX client requests (inter-service calls), when the package is installed
    - Redis operations, when the package is installed
    - PostgreSQL/SQLAlchemy queries, when the package is installed

    Uses gRPC protocol (port 4317) for sending traces to SigNoz.

    Args:
        app: FastAPI application instance
        service_name: Name of the service (e.g., "auth-service")
        service_version: Version of the service
        otel_endpoint: Optional override for OTLP endpoint (gRPC format: host:port)

    Returns:
        TracerProvider instance if successful, None when tracing is disabled
        or when setup fails (setup errors are logged, never raised)

    Example:
        from shared.monitoring.tracing import setup_tracing

        app = FastAPI(title="Auth Service")
        tracer_provider = setup_tracing(app, "auth-service", "1.0.0")
    """
    # Check if tracing is enabled
    if not OTelConfig.is_enabled("traces"):
        logger.info(
            "Distributed tracing disabled",
            service=service_name,
            reason="ENABLE_TRACING not set to 'true'"
        )
        return None

    try:
        # Get endpoints from centralized config
        endpoints = OTelConfig.get_endpoints()

        # Use provided endpoint or get from config
        if otel_endpoint:
            # Clean user-provided endpoint for gRPC
            grpc_endpoint = OTelConfig._clean_grpc_endpoint(otel_endpoint)
        else:
            grpc_endpoint = endpoints.traces_grpc

        # Get resource attributes
        resource_attrs = OTelConfig.get_resource_attributes(service_name, service_version)
        resource = Resource(attributes=resource_attrs)

        # Build the provider and its export pipeline first...
        tracer_provider = TracerProvider(resource=resource)

        # Configure OTLP gRPC exporter for traces
        otlp_exporter = OTLPSpanExporter(
            endpoint=grpc_endpoint,
            # Plaintext gRPC (no TLS). For production with proper TLS,
            # pass insecure=False and supply channel credentials.
            insecure=True
        )

        # Add span processor with batching for performance
        tracer_provider.add_span_processor(BatchSpanProcessor(otlp_exporter))

        # ...and only then register it globally, so that a failure while
        # creating the exporter cannot leave an exporter-less provider
        # installed as the process-wide default.
        trace.set_tracer_provider(tracer_provider)

        # Auto-instrument FastAPI
        FastAPIInstrumentor.instrument_app(
            app,
            tracer_provider=tracer_provider,
            excluded_urls="health,metrics"  # Don't trace health/metrics endpoints
        )

        # Optional instrumentations. The instrumentor names only exist when
        # the matching *_AVAILABLE flag is True, so each must stay guarded.
        if HTTPX_AVAILABLE:
            # Inter-service communication
            _instrument_optional("HTTPX", HTTPXClientInstrumentor, tracer_provider)
        if REDIS_AVAILABLE:
            _instrument_optional("Redis", RedisInstrumentor, tracer_provider)
        if SQLALCHEMY_AVAILABLE:
            _instrument_optional("SQLAlchemy", SQLAlchemyInstrumentor, tracer_provider)

        logger.info(
            "Distributed tracing configured successfully",
            service=service_name,
            grpc_endpoint=grpc_endpoint,
            protocol="grpc"
        )
        return tracer_provider
    except Exception as e:
        # Tracing is an observability aid: never let it take the service down.
        logger.error(
            "Failed to setup tracing - continuing without it",
            service=service_name,
            error=str(e)
        )
        return None
def get_current_trace_id() -> Optional[str]:
    """
    Return the active trace's ID so log records can be correlated with it.

    Returns:
        The trace ID as a 32-character, zero-padded lowercase hex string,
        or None when the current span's context is not valid
    """
    active_span = trace.get_current_span()
    if not active_span:
        return None

    span_context = active_span.get_span_context()
    if not span_context.is_valid:
        return None

    return f"{span_context.trace_id:032x}"
def get_current_span_id() -> Optional[str]:
    """
    Return the ID of the currently active span.

    Returns:
        The span ID as a 16-character, zero-padded lowercase hex string,
        or None when the current span's context is not valid
    """
    active_span = trace.get_current_span()
    if not active_span:
        return None

    span_context = active_span.get_span_context()
    if not span_context.is_valid:
        return None

    return f"{span_context.span_id:016x}"
def add_trace_attributes(**attributes):
    """
    Attach custom attributes to the currently active span.

    Every value is converted with ``str()`` before being set. The call is a
    no-op when the current span's context is not valid.

    Example:
        add_trace_attributes(
            user_id="123",
            tenant_id="abc",
            operation="user_registration"
        )
    """
    active_span = trace.get_current_span()
    if not active_span:
        return
    if not active_span.get_span_context().is_valid:
        return

    for attr_name, attr_value in attributes.items():
        active_span.set_attribute(attr_name, str(attr_value))
def add_trace_event(name: str, **attributes):
    """
    Add an event to the current span (for important operations).

    Values of types OpenTelemetry accepts natively (bool, str, bytes, int,
    float, and list/tuple sequences) are forwarded unchanged. Any other
    value (None, UUID, datetime, dict, ...) is converted with ``str()`` so
    it is kept on the event instead of being rejected by the SDK, matching
    how add_trace_attributes treats such values. The call is a no-op when
    the current span's context is not valid.

    Args:
        name: Event name
        **attributes: Key/value details to attach to the event

    Example:
        add_trace_event("user_authenticated", user_id="123", method="jwt")
    """
    span = trace.get_current_span()
    if not (span and span.get_span_context().is_valid):
        return

    # Types handed to the SDK as-is; element validation of sequences is
    # left to the SDK, exactly as before.
    passthrough_types = (bool, str, bytes, int, float, list, tuple)
    event_attributes = {
        key: value if isinstance(value, passthrough_types) else str(value)
        for key, value in attributes.items()
    }
    span.add_event(name, event_attributes)
def record_exception(exception: Exception):
    """
    Record an exception on the current span and mark the span as failed.

    The span status is set to ERROR with ``str(exception)`` as its
    description. The call is a no-op when the current span's context is
    not valid.

    Args:
        exception: The exception to record
    """
    active_span = trace.get_current_span()
    if not active_span:
        return
    if not active_span.get_span_context().is_valid:
        return

    active_span.record_exception(exception)
    error_status = trace.Status(trace.StatusCode.ERROR, str(exception))
    active_span.set_status(error_status)