# shared/monitoring/alert_metrics.py
"""
Metrics and monitoring for the alert and recommendation system
Provides comprehensive metrics for tracking system performance and effectiveness
"""
from prometheus_client import Counter, Histogram, Gauge
from typing import Dict, Any
import inspect
import time
from functools import wraps

import structlog

logger = structlog.get_logger()
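
# NOTE: all metrics below are registered with prometheus_client's default
# REGISTRY at import time, so importing this module once per process is
# enough. A service then exposes them in the usual way, for example (the
# port number is illustrative, not mandated by this module):
#
#     from prometheus_client import start_http_server
#     start_http_server(8000)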
# =================================================================
# DETECTION METRICS
# =================================================================
# Alert and recommendation generation
items_published = Counter(
    'alert_items_published_total',
    'Total number of alerts and recommendations published',
    ['service', 'item_type', 'severity', 'type']
)

item_checks_performed = Counter(
    'alert_checks_performed_total',
    'Total number of alert checks performed',
    ['service', 'check_type', 'pattern']
)

item_check_duration = Histogram(
    'alert_check_duration_seconds',
    'Time taken to perform alert checks',
    ['service', 'check_type'],
    buckets=[0.1, 0.5, 1, 2, 5, 10, 30, 60]
)

alert_detection_errors = Counter(
    'alert_detection_errors_total',
    'Total number of errors during alert detection',
    ['service', 'error_type', 'check_type']
)

# Deduplication metrics
duplicate_items_prevented = Counter(
    'duplicate_items_prevented_total',
    'Number of duplicate alerts/recommendations prevented',
    ['service', 'item_type', 'type']
)

# =================================================================
# PROCESSING METRICS
# =================================================================
# Alert processor metrics
items_processed = Counter(
    'alert_items_processed_total',
    'Total number of items processed by alert processor',
    ['item_type', 'severity', 'type', 'status']
)

item_processing_duration = Histogram(
    'alert_processing_duration_seconds',
    'Time taken to process alerts/recommendations',
    ['item_type', 'severity'],
    buckets=[0.01, 0.05, 0.1, 0.5, 1, 2, 5]
)

database_storage_duration = Histogram(
    'alert_database_storage_duration_seconds',
    'Time taken to store items in database',
    buckets=[0.01, 0.05, 0.1, 0.5, 1]
)

processing_errors = Counter(
    'alert_processing_errors_total',
    'Total number of processing errors',
    ['error_type', 'item_type']
)

# =================================================================
# DELIVERY METRICS
# =================================================================
# Notification delivery
notifications_sent = Counter(
    'alert_notifications_sent_total',
    'Total notifications sent through all channels',
    ['channel', 'item_type', 'severity', 'status']
)

notification_delivery_duration = Histogram(
    'alert_notification_delivery_duration_seconds',
    'Time from item generation to delivery',
    ['item_type', 'severity', 'channel'],
    buckets=[0.1, 0.5, 1, 5, 10, 30, 60]
)

delivery_failures = Counter(
    'alert_delivery_failures_total',
    'Failed notification deliveries',
    ['channel', 'item_type', 'error_type']
)

# Channel-specific metrics
email_notifications = Counter(
    'alert_email_notifications_total',
    'Email notifications sent',
    ['status', 'item_type']
)

whatsapp_notifications = Counter(
    'alert_whatsapp_notifications_total',
    'WhatsApp notifications sent',
    ['status', 'item_type']
)

sse_events_sent = Counter(
    'alert_sse_events_sent_total',
    'SSE events sent to dashboard',
    ['tenant', 'event_type', 'item_type']
)

# =================================================================
# SSE METRICS
# =================================================================
# SSE connection metrics
sse_active_connections = Gauge(
    'alert_sse_active_connections',
    'Number of active SSE connections',
    ['tenant_id']
)

sse_connection_duration = Histogram(
    'alert_sse_connection_duration_seconds',
    'Duration of SSE connections',
    buckets=[10, 30, 60, 300, 600, 1800, 3600]
)

sse_message_queue_size = Gauge(
    'alert_sse_message_queue_size',
    'Current size of SSE message queues',
    ['tenant_id']
)

sse_connection_errors = Counter(
    'alert_sse_connection_errors_total',
    'SSE connection errors',
    ['error_type', 'tenant_id']
)

# =================================================================
# SYSTEM HEALTH METRICS
# =================================================================
# Active items gauge
active_items_gauge = Gauge(
    'alert_active_items_current',
    'Current number of active alerts and recommendations',
    ['tenant_id', 'item_type', 'severity']
)

# System component health
system_component_health = Gauge(
    'alert_system_component_health',
    'Health status of alert system components (1=healthy, 0=unhealthy)',
    ['component', 'service']
)

# Leader election status
scheduler_leader_status = Gauge(
    'alert_scheduler_leader_status',
    'Leader election status for schedulers (1=leader, 0=follower)',
    ['service']
)

# Message queue health
rabbitmq_connection_status = Gauge(
    'alert_rabbitmq_connection_status',
    'RabbitMQ connection status (1=connected, 0=disconnected)',
    ['service']
)

redis_connection_status = Gauge(
    'alert_redis_connection_status',
    'Redis connection status (1=connected, 0=disconnected)',
    ['service']
)

# =================================================================
# BUSINESS METRICS
# =================================================================
# Alert response metrics
items_acknowledged = Counter(
    'alert_items_acknowledged_total',
    'Number of items acknowledged by users',
    ['item_type', 'severity', 'service']
)

items_resolved = Counter(
    'alert_items_resolved_total',
    'Number of items resolved by users',
    ['item_type', 'severity', 'service']
)

item_response_time = Histogram(
    'alert_item_response_time_seconds',
    'Time from item creation to acknowledgment',
    ['item_type', 'severity'],
    buckets=[60, 300, 600, 1800, 3600, 7200, 14400]
)

# Recommendation adoption
recommendations_implemented = Counter(
    'alert_recommendations_implemented_total',
    'Number of recommendations marked as implemented',
    ['type', 'service']
)

# Effectiveness metrics
false_positive_rate = Gauge(
    'alert_false_positive_rate',
    'Rate of false positive alerts',
    ['service', 'alert_type']
)

# =================================================================
# PERFORMANCE DECORATORS
# =================================================================
def track_duration(metric: Histogram, **labels):
    """Decorator to track function execution time"""
    def decorator(func):
        @wraps(func)
        async def async_wrapper(*args, **kwargs):
            start_time = time.time()
            try:
                result = await func(*args, **kwargs)
                metric.labels(**labels).observe(time.time() - start_time)
                return result
            except Exception:
                # Track the duration of failed calls as well
                metric.labels(**labels).observe(time.time() - start_time)
                raise

        @wraps(func)
        def sync_wrapper(*args, **kwargs):
            start_time = time.time()
            try:
                result = func(*args, **kwargs)
                metric.labels(**labels).observe(time.time() - start_time)
                return result
            except Exception:
                metric.labels(**labels).observe(time.time() - start_time)
                raise

        return async_wrapper if inspect.iscoroutinefunction(func) else sync_wrapper
    return decorator

def track_errors(error_counter: Counter, **labels):
    """Decorator to track errors in functions"""
    def decorator(func):
        @wraps(func)
        async def async_wrapper(*args, **kwargs):
            try:
                return await func(*args, **kwargs)
            except Exception as e:
                error_counter.labels(error_type=type(e).__name__, **labels).inc()
                raise

        @wraps(func)
        def sync_wrapper(*args, **kwargs):
            try:
                return func(*args, **kwargs)
            except Exception as e:
                error_counter.labels(error_type=type(e).__name__, **labels).inc()
                raise

        return async_wrapper if inspect.iscoroutinefunction(func) else sync_wrapper
    return decorator
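
# Illustrative use of the decorators above on a detection routine (the function
# name and label values are hypothetical; label names must match the metric's
# declared labels, otherwise prometheus_client raises ValueError):
#
#     @track_duration(item_check_duration, service="forecasting-service", check_type="low_stock")
#     @track_errors(alert_detection_errors, service="forecasting-service", check_type="low_stock")
#     async def check_low_stock(tenant_id: str):
#         ...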
# =================================================================
# UTILITY FUNCTIONS
# =================================================================
def record_item_published(service: str, item_type: str, severity: str, alert_type: str):
    """Record that an item was published"""
    items_published.labels(
        service=service,
        item_type=item_type,
        severity=severity,
        type=alert_type
    ).inc()


def record_item_processed(item_type: str, severity: str, alert_type: str, status: str):
    """Record that an item was processed"""
    items_processed.labels(
        item_type=item_type,
        severity=severity,
        type=alert_type,
        status=status
    ).inc()


def record_notification_sent(channel: str, item_type: str, severity: str, status: str):
    """Record notification delivery"""
    notifications_sent.labels(
        channel=channel,
        item_type=item_type,
        severity=severity,
        status=status
    ).inc()


def update_active_items(tenant_id: str, item_type: str, severity: str, count: int):
    """Update active items gauge"""
    active_items_gauge.labels(
        tenant_id=tenant_id,
        item_type=item_type,
        severity=severity
    ).set(count)


def update_component_health(component: str, service: str, is_healthy: bool):
    """Update component health status"""
    system_component_health.labels(
        component=component,
        service=service
    ).set(1 if is_healthy else 0)


def update_connection_status(connection_type: str, service: str, is_connected: bool):
    """Update connection status"""
    if connection_type == 'rabbitmq':
        rabbitmq_connection_status.labels(service=service).set(1 if is_connected else 0)
    elif connection_type == 'redis':
        redis_connection_status.labels(service=service).set(1 if is_connected else 0)
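
# Illustrative call sites for the helpers above (argument values are
# hypothetical examples, not a fixed vocabulary):
#
#     record_item_published("forecasting-service", "alert", "high", "low_stock")
#     record_notification_sent("email", "alert", "high", "sent")
#     update_component_health("scheduler", "forecasting-service", is_healthy=True)
#     update_connection_status("rabbitmq", "forecasting-service", is_connected=True)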
# =================================================================
# METRICS AGGREGATOR
# =================================================================
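# Internal aggregation helper for the reporting functions below. It uses the
# public ``collect()`` API rather than prometheus_client internals, because
# labelled Counters and Gauges keep their per-label children in an internal
# dict and expose no usable ``_value`` on the parent metric object.
def _sum_samples(metric, **label_filter: str) -> float:
    """Sum a metric's current values across all label combinations.

    Keyword arguments restrict the sum to samples whose labels match, e.g.
    ``_sum_samples(items_published, service="forecasting-service")``.
    ``_created`` timestamps and histogram component samples are ignored.
    """
    total = 0.0
    for family in metric.collect():
        for sample in family.samples:
            if sample.name.endswith(('_created', '_bucket', '_sum', '_count')):
                continue
            if any(sample.labels.get(key) != value for key, value in label_filter.items()):
                continue
            total += sample.value
    return total

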
class AlertMetricsCollector:
    """Centralized metrics collector for alert system"""

    def __init__(self, service_name: str):
        self.service_name = service_name

    def record_check_performed(self, check_type: str, pattern: str):
        """Record that a check was performed"""
        item_checks_performed.labels(
            service=self.service_name,
            check_type=check_type,
            pattern=pattern
        ).inc()

    def record_detection_error(self, error_type: str, check_type: str):
        """Record detection error"""
        alert_detection_errors.labels(
            service=self.service_name,
            error_type=error_type,
            check_type=check_type
        ).inc()

    def record_duplicate_prevented(self, item_type: str, alert_type: str):
        """Record prevented duplicate"""
        duplicate_items_prevented.labels(
            service=self.service_name,
            item_type=item_type,
            type=alert_type
        ).inc()

    def update_leader_status(self, is_leader: bool):
        """Update leader election status"""
        scheduler_leader_status.labels(service=self.service_name).set(1 if is_leader else 0)

    def get_service_metrics(self) -> Dict[str, Any]:
        """Get aggregate metric totals for this service"""
        return {
            'service': self.service_name,
            'items_published': _sum_samples(items_published, service=self.service_name),
            'checks_performed': _sum_samples(item_checks_performed, service=self.service_name),
            'detection_errors': _sum_samples(alert_detection_errors, service=self.service_name),
            'duplicates_prevented': _sum_samples(duplicate_items_prevented, service=self.service_name)
        }
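
# Illustrative wiring inside a detection service (service name, check type and
# pattern are hypothetical):
#
#     metrics = AlertMetricsCollector("forecasting-service")
#     metrics.update_leader_status(is_leader=True)
#     metrics.record_check_performed("low_stock", pattern="hourly")
#     metrics.record_duplicate_prevented("alert", "low_stock")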
# =================================================================
# DASHBOARD METRICS
# =================================================================
def get_system_overview_metrics() -> Dict[str, Any]:
    """Get overview metrics for monitoring dashboard"""
    try:
        return {
            'total_items_published': _sum_samples(items_published),
            'total_checks_performed': _sum_samples(item_checks_performed),
            'total_notifications_sent': _sum_samples(notifications_sent),
            'active_sse_connections': _sum_samples(sse_active_connections),
            'processing_errors': _sum_samples(processing_errors),
            'delivery_failures': _sum_samples(delivery_failures),
            'timestamp': time.time()
        }
    except Exception as e:
        logger.error("Error collecting overview metrics", error=str(e))
        return {'error': str(e), 'timestamp': time.time()}

def get_tenant_metrics(tenant_id: str) -> Dict[str, Any]:
    """Get metrics for a specific tenant"""
    try:
        return {
            'tenant_id': tenant_id,
            'active_connections': _sum_samples(sse_active_connections, tenant_id=tenant_id),
            'events_sent': _sum_samples(sse_events_sent, tenant=tenant_id),
            'timestamp': time.time()
        }
    except Exception as e:
        logger.error("Error collecting tenant metrics", tenant_id=tenant_id, error=str(e))
        return {'tenant_id': tenant_id, 'error': str(e), 'timestamp': time.time()}
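
# =================================================================
# SMOKE TEST
# =================================================================
# Running this module directly records a few illustrative samples and prints
# the aggregated overview. All values below are made up; real label values
# come from the calling services.
if __name__ == "__main__":
    record_item_published("example-service", "alert", "high", "low_stock")
    record_notification_sent("email", "alert", "high", "sent")
    update_active_items("example-tenant", "alert", "high", 3)
    update_connection_status("redis", "example-service", is_connected=True)
    print(get_system_overview_metrics())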