# shared/monitoring/alert_metrics.py
"""
Metrics and monitoring for the alert and recommendation system.

Provides comprehensive metrics for tracking system performance and effectiveness.
"""

from prometheus_client import Counter, Histogram, Gauge, Summary, Info
from typing import Dict, Any
import inspect
import time
from functools import wraps
import structlog

logger = structlog.get_logger()

# =================================================================
# DETECTION METRICS
# =================================================================

# Alert and recommendation generation
items_published = Counter(
    'alert_items_published_total',
    'Total number of alerts and recommendations published',
    ['service', 'item_type', 'severity', 'type']
)

item_checks_performed = Counter(
    'alert_checks_performed_total',
    'Total number of alert checks performed',
    ['service', 'check_type', 'pattern']
)

item_check_duration = Histogram(
    'alert_check_duration_seconds',
    'Time taken to perform alert checks',
    ['service', 'check_type'],
    buckets=[0.1, 0.5, 1, 2, 5, 10, 30, 60]
)

alert_detection_errors = Counter(
    'alert_detection_errors_total',
    'Total number of errors during alert detection',
    ['service', 'error_type', 'check_type']
)

# Deduplication metrics
duplicate_items_prevented = Counter(
    'duplicate_items_prevented_total',
    'Number of duplicate alerts/recommendations prevented',
    ['service', 'item_type', 'type']
)

# =================================================================
# PROCESSING METRICS
# =================================================================

# Alert processor metrics
items_processed = Counter(
    'alert_items_processed_total',
    'Total number of items processed by alert processor',
    ['item_type', 'severity', 'type', 'status']
)

item_processing_duration = Histogram(
    'alert_processing_duration_seconds',
    'Time taken to process alerts/recommendations',
    ['item_type', 'severity'],
    buckets=[0.01, 0.05, 0.1, 0.5, 1, 2, 5]
)

database_storage_duration = Histogram(
    'alert_database_storage_duration_seconds',
    'Time taken to store items in database',
    buckets=[0.01, 0.05, 0.1, 0.5, 1]
)

processing_errors = Counter(
    'alert_processing_errors_total',
    'Total number of processing errors',
    ['error_type', 'item_type']
)

# =================================================================
# DELIVERY METRICS
# =================================================================

# Notification delivery
notifications_sent = Counter(
    'alert_notifications_sent_total',
    'Total notifications sent through all channels',
    ['channel', 'item_type', 'severity', 'status']
)

notification_delivery_duration = Histogram(
    'alert_notification_delivery_duration_seconds',
    'Time from item generation to delivery',
    ['item_type', 'severity', 'channel'],
    buckets=[0.1, 0.5, 1, 5, 10, 30, 60]
)

delivery_failures = Counter(
    'alert_delivery_failures_total',
    'Failed notification deliveries',
    ['channel', 'item_type', 'error_type']
)

# Channel-specific metrics
email_notifications = Counter(
    'alert_email_notifications_total',
    'Email notifications sent',
    ['status', 'item_type']
)

whatsapp_notifications = Counter(
    'alert_whatsapp_notifications_total',
    'WhatsApp notifications sent',
    ['status', 'item_type']
)

sse_events_sent = Counter(
    'alert_sse_events_sent_total',
    'SSE events sent to dashboard',
    ['tenant', 'event_type', 'item_type']
)
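
# Example (illustrative sketch, not part of the production code path): a
# notification worker might record delivery latency and outcome like this,
# where `generated_at` is the hypothetical creation timestamp of the item:
#
#     notification_delivery_duration.labels(
#         item_type="alert", severity="high", channel="email"
#     ).observe(time.time() - generated_at)
#     notifications_sent.labels(
#         channel="email", item_type="alert", severity="high", status="success"
#     ).inc()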

# =================================================================
# SSE METRICS
# =================================================================

# SSE connection metrics
sse_active_connections = Gauge(
    'alert_sse_active_connections',
    'Number of active SSE connections',
    ['tenant_id']
)

sse_connection_duration = Histogram(
    'alert_sse_connection_duration_seconds',
    'Duration of SSE connections',
    buckets=[10, 30, 60, 300, 600, 1800, 3600]
)

sse_message_queue_size = Gauge(
    'alert_sse_message_queue_size',
    'Current size of SSE message queues',
    ['tenant_id']
)

sse_connection_errors = Counter(
    'alert_sse_connection_errors_total',
    'SSE connection errors',
    ['error_type', 'tenant_id']
)

# =================================================================
# SYSTEM HEALTH METRICS
# =================================================================

# Active items gauge
active_items_gauge = Gauge(
    'alert_active_items_current',
    'Current number of active alerts and recommendations',
    ['tenant_id', 'item_type', 'severity']
)

# System component health
system_component_health = Gauge(
    'alert_system_component_health',
    'Health status of alert system components (1=healthy, 0=unhealthy)',
    ['component', 'service']
)

# Leader election status
scheduler_leader_status = Gauge(
    'alert_scheduler_leader_status',
    'Leader election status for schedulers (1=leader, 0=follower)',
    ['service']
)

# Message queue health
rabbitmq_connection_status = Gauge(
    'alert_rabbitmq_connection_status',
    'RabbitMQ connection status (1=connected, 0=disconnected)',
    ['service']
)

redis_connection_status = Gauge(
    'alert_redis_connection_status',
    'Redis connection status (1=connected, 0=disconnected)',
    ['service']
)

# =================================================================
# BUSINESS METRICS
# =================================================================

# Alert response metrics
items_acknowledged = Counter(
    'alert_items_acknowledged_total',
    'Number of items acknowledged by users',
    ['item_type', 'severity', 'service']
)

items_resolved = Counter(
    'alert_items_resolved_total',
    'Number of items resolved by users',
    ['item_type', 'severity', 'service']
)

item_response_time = Histogram(
    'alert_item_response_time_seconds',
    'Time from item creation to acknowledgment',
    ['item_type', 'severity'],
    buckets=[60, 300, 600, 1800, 3600, 7200, 14400]
)

# Recommendation adoption
recommendations_implemented = Counter(
    'alert_recommendations_implemented_total',
    'Number of recommendations marked as implemented',
    ['type', 'service']
)

# Effectiveness metrics
false_positive_rate = Gauge(
    'alert_false_positive_rate',
    'Rate of false positive alerts',
    ['service', 'alert_type']
)

# =================================================================
# PERFORMANCE DECORATORS
# =================================================================

def track_duration(metric: Histogram, **labels):
    """Decorator to track function execution time"""
    def decorator(func):
        @wraps(func)
        async def async_wrapper(*args, **kwargs):
            start_time = time.time()
            try:
                result = await func(*args, **kwargs)
                metric.labels(**labels).observe(time.time() - start_time)
                return result
            except Exception:
                # Track error duration too
                metric.labels(**labels).observe(time.time() - start_time)
                raise

        @wraps(func)
        def sync_wrapper(*args, **kwargs):
            start_time = time.time()
            try:
                result = func(*args, **kwargs)
                metric.labels(**labels).observe(time.time() - start_time)
                return result
            except Exception:
                metric.labels(**labels).observe(time.time() - start_time)
                raise

        # Choose the wrapper that matches the wrapped function's kind
        return async_wrapper if inspect.iscoroutinefunction(func) else sync_wrapper
    return decorator


def track_errors(error_counter: Counter, **labels):
    """Decorator to track errors in functions"""
    def decorator(func):
        @wraps(func)
        async def async_wrapper(*args, **kwargs):
            try:
                return await func(*args, **kwargs)
            except Exception as e:
                error_counter.labels(error_type=type(e).__name__, **labels).inc()
                raise

        @wraps(func)
        def sync_wrapper(*args, **kwargs):
            try:
                return func(*args, **kwargs)
            except Exception as e:
                error_counter.labels(error_type=type(e).__name__, **labels).inc()
                raise

        return async_wrapper if inspect.iscoroutinefunction(func) else sync_wrapper
    return decorator
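
# Example (illustrative): a detection coroutine would typically stack both
# decorators; the service name, check type, and function below are hypothetical:
#
#     @track_errors(alert_detection_errors, service="inventory_service",
#                   check_type="low_stock")
#     @track_duration(item_check_duration, service="inventory_service",
#                     check_type="low_stock")
#     async def check_low_stock(tenant_id: str):
#         ...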

# =================================================================
# UTILITY FUNCTIONS
# =================================================================

def record_item_published(service: str, item_type: str, severity: str, alert_type: str):
    """Record that an item was published"""
    items_published.labels(
        service=service,
        item_type=item_type,
        severity=severity,
        type=alert_type
    ).inc()


def record_item_processed(item_type: str, severity: str, alert_type: str, status: str):
    """Record that an item was processed"""
    items_processed.labels(
        item_type=item_type,
        severity=severity,
        type=alert_type,
        status=status
    ).inc()


def record_notification_sent(channel: str, item_type: str, severity: str, status: str):
    """Record notification delivery"""
    notifications_sent.labels(
        channel=channel,
        item_type=item_type,
        severity=severity,
        status=status
    ).inc()


def update_active_items(tenant_id: str, item_type: str, severity: str, count: int):
    """Update active items gauge"""
    active_items_gauge.labels(
        tenant_id=tenant_id,
        item_type=item_type,
        severity=severity
    ).set(count)


def update_component_health(component: str, service: str, is_healthy: bool):
    """Update component health status"""
    system_component_health.labels(
        component=component,
        service=service
    ).set(1 if is_healthy else 0)


def update_connection_status(connection_type: str, service: str, is_connected: bool):
    """Update connection status"""
    if connection_type == 'rabbitmq':
        rabbitmq_connection_status.labels(service=service).set(1 if is_connected else 0)
    elif connection_type == 'redis':
        redis_connection_status.labels(service=service).set(1 if is_connected else 0)


# Read helpers: aggregate current values through the public collect() API
# instead of reaching into prometheus_client's private attributes.
def _sum_counter(counter: Counter, **label_filter) -> float:
    """Sum a counter's '_total' samples, optionally filtered by label values"""
    total = 0.0
    for family in counter.collect():
        for sample in family.samples:
            if not sample.name.endswith('_total'):
                continue
            if all(sample.labels.get(key) == value for key, value in label_filter.items()):
                total += sample.value
    return total


def _sum_gauge(gauge: Gauge, **label_filter) -> float:
    """Sum a gauge's current samples, optionally filtered by label values"""
    total = 0.0
    for family in gauge.collect():
        for sample in family.samples:
            if all(sample.labels.get(key) == value for key, value in label_filter.items()):
                total += sample.value
    return total


# =================================================================
# METRICS AGGREGATOR
# =================================================================

class AlertMetricsCollector:
    """Centralized metrics collector for alert system"""

    def __init__(self, service_name: str):
        self.service_name = service_name

    def record_check_performed(self, check_type: str, pattern: str):
        """Record that a check was performed"""
        item_checks_performed.labels(
            service=self.service_name,
            check_type=check_type,
            pattern=pattern
        ).inc()

    def record_detection_error(self, error_type: str, check_type: str):
        """Record detection error"""
        alert_detection_errors.labels(
            service=self.service_name,
            error_type=error_type,
            check_type=check_type
        ).inc()

    def record_duplicate_prevented(self, item_type: str, alert_type: str):
        """Record prevented duplicate"""
        duplicate_items_prevented.labels(
            service=self.service_name,
            item_type=item_type,
            type=alert_type
        ).inc()

    def update_leader_status(self, is_leader: bool):
        """Update leader election status"""
        scheduler_leader_status.labels(service=self.service_name).set(1 if is_leader else 0)

    def get_service_metrics(self) -> Dict[str, Any]:
        """Get all metrics for this service"""
        return {
            'service': self.service_name,
            'items_published': _sum_counter(items_published, service=self.service_name),
            'checks_performed': _sum_counter(item_checks_performed, service=self.service_name),
            'detection_errors': _sum_counter(alert_detection_errors, service=self.service_name),
            'duplicates_prevented': _sum_counter(duplicate_items_prevented, service=self.service_name)
        }
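
# Example (illustrative): each detection service would typically create one
# collector at startup; the names below are hypothetical:
#
#     metrics = AlertMetricsCollector("inventory_service")
#     metrics.record_check_performed(check_type="low_stock", pattern="threshold")
#     metrics.update_leader_status(is_leader=True)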

# =================================================================
# DASHBOARD METRICS
# =================================================================

def get_system_overview_metrics() -> Dict[str, Any]:
    """Get overview metrics for monitoring dashboard"""
    try:
        return {
            'total_items_published': _sum_counter(items_published),
            'total_checks_performed': _sum_counter(item_checks_performed),
            'total_notifications_sent': _sum_counter(notifications_sent),
            'active_sse_connections': _sum_gauge(sse_active_connections),
            'processing_errors': _sum_counter(processing_errors),
            'delivery_failures': _sum_counter(delivery_failures),
            'timestamp': time.time()
        }
    except Exception as e:
        logger.error("Error collecting overview metrics", error=str(e))
        return {'error': str(e), 'timestamp': time.time()}


def get_tenant_metrics(tenant_id: str) -> Dict[str, Any]:
    """Get metrics for a specific tenant"""
    try:
        return {
            'tenant_id': tenant_id,
            'active_connections': _sum_gauge(sse_active_connections, tenant_id=tenant_id),
            # Note: sse_events_sent labels the tenant as 'tenant', not 'tenant_id'
            'events_sent': _sum_counter(sse_events_sent, tenant=tenant_id),
            'timestamp': time.time()
        }
    except Exception as e:
        logger.error("Error collecting tenant metrics", tenant_id=tenant_id, error=str(e))
        return {'tenant_id': tenant_id, 'error': str(e), 'timestamp': time.time()}
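

# =================================================================
# MANUAL SMOKE TEST
# =================================================================
# Minimal usage sketch (not exercised in production): expose the default
# registry on a local port, record a few hypothetical sample values, and print
# the overview so the series can be inspected at http://localhost:8000/metrics.
if __name__ == "__main__":
    from prometheus_client import start_http_server

    start_http_server(8000)
    record_item_published("demo_service", "alert", "high", "low_stock")
    record_notification_sent("email", "alert", "high", "success")
    update_component_health("alert_processor", "demo_service", True)
    print(get_system_overview_metrics())

    # Keep the endpoint up for a minute so it can be scraped or inspected.
    time.sleep(60)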