# services/alert_processor/app/config.py
"""Settings for the Alert Processor service.

Every environment-backed default below is computed once, while the class
body executes at import time.
"""

import os
from typing import List

from shared.config.base import BaseServiceSettings


def _int_from_env(variable: str, fallback: str) -> int:
    """Return environment variable *variable* parsed as an ``int``.

    ``fallback`` is the string used when the variable is unset. A value that
    is not a valid integer literal raises ``ValueError`` from ``int()``.
    """
    return int(os.getenv(variable, fallback))


class AlertProcessorConfig(BaseServiceSettings):
    """Alert Processor settings layered on top of ``BaseServiceSettings``."""

    # Service identity.
    SERVICE_NAME: str = "alert-processor"
    APP_NAME: str = "Alert Processor Service"
    DESCRIPTION: str = "Central alert and recommendation processor"

    # Dedicated database for alert storage.
    # NOTE(review): the fallback URL embeds a username and password;
    # presumably intended for local development only - confirm that real
    # deployments always set ALERT_PROCESSOR_DATABASE_URL.
    DATABASE_URL: str = os.getenv(
        "ALERT_PROCESSOR_DATABASE_URL",
        "postgresql+asyncpg://alert_processor_user:alert_processor_pass123@alert-processor-db:5432/alert_processor_db",
    )

    # Dedicated Redis logical database for alert processing.
    REDIS_DB: int = _int_from_env("ALERT_PROCESSOR_REDIS_DB", "6")

    # Alert processing tunables.
    # NOTE(review): units are not stated here (timeout presumably seconds) -
    # verify against the code that consumes these values.
    BATCH_SIZE: int = _int_from_env("ALERT_BATCH_SIZE", "10")
    PROCESSING_TIMEOUT: int = _int_from_env("ALERT_PROCESSING_TIMEOUT", "30")

    # Deduplication windows, in minutes (per the setting names).
    ALERT_DEDUPLICATION_WINDOW_MINUTES: int = _int_from_env(
        "ALERT_DEDUPLICATION_WINDOW_MINUTES", "15"
    )
    RECOMMENDATION_DEDUPLICATION_WINDOW_MINUTES: int = _int_from_env(
        "RECOMMENDATION_DEDUPLICATION_WINDOW_MINUTES", "60"
    )

    # Severity -> delivery channel mappings. These are hard-coded for now to
    # avoid config parsing issues; each property builds a new list per access.

    @property
    def urgent_channels(self) -> List[str]:
        """Channels used for urgent-severity alerts."""
        return ["whatsapp", "email", "push", "dashboard"]

    @property
    def high_channels(self) -> List[str]:
        """Channels used for high-severity alerts."""
        return ["whatsapp", "email", "dashboard"]

    @property
    def medium_channels(self) -> List[str]:
        """Channels used for medium-severity alerts."""
        return ["email", "dashboard"]

    @property
    def low_channels(self) -> List[str]:
        """Channels used for low-severity alerts."""
        return ["dashboard"]