# services/alert_processor/app/config.py """ Alert Processor Service Configuration """ import os from typing import List from shared.config.base import BaseServiceSettings class AlertProcessorConfig(BaseServiceSettings): """Configuration for Alert Processor Service""" SERVICE_NAME: str = "alert-processor" APP_NAME: str = "Alert Processor Service" DESCRIPTION: str = "Central alert and recommendation processor" # Database configuration (secure approach - build from components) @property def DATABASE_URL(self) -> str: """Build database URL from secure components""" # Try complete URL first (for backward compatibility) complete_url = os.getenv("ALERT_PROCESSOR_DATABASE_URL") if complete_url: return complete_url # Build from components (secure approach) user = os.getenv("ALERT_PROCESSOR_DB_USER", "alert_processor_user") password = os.getenv("ALERT_PROCESSOR_DB_PASSWORD", "alert_processor_pass123") host = os.getenv("ALERT_PROCESSOR_DB_HOST", "localhost") port = os.getenv("ALERT_PROCESSOR_DB_PORT", "5432") name = os.getenv("ALERT_PROCESSOR_DB_NAME", "alert_processor_db") return f"postgresql+asyncpg://{user}:{password}@{host}:{port}/{name}" # Use dedicated Redis DB for alert processing REDIS_DB: int = int(os.getenv("ALERT_PROCESSOR_REDIS_DB", "6")) # Alert processing configuration BATCH_SIZE: int = int(os.getenv("ALERT_BATCH_SIZE", "10")) PROCESSING_TIMEOUT: int = int(os.getenv("ALERT_PROCESSING_TIMEOUT", "30")) # Deduplication settings ALERT_DEDUPLICATION_WINDOW_MINUTES: int = int(os.getenv("ALERT_DEDUPLICATION_WINDOW_MINUTES", "15")) RECOMMENDATION_DEDUPLICATION_WINDOW_MINUTES: int = int(os.getenv("RECOMMENDATION_DEDUPLICATION_WINDOW_MINUTES", "60")) # Alert severity channel mappings (hardcoded for now to avoid config parsing issues) @property def urgent_channels(self) -> List[str]: return ["whatsapp", "email", "push", "dashboard"] @property def high_channels(self) -> List[str]: return ["whatsapp", "email", "dashboard"] @property def medium_channels(self) -> List[str]: return ["email", "dashboard"] @property def low_channels(self) -> List[str]: return ["dashboard"]