# shared/alerts/base_service.py
"""
Base alert service pattern for all microservices.

Supports both alerts and recommendations through unified detection patterns:
leader-elected scheduled jobs, event-driven listeners, and database
LISTEN/NOTIFY triggers.
"""
import asyncio
import json
import uuid
from typing import List, Dict, Any, Optional
from uuid import UUID
from datetime import datetime, timedelta

import structlog
from redis.asyncio import Redis
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger

from shared.messaging.rabbitmq import RabbitMQClient
from shared.database.base import DatabaseManager
from shared.config.rabbitmq_config import get_routing_key

logger = structlog.get_logger()


class BaseAlertService:
    """
    Base class for service-specific alert and recommendation detection.

    Implements hybrid detection patterns: scheduled jobs (run only on the
    elected leader instance), event-driven listeners, and database triggers
    (run on every instance).
    """

    def __init__(self, config):
        self.config = config
        self.db_manager = DatabaseManager(config.DATABASE_URL)
        self.rabbitmq_client = RabbitMQClient(config.RABBITMQ_URL, config.SERVICE_NAME)
        self.redis: Optional[Redis] = None
        self.scheduler = AsyncIOScheduler()
        self.is_leader = False
        self.exchange = "alerts.exchange"
        # asyncpg LISTEN/NOTIFY connection; kept so stop() can close it
        # (previously this connection was created and leaked)
        self._db_listener_conn = None

        # Metrics (checks_performed is incremented by subclasses)
        self._items_published = 0
        self._checks_performed = 0
        self._errors_count = 0

    async def start(self):
        """Initialize all detection mechanisms.

        Raises:
            Exception: re-raised after logging if any component fails to start.
        """
        try:
            # Connect to Redis for leader election and deduplication
            self.redis = await Redis.from_url(self.config.REDIS_URL)
            logger.info("Connected to Redis", service=self.config.SERVICE_NAME)

            # Connect to RabbitMQ
            await self.rabbitmq_client.connect()
            logger.info("Connected to RabbitMQ", service=self.config.SERVICE_NAME)

            # Start leader election for scheduled jobs (runs forever in background)
            asyncio.create_task(self.maintain_leadership())

            # Setup scheduled checks (jobs fire only while this instance is leader)
            self.setup_scheduled_checks()

            # Start database listener (runs on all instances)
            await self.start_database_listener()

            # Start event listener (runs on all instances)
            await self.start_event_listener()

            logger.info("Alert service started",
                        service=self.config.SERVICE_NAME)
        except Exception as e:
            logger.error("Failed to start alert service",
                         service=self.config.SERVICE_NAME,
                         error=str(e))
            raise

    async def stop(self):
        """Clean shutdown: scheduler, DB listener, Redis, RabbitMQ.

        Errors are logged but not re-raised so shutdown always completes.
        """
        try:
            # Stop scheduler
            if self.scheduler.running:
                self.scheduler.shutdown()

            # FIX: close the asyncpg listener connection (was leaked before)
            if self._db_listener_conn is not None:
                await self._db_listener_conn.close()
                self._db_listener_conn = None

            # Close connections
            if self.redis:
                await self.redis.aclose()  # Use aclose() for modern Redis client
            await self.rabbitmq_client.disconnect()

            logger.info("Alert service stopped", service=self.config.SERVICE_NAME)
        except Exception as e:
            logger.error("Error stopping alert service",
                         service=self.config.SERVICE_NAME,
                         error=str(e))

    # PATTERN 1: Scheduled Background Jobs
    def setup_scheduled_checks(self):
        """Configure scheduled alert checks - Override in service."""
        raise NotImplementedError("Subclasses must implement setup_scheduled_checks")

    async def maintain_leadership(self):
        """Leader election for scheduled jobs.

        Loops forever: tries to hold a Redis lock keyed by service name.
        Only the lock holder runs the APScheduler jobs; on losing the lock
        the scheduler is shut down. Re-checks at half the lock TTL so the
        lock is renewed before it can expire.
        """
        lock_key = f"scheduler_lock:{self.config.SERVICE_NAME}"
        lock_ttl = 60
        while True:
            try:
                instance_id = getattr(self.config, 'INSTANCE_ID', 'default')
                was_leader = self.is_leader

                if not self.is_leader:
                    # Try to acquire new leadership (SET NX = only if absent)
                    result = await self.redis.set(
                        lock_key, instance_id,
                        ex=lock_ttl, nx=True
                    )
                    self.is_leader = result is not None
                else:
                    # Already leader - try to extend the lock
                    current_value = await self.redis.get(lock_key)
                    if current_value and current_value.decode() == instance_id:
                        # Still our lock, extend it
                        await self.redis.expire(lock_key, lock_ttl)
                        self.is_leader = True
                    else:
                        # Lock expired or taken by someone else
                        self.is_leader = False

                # Handle leadership changes (start/stop scheduler on transitions)
                if self.is_leader and not was_leader:
                    self.scheduler.start()
                    logger.info("Acquired scheduler leadership",
                                service=self.config.SERVICE_NAME)
                elif not self.is_leader and was_leader:
                    self.scheduler.shutdown()
                    logger.info("Lost scheduler leadership",
                                service=self.config.SERVICE_NAME)

                # Renew at half the TTL so the lock never silently lapses
                await asyncio.sleep(lock_ttl // 2)
            except Exception as e:
                logger.error("Leadership error",
                             service=self.config.SERVICE_NAME,
                             error=str(e))
                # Assume lost on error; retry soon
                self.is_leader = False
                await asyncio.sleep(5)

    # PATTERN 2: Event-Driven Detection
    async def start_event_listener(self):
        """Listen for business events - Override in service."""
        pass

    # PATTERN 3: Database Triggers
    async def start_database_listener(self):
        """Open an asyncpg connection and register LISTEN/NOTIFY handlers.

        Failures are logged, not raised: the service can run without DB
        triggers. The connection is stored for cleanup in stop().
        """
        try:
            import asyncpg

            # Convert SQLAlchemy URL format to plain PostgreSQL for asyncpg
            database_url = self.config.DATABASE_URL
            if database_url.startswith('postgresql+asyncpg://'):
                database_url = database_url.replace('postgresql+asyncpg://', 'postgresql://')

            conn = await asyncpg.connect(database_url)
            # FIX: keep a reference so the connection is closed on stop()
            self._db_listener_conn = conn

            # Register listeners based on service
            await self.register_db_listeners(conn)
            logger.info("Database listeners registered",
                        service=self.config.SERVICE_NAME)
        except Exception as e:
            logger.error("Failed to setup database listeners",
                         service=self.config.SERVICE_NAME,
                         error=str(e))

    async def register_db_listeners(self, conn):
        """Register database listeners - Override in service."""
        pass

    # Publishing (Updated for type)
    async def publish_item(self, tenant_id: UUID, item: Dict[str, Any],
                           item_type: str = 'alert') -> bool:
        """Publish alert or recommendation to RabbitMQ with deduplication.

        Args:
            tenant_id: owning tenant.
            item: payload; must contain 'type' and 'severity'. Enriched in
                place with id/tenant_id/service/timestamp/item_type.
            item_type: 'alert' or 'recommendation'.

        Returns:
            True if published, False if deduplicated or on failure.
        """
        try:
            # Check for duplicate (keyed on tenant + type + optional metadata id)
            item_key = f"{tenant_id}:{item_type}:{item['type']}:{item.get('metadata', {}).get('id', '')}"
            if await self.is_duplicate_item(item_key):
                logger.debug("Duplicate item skipped",
                             service=self.config.SERVICE_NAME,
                             item_type=item_type,
                             alert_type=item['type'])
                return False

            # Add metadata
            item['id'] = str(uuid.uuid4())
            item['tenant_id'] = str(tenant_id)
            item['service'] = self.config.SERVICE_NAME
            item['timestamp'] = datetime.utcnow().isoformat()
            item['item_type'] = item_type  # 'alert' or 'recommendation'

            # Determine routing key based on severity and type
            routing_key = get_routing_key(item_type, item['severity'], self.config.SERVICE_NAME)

            # Publish to RabbitMQ
            success = await self.rabbitmq_client.publish_event(
                exchange_name=self.exchange,
                routing_key=routing_key,
                event_data=item
            )

            if success:
                self._items_published += 1
                logger.info("Item published successfully",
                            service=self.config.SERVICE_NAME,
                            item_type=item_type,
                            alert_type=item['type'],
                            severity=item['severity'],
                            routing_key=routing_key)
            else:
                self._errors_count += 1
                logger.error("Failed to publish item",
                             service=self.config.SERVICE_NAME,
                             item_type=item_type,
                             alert_type=item['type'])
            return success
        except Exception as e:
            self._errors_count += 1
            logger.error("Error publishing item",
                         service=self.config.SERVICE_NAME,
                         error=str(e),
                         item_type=item_type)
            return False

    async def is_duplicate_item(self, item_key: str, window_minutes: int = 15) -> bool:
        """Prevent duplicate items within time window.

        Uses SET NX with TTL as an atomic test-and-set: the first caller
        within the window wins; later callers see the key and are duplicates.
        """
        key = f"item_sent:{item_key}"
        try:
            result = await self.redis.set(
                key, "1",
                ex=window_minutes * 60, nx=True
            )
            return result is None  # None means duplicate
        except Exception as e:
            logger.error("Error checking duplicate", error=str(e))
            return False  # Allow publishing if check fails

    # Helper methods
    async def get_active_tenants(self) -> List[UUID]:
        """Get list of active tenant IDs (empty list if table is absent)."""
        try:
            from sqlalchemy import text
            query = text("SELECT DISTINCT tenant_id FROM tenants WHERE status = 'active'")
            async with self.db_manager.get_session() as session:
                result = await session.execute(query)
                return [row.tenant_id for row in result.fetchall()]
        except Exception as e:
            # If tenants table doesn't exist, skip tenant-based processing
            if "does not exist" in str(e):
                logger.debug("Tenants table not found, skipping tenant-based alert processing")
                return []
            else:
                logger.error("Error fetching active tenants", error=str(e))
                return []

    async def get_tenant_config(self, tenant_id: UUID) -> Dict[str, Any]:
        """Get tenant-specific configuration ({} if missing or on error)."""
        try:
            from sqlalchemy import text
            query = text("SELECT config FROM tenants WHERE tenant_id = :tenant_id")
            async with self.db_manager.get_session() as session:
                result = await session.execute(query, {"tenant_id": tenant_id})
                row = result.fetchone()
                return json.loads(row.config) if row and row.config else {}
        except Exception as e:
            logger.error("Error fetching tenant config",
                         tenant_id=str(tenant_id),
                         error=str(e))
            return {}

    # Health and metrics
    def get_metrics(self) -> Dict[str, Any]:
        """Get service metrics."""
        return {
            "items_published": self._items_published,
            "checks_performed": self._checks_performed,
            "errors_count": self._errors_count,
            "is_leader": self.is_leader,
            "scheduler_running": self.scheduler.running,
            # FIX: modern redis.asyncio.Redis has no `.closed` attribute
            # (that was aioredis 1.x); report connection presence instead
            "redis_connected": self.redis is not None,
            "rabbitmq_connected": self.rabbitmq_client.connected if self.rabbitmq_client else False
        }

    async def health_check(self) -> Dict[str, Any]:
        """Comprehensive health check of Redis, RabbitMQ, DB and scheduler."""
        try:
            # Check Redis via an actual PING (see get_metrics: `.closed`
            # does not exist on the modern client)
            redis_healthy = False
            try:
                if self.redis is not None:
                    await self.redis.ping()
                    redis_healthy = True
            except Exception:
                redis_healthy = False

            # Check RabbitMQ
            rabbitmq_healthy = self.rabbitmq_client.connected if self.rabbitmq_client else False

            # Check database
            db_healthy = False
            try:
                from sqlalchemy import text
                async with self.db_manager.get_session() as session:
                    await session.execute(text("SELECT 1"))
                    db_healthy = True
            except Exception:
                pass

            status = "healthy" if all([redis_healthy, rabbitmq_healthy, db_healthy]) else "unhealthy"
            return {
                "status": status,
                "service": self.config.SERVICE_NAME,
                "components": {
                    "redis": "healthy" if redis_healthy else "unhealthy",
                    "rabbitmq": "healthy" if rabbitmq_healthy else "unhealthy",
                    "database": "healthy" if db_healthy else "unhealthy",
                    "scheduler": "running" if self.scheduler.running else "stopped"
                },
                "metrics": self.get_metrics()
            }
        except Exception as e:
            return {
                "status": "error",
                "service": self.config.SERVICE_NAME,
                "error": str(e)
            }


class AlertServiceMixin:
    """Mixin providing common alert helper methods."""

    def format_spanish_message(self, template_key: str, **kwargs) -> Dict[str, Any]:
        """Format Spanish alert message."""
        from shared.alerts.templates import format_item_message
        return format_item_message(template_key, 'es', **kwargs)

    def get_business_hours_severity(self, base_severity: str) -> str:
        """Adjust severity based on business hours.

        Outside business hours (07-20): 'medium' drops to 'low'; 'high'
        drops to 'medium' only in the dead of night (before 06 or after 22).
        All other severities pass through unchanged.
        """
        current_hour = datetime.now().hour

        # Reduce non-critical severity outside business hours (7-20)
        if not (7 <= current_hour <= 20):
            if base_severity == 'medium':
                return 'low'
            # FIX: parenthesize the hour test. The original
            # `x == 'high' and a or b` parsed as `(x == 'high' and a) or b`,
            # which downgraded EVERY severity to 'medium' after 22:00.
            elif base_severity == 'high' and (current_hour < 6 or current_hour > 22):
                return 'medium'
        return base_severity

    def should_send_recommendation(self, tenant_id: UUID, rec_type: str) -> bool:
        """Check if recommendation should be sent based on tenant preferences."""
        # Implement tenant-specific recommendation frequency limits
        # This is a simplified version
        return True