Files
bakery-ia/shared/monitoring/tracing.py
2025-10-15 16:12:49 +02:00

180 lines
5.5 KiB
Python

"""
OpenTelemetry distributed tracing integration
Provides end-to-end request tracking across all services
"""
import structlog
from typing import Optional
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.sdk.resources import Resource, SERVICE_NAME, SERVICE_VERSION
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor
from opentelemetry.instrumentation.redis import RedisInstrumentor
from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor
# Module-level structlog logger. setup_tracing uses it to report setup and
# instrumentation failures as log entries rather than letting them propagate.
logger = structlog.get_logger()
def _instrument_safely(library: str, instrument) -> None:
    """Run one optional auto-instrumentation call, logging a warning on failure.

    Args:
        library: Human-readable library name included in the warning log.
        instrument: Zero-argument callable that performs the instrumentation.
    """
    try:
        instrument()
    except Exception as e:
        # Optional integrations must not prevent the rest of tracing from starting.
        logger.warning(
            "Failed to instrument library",
            library=library,
            error=str(e)
        )


def setup_tracing(
    app,
    service_name: str,
    service_version: str = "1.0.0",
    jaeger_endpoint: str = "http://jaeger-collector.monitoring:4317",
    *,
    environment: str = "production",
    insecure: bool = True,
) -> Optional[TracerProvider]:
    """
    Setup OpenTelemetry distributed tracing for a FastAPI service.

    Registers a global TracerProvider that batches spans to an OTLP/gRPC
    collector (Jaeger), then automatically instruments:
    - FastAPI endpoints (health/metrics URLs excluded)
    - HTTPX client requests (inter-service calls)
    - Redis operations
    - SQLAlchemy queries

    Tracing is best-effort. A failing optional instrumentation (HTTPX, Redis,
    SQLAlchemy) is logged as a warning and skipped. Any other failure is
    logged as an error and the service continues without tracing.

    Args:
        app: FastAPI application instance
        service_name: Name of the service (e.g., "auth-service")
        service_version: Version of the service
        jaeger_endpoint: Jaeger collector OTLP gRPC endpoint
        environment: Value of the ``deployment.environment`` resource
            attribute (keyword-only, defaults to "production")
        insecure: If True (the default), export spans over plaintext gRPC;
            pass False to use TLS (keyword-only)

    Returns:
        The configured TracerProvider, e.g. so the caller can invoke
        ``shutdown()`` on application shutdown to flush pending spans,
        or None if tracing setup failed.

    Example:
        from shared.monitoring.tracing import setup_tracing
        app = FastAPI(title="Auth Service")
        setup_tracing(app, "auth-service")
    """
    try:
        # Resource attributes identify this service on every exported span.
        resource = Resource(attributes={
            SERVICE_NAME: service_name,
            SERVICE_VERSION: service_version,
            "deployment.environment": environment,
        })

        tracer_provider = TracerProvider(resource=resource)
        # The global provider can only be set once per process; later calls
        # are ignored by OpenTelemetry. The instrumentations below therefore
        # receive this provider explicitly.
        trace.set_tracer_provider(tracer_provider)

        # OTLP/gRPC exporter to the Jaeger collector. insecure=True means no TLS.
        otlp_exporter = OTLPSpanExporter(
            endpoint=jaeger_endpoint,
            insecure=insecure
        )
        # Batch spans in the background instead of exporting synchronously.
        tracer_provider.add_span_processor(BatchSpanProcessor(otlp_exporter))

        # NOTE(review): excluded_urls entries are treated as regex patterns
        # searched within the URL, so any path containing "health" or
        # "metrics" is skipped — confirm that is intended.
        FastAPIInstrumentor.instrument_app(
            app,
            tracer_provider=tracer_provider,
            excluded_urls="health,metrics"
        )

        # Inter-service HTTP calls, cache, and database: all best-effort.
        _instrument_safely(
            "HTTPX",
            lambda: HTTPXClientInstrumentor().instrument(tracer_provider=tracer_provider)
        )
        _instrument_safely(
            "Redis",
            lambda: RedisInstrumentor().instrument(tracer_provider=tracer_provider)
        )
        # psycopg2 is intentionally not instrumented: most services use
        # asyncpg (through SQLAlchemy) instead.
        # NOTE(review): with no engine= argument this presumably only covers
        # engines created after this call — confirm setup_tracing runs before
        # the database engine is built.
        _instrument_safely(
            "SQLAlchemy",
            lambda: SQLAlchemyInstrumentor().instrument(tracer_provider=tracer_provider)
        )

        logger.info(
            "Distributed tracing configured",
            service=service_name,
            jaeger_endpoint=jaeger_endpoint
        )
        return tracer_provider
    except Exception as e:
        logger.error(
            "Failed to setup tracing - continuing without it",
            service=service_name,
            error=str(e)
        )
        return None
def get_current_trace_id() -> Optional[str]:
    """
    Return the ID of the active trace, for correlating log lines with traces.

    Returns:
        The trace ID as a 32-character zero-padded lowercase hex string,
        or None when there is no valid active span.
    """
    current = trace.get_current_span()
    if not current:
        return None
    context = current.get_span_context()
    if not context.is_valid:
        return None
    return f"{context.trace_id:032x}"
def get_current_span_id() -> Optional[str]:
    """
    Return the ID of the currently active span.

    Returns:
        The span ID as a 16-character zero-padded lowercase hex string,
        or None when there is no valid active span.
    """
    current = trace.get_current_span()
    if not current:
        return None
    context = current.get_span_context()
    if not context.is_valid:
        return None
    return f"{context.span_id:016x}"
def add_trace_attributes(**attributes):
    """
    Attach custom key/value attributes to the currently active span.

    Every value is converted with ``str()`` before being set. Nothing happens
    when there is no valid active span.

    Example:
        add_trace_attributes(
            user_id="123",
            tenant_id="abc",
            operation="user_registration"
        )
    """
    current = trace.get_current_span()
    if not (current and current.get_span_context().is_valid):
        return
    for attr_name in attributes:
        current.set_attribute(attr_name, str(attributes[attr_name]))
def _to_event_attribute_value(value):
    """Return *value* if OpenTelemetry can store it as an attribute, else ``str(value)``.

    OpenTelemetry accepts str/bool/int/float values and lists/tuples whose
    non-None elements all share one of those types. Other values (UUIDs,
    dicts, None, mixed lists, ...) would otherwise be dropped from the event.
    """
    primitive_types = (str, bool, int, float)
    if isinstance(value, primitive_types):
        return value
    if isinstance(value, (list, tuple)):
        element_types = {type(item) for item in value if item is not None}
        if len(element_types) <= 1 and all(
            issubclass(element_type, primitive_types) for element_type in element_types
        ):
            return value
    return str(value)


def add_trace_event(name: str, **attributes):
    """
    Add a named event to the currently active span (for important operations).

    Attribute values OpenTelemetry supports natively are kept as-is, so
    numbers and booleans keep their type. Any other value is converted with
    ``str()`` so it is recorded instead of being silently dropped. Nothing
    happens when there is no valid active span.

    Example:
        add_trace_event("user_authenticated", user_id="123", method="jwt")
    """
    span = trace.get_current_span()
    if span and span.get_span_context().is_valid:
        span.add_event(
            name,
            {key: _to_event_attribute_value(value) for key, value in attributes.items()}
        )
def record_exception(exception: BaseException):
    """
    Record an exception on the current span and mark the span as failed.

    The span's status is set to ERROR. The status description is the
    exception message, or the exception's type name when the message is
    empty. Nothing happens when there is no valid active span.

    Args:
        exception: The exception to record
    """
    span = trace.get_current_span()
    if not (span and span.get_span_context().is_valid):
        return
    span.record_exception(exception)
    # str() of a message-less exception (e.g. ``raise TimeoutError()``) is "",
    # which would leave the ERROR status without a description.
    description = str(exception) or type(exception).__name__
    span.set_status(trace.Status(trace.StatusCode.ERROR, description))