Files
bakery-ia/shared/monitoring/logs_exporter.py

222 lines
6.9 KiB
Python
Raw Permalink Normal View History

"""
OpenTelemetry Logs Integration for SigNoz

Exports structured logs to SigNoz via OpenTelemetry Collector using HTTP protocol
"""
import os
import logging
import structlog
from typing import Optional
from opentelemetry._logs import set_logger_provider
from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler
from opentelemetry.sdk._logs.export import BatchLogRecordProcessor
2026-01-09 23:14:12 +01:00
from opentelemetry.sdk.resources import Resource
# Resolve the OTLP HTTP log exporter class. Two module paths are attempted
# (``_log_exporter`` first, then ``log_exporter``); if neither can be imported
# the name is bound to None so callers can detect the missing dependency.
try:
    from opentelemetry.exporter.otlp.proto.http._log_exporter import OTLPLogExporter
except ImportError:
    try:
        from opentelemetry.exporter.otlp.proto.http.log_exporter import OTLPLogExporter
    except ImportError:
        OTLPLogExporter = None

# True exactly when one of the imports above succeeded.
HTTP_LOG_EXPORTER_AVAILABLE = OTLPLogExporter is not None
from .otel_config import OTelConfig
logger = structlog.get_logger()
def setup_otel_logging(
    service_name: str,
    service_version: str = "1.0.0",
    otel_endpoint: Optional[str] = None,
    enable_console: bool = True
) -> Optional[LoggingHandler]:
    """
    Set up OpenTelemetry log export to SigNoz over the OTLP HTTP protocol.

    Builds a ``LoggerProvider`` with a batching OTLP HTTP exporter and attaches
    a ``LoggingHandler`` to the root logger of Python's standard ``logging``
    module, so records that reach the root logger are handed to the exporter.

    Args:
        service_name: Name of the service (e.g., "auth-service")
        service_version: Version of the service
        otel_endpoint: Optional override for the OTLP endpoint. It is passed
            through ``OTelConfig._ensure_http_endpoint`` with the ``/v1/logs``
            path. When omitted, ``OTelConfig.get_endpoints().logs_http`` is used.
        enable_console: Only reported in the success log entry; this function
            does not add or remove any console handler itself.

    Returns:
        The ``LoggingHandler`` that was attached to the root logger, or None
        when log export is disabled, the HTTP exporter package is missing, or
        setup raised an exception (the exception is logged, not propagated).

    Note:
        Every successful call attaches another handler to the root logger, so
        this is meant to be called once during service initialization.

    Example:
        from shared.monitoring.logs_exporter import setup_otel_logging

        # Setup during service initialization
        handler = setup_otel_logging("auth-service", "1.0.0")

        # Standard logging calls are now forwarded to the exporter
        import logging
        logger = logging.getLogger(__name__)
        logger.info("This will appear in SigNoz!")
    """
    # Check if logging export is enabled
    if not OTelConfig.is_enabled("logs"):
        logger.info(
            "OpenTelemetry logs export disabled",
            service=service_name,
            reason="OTEL_LOGS_EXPORTER not set to 'otlp'"
        )
        return None

    # Check if HTTP log exporter is available
    if not HTTP_LOG_EXPORTER_AVAILABLE or OTLPLogExporter is None:
        logger.warning(
            "OpenTelemetry HTTP log exporter not available",
            service=service_name,
            reason="opentelemetry-exporter-otlp-proto-http package not installed"
        )
        return None

    try:
        # Get endpoints from centralized config
        endpoints = OTelConfig.get_endpoints()

        # Use provided endpoint or get from config
        if otel_endpoint:
            http_endpoint = OTelConfig._ensure_http_endpoint(otel_endpoint, "/v1/logs")
        else:
            http_endpoint = endpoints.logs_http

        # Get resource attributes
        resource_attrs = OTelConfig.get_resource_attributes(service_name, service_version)
        resource = Resource(attributes=resource_attrs)

        # Assemble the complete pipeline (provider -> batch processor ->
        # OTLP HTTP exporter) before touching any global state.
        logger_provider = LoggerProvider(resource=resource)
        otlp_exporter = OTLPLogExporter(
            endpoint=http_endpoint,
            timeout=10
        )
        log_processor = BatchLogRecordProcessor(otlp_exporter)
        logger_provider.add_log_record_processor(log_processor)

        # Create logging handler that bridges standard logging to OpenTelemetry
        otel_handler = LoggingHandler(
            level=logging.NOTSET,  # Capture all levels
            logger_provider=logger_provider
        )

        # Publish globally only once everything above succeeded. Registering
        # the provider earlier would leave a provider without any processor
        # installed as the global one if exporter construction failed.
        set_logger_provider(logger_provider)

        # Add handler to root logger
        root_logger = logging.getLogger()
        root_logger.addHandler(otel_handler)

        logger.info(
            "OpenTelemetry logs export configured successfully",
            service=service_name,
            http_endpoint=http_endpoint,
            protocol="http",
            console_logging=enable_console
        )
        return otel_handler
    except Exception as e:
        # Best-effort setup: never let telemetry wiring crash the service,
        # but keep the traceback so the failure can be diagnosed.
        logger.error(
            "Failed to setup OpenTelemetry logs export",
            service=service_name,
            error=str(e),
            exc_info=True
        )
        return None
def add_log_context(**context):
    """
    Create a structlog logger with extra key-value context bound to it.

    Useful for attaching request IDs, user IDs, tenant IDs, etc. that help
    with filtering and correlation in SigNoz.

    The context is bound to the logger that is returned; emit log entries
    through that returned logger for them to carry the bound values.
    NOTE(review): this function does not itself register ambient/global
    context - confirm whether callers rely on the return value.

    Args:
        **context: Key-value pairs to bind to the returned logger

    Returns:
        The result of ``structlog.get_logger().bind(**context)``

    Example:
        from shared.monitoring.logs_exporter import add_log_context

        # Bind context for the current request
        log = add_log_context(
            request_id="req_123",
            user_id="user_456",
            tenant_id="tenant_789"
        )

        # Entries emitted via the returned logger include the bound keys
        log.info("Processing order")
    """
    # structlog's bind() returns a new bound logger carrying the context
    return structlog.get_logger().bind(**context)
def get_current_trace_context() -> dict:
    """
    Return the identifiers of the currently active trace span, if any.

    The result can be merged into log records so that logs can be correlated
    with traces.

    Returns:
        A dict with ``trace_id`` (32 lowercase hex digits) and ``span_id``
        (16 lowercase hex digits) when the current span has a valid span
        context; an empty dict otherwise.

    Example:
        from shared.monitoring.logs_exporter import get_current_trace_context

        # Get trace context and add to logs
        trace_ctx = get_current_trace_context()
        logger.info("Processing request", **trace_ctx)
    """
    # Imported lazily, inside the function, as in the original implementation
    from opentelemetry import trace

    current_span = trace.get_current_span()
    if not current_span:
        return {}

    span_context = current_span.get_span_context()
    if not span_context.is_valid:
        return {}

    # Zero-padded hexadecimal rendering of the numeric identifiers
    return {
        "trace_id": f"{span_context.trace_id:032x}",
        "span_id": f"{span_context.span_id:016x}",
    }
class StructlogOTELProcessor:
    """
    Structlog processor that injects OpenTelemetry trace context into events.

    When ``get_current_trace_context()`` returns identifiers, they are merged
    into the event dict (``trace_id`` and ``span_id`` keys), which enables
    correlation between logs and traces in SigNoz. Events are passed through
    unchanged when no trace context is available.

    Usage:
        import structlog
        from shared.monitoring.logs_exporter import StructlogOTELProcessor

        structlog.configure(
            processors=[
                StructlogOTELProcessor(),
                # ... other processors
            ]
        )
    """

    def __call__(self, logger, method_name, event_dict):
        """Merge the current trace context into the event dict and return it."""
        ids = get_current_trace_context()
        if not ids:
            # No active/valid span: leave the event untouched
            return event_dict

        event_dict.update(ids)
        return event_dict