Files
bakery-ia/services/alert_processor/app/config.py

60 lines
2.3 KiB
Python
Raw Normal View History

2025-08-23 10:19:58 +02:00
# services/alert_processor/app/config.py
"""
Alert Processor Service Configuration
"""
import os
from typing import List
from shared.config.base import BaseServiceSettings
class AlertProcessorConfig(BaseServiceSettings):
"""Configuration for Alert Processor Service"""
SERVICE_NAME: str = "alert-processor"
APP_NAME: str = "Alert Processor Service"
DESCRIPTION: str = "Central alert and recommendation processor"
2025-09-27 11:18:13 +02:00
# Database configuration (secure approach - build from components)
@property
def DATABASE_URL(self) -> str:
"""Build database URL from secure components"""
# Try complete URL first (for backward compatibility)
complete_url = os.getenv("ALERT_PROCESSOR_DATABASE_URL")
if complete_url:
return complete_url
# Build from components (secure approach)
user = os.getenv("ALERT_PROCESSOR_DB_USER", "alert_processor_user")
password = os.getenv("ALERT_PROCESSOR_DB_PASSWORD", "alert_processor_pass123")
host = os.getenv("ALERT_PROCESSOR_DB_HOST", "localhost")
port = os.getenv("ALERT_PROCESSOR_DB_PORT", "5432")
name = os.getenv("ALERT_PROCESSOR_DB_NAME", "alert_processor_db")
return f"postgresql+asyncpg://{user}:{password}@{host}:{port}/{name}"
2025-08-23 10:19:58 +02:00
# Use dedicated Redis DB for alert processing
REDIS_DB: int = int(os.getenv("ALERT_PROCESSOR_REDIS_DB", "6"))
# Alert processing configuration
BATCH_SIZE: int = int(os.getenv("ALERT_BATCH_SIZE", "10"))
PROCESSING_TIMEOUT: int = int(os.getenv("ALERT_PROCESSING_TIMEOUT", "30"))
# Deduplication settings
ALERT_DEDUPLICATION_WINDOW_MINUTES: int = int(os.getenv("ALERT_DEDUPLICATION_WINDOW_MINUTES", "15"))
RECOMMENDATION_DEDUPLICATION_WINDOW_MINUTES: int = int(os.getenv("RECOMMENDATION_DEDUPLICATION_WINDOW_MINUTES", "60"))
# Alert severity channel mappings (hardcoded for now to avoid config parsing issues)
@property
def urgent_channels(self) -> List[str]:
return ["whatsapp", "email", "push", "dashboard"]
@property
def high_channels(self) -> List[str]:
return ["whatsapp", "email", "dashboard"]
@property
def medium_channels(self) -> List[str]:
return ["email", "dashboard"]
@property
def low_channels(self) -> List[str]:
return ["dashboard"]