# ================================================================
# services/notification/app/main.py - ENHANCED WITH SSE SUPPORT
# ================================================================
"""
Notification Service Main Application
Handles email, WhatsApp notifications and SSE for real-time alerts/recommendations
"""

from fastapi import FastAPI
from sqlalchemy import text

from app.core.config import settings
from app.core.database import database_manager
from app.api.notifications import router as notification_router
from app.api.sse_routes import router as sse_router
from app.services.messaging import setup_messaging, cleanup_messaging
from app.services.sse_service import SSEService
from app.services.notification_orchestrator import NotificationOrchestrator
from app.services.email_service import EmailService
from app.services.whatsapp_service import WhatsAppService
from shared.service_base import StandardFastAPIService


class NotificationService(StandardFastAPIService):
    """Notification Service with standardized setup.

    Wires the email, WhatsApp and SSE services plus the notification
    orchestrator into the shared ``StandardFastAPIService`` scaffolding and
    registers the service-specific health checks, metrics and endpoints.
    """

    # Alembic revision the database must be at for this service to start.
    expected_migration_version = "00001"

    def __init__(self):
        """Declare health checks and metrics, then initialise the base service."""
        # Define expected database tables for health checks
        notification_expected_tables = [
            'notifications',
            'notification_templates',
            'notification_preferences',
            'notification_logs',
            'email_templates',
            'whatsapp_templates',
        ]

        # Populated in on_startup(); until then the health checks below
        # report the corresponding component as unhealthy.
        self.sse_service = None
        self.orchestrator = None
        self.email_service = None
        self.whatsapp_service = None

        # Define custom metrics for notification service
        notification_custom_metrics = {
            "notifications_sent_total": {
                "type": "counter",
                "description": "Total notifications sent",
                "labels": ["type", "status", "channel"],
            },
            "emails_sent_total": {
                "type": "counter",
                "description": "Total emails sent",
                "labels": ["status"],
            },
            "whatsapp_sent_total": {
                "type": "counter",
                "description": "Total WhatsApp messages sent",
                "labels": ["status"],
            },
            "sse_events_sent_total": {
                "type": "counter",
                "description": "Total SSE events sent",
                "labels": ["tenant", "event_type"],
            },
            "notification_processing_duration_seconds": {
                "type": "histogram",
                "description": "Time spent processing notifications",
            },
        }

        # Define custom health checks for notification service components.
        # Each check swallows its own exceptions and reports False so that a
        # failing dependency never makes the health endpoint itself raise.
        async def check_email_service():
            """Check email service health"""
            try:
                return await self.email_service.health_check() if self.email_service else False
            except Exception as e:
                self.logger.error("Email service health check failed", error=str(e))
                return False

        async def check_whatsapp_service():
            """Check WhatsApp service health"""
            try:
                return await self.whatsapp_service.health_check() if self.whatsapp_service else False
            except Exception as e:
                self.logger.error("WhatsApp service health check failed", error=str(e))
                return False

        async def check_sse_service():
            """Check SSE service health (healthy when Redis is connected)"""
            try:
                if self.sse_service:
                    metrics = self.sse_service.get_metrics()
                    return bool(metrics.get("redis_connected", False))
                return False
            except Exception as e:
                self.logger.error("SSE service health check failed", error=str(e))
                return False

        async def check_messaging():
            """Check messaging service health"""
            try:
                # Imported at call time so the module attribute's current
                # value is read (presumably assigned by setup_messaging()
                # - TODO confirm).
                from app.services.messaging import notification_publisher
                return bool(notification_publisher and notification_publisher.connected)
            except Exception as e:
                self.logger.error("Messaging health check failed", error=str(e))
                return False

        super().__init__(
            service_name="notification-service",
            app_name="Bakery Notification Service",
            description="Email, WhatsApp and SSE notification service for bakery alerts and recommendations",
            version="2.0.0",
            log_level=settings.LOG_LEVEL,
            cors_origins=getattr(settings, 'CORS_ORIGINS', ["*"]),
            api_prefix="/api/v1",
            database_manager=database_manager,
            expected_tables=notification_expected_tables,
            custom_health_checks={
                "email_service": check_email_service,
                "whatsapp_service": check_whatsapp_service,
                "sse_service": check_sse_service,
                "messaging": check_messaging,
            },
            enable_messaging=True,
            custom_metrics=notification_custom_metrics,
        )

    async def verify_migrations(self):
        """Verify database schema matches the latest migrations.

        Reads ``version_num`` from the ``alembic_version`` table and compares
        it with ``expected_migration_version``.

        Raises:
            RuntimeError: if the stored version differs from the expected one.
            Exception: any error raised while querying the database is logged
                and re-raised unchanged.
        """
        try:
            async with self.database_manager.get_session() as session:
                result = await session.execute(text("SELECT version_num FROM alembic_version"))
                version = result.scalar()
                if version != self.expected_migration_version:
                    self.logger.error(f"Migration version mismatch: expected {self.expected_migration_version}, got {version}")
                    raise RuntimeError(f"Migration version mismatch: expected {self.expected_migration_version}, got {version}")
                self.logger.info(f"Migration verification successful: {version}")
        except Exception as e:
            self.logger.error(f"Migration verification failed: {e}")
            raise

    async def _setup_messaging(self):
        """Setup messaging for notification service"""
        await setup_messaging()
        self.logger.info("Messaging initialized")

    async def _cleanup_messaging(self):
        """Cleanup messaging for notification service"""
        await cleanup_messaging()

    async def on_startup(self, app: FastAPI):
        """Custom startup logic for notification service.

        Runs, in order: migration verification, the base-class startup hook,
        and construction of the email/WhatsApp/SSE services and the
        orchestrator, which are then exposed on ``app.state``.

        This is deliberately the ONLY ``on_startup`` in the class: a second
        ``def`` with the same name in a class body replaces the first, which
        previously caused migration verification and the base-class startup
        to be skipped entirely.

        Args:
            app: The FastAPI application being started.

        Raises:
            RuntimeError: if the database migration version is not the
                expected one (propagated from ``verify_migrations``).
        """
        # Fail fast on schema drift before any service is brought up.
        await self.verify_migrations()

        await super().on_startup(app)

        # Initialize services
        self.email_service = EmailService()
        self.whatsapp_service = WhatsAppService()

        # Initialize SSE service
        self.sse_service = SSEService(settings.REDIS_URL)
        await self.sse_service.initialize()
        self.logger.info("SSE service initialized")

        # Create orchestrator
        self.orchestrator = NotificationOrchestrator(
            email_service=self.email_service,
            whatsapp_service=self.whatsapp_service,
            sse_service=self.sse_service,
        )

        # Store services in app state
        app.state.orchestrator = self.orchestrator
        app.state.sse_service = self.sse_service
        app.state.email_service = self.email_service
        app.state.whatsapp_service = self.whatsapp_service

    async def on_shutdown(self, app: FastAPI):
        """Custom shutdown logic for notification service.

        Shuts down the SSE service if it was created during startup.
        """
        # Shutdown SSE service
        if self.sse_service:
            await self.sse_service.shutdown()
            self.logger.info("SSE service shutdown completed")

    def get_service_features(self):
        """Return notification-specific features"""
        return [
            "email_notifications",
            "whatsapp_notifications",
            "sse_real_time_updates",
            "notification_templates",
            "notification_orchestration",
            "messaging_integration",
            "multi_channel_support",
        ]

    def setup_custom_endpoints(self):
        """Setup custom endpoints for notification service.

        Registers ``GET /sse-metrics`` and ``GET /metrics`` on ``self.app``.
        """

        # SSE metrics endpoint
        @self.app.get("/sse-metrics")
        async def sse_metrics():
            """Get SSE service metrics"""
            if self.sse_service:
                try:
                    snapshot = self.sse_service.get_metrics()
                    return {
                        'active_tenants': snapshot.get('active_tenants', 0),
                        'total_connections': snapshot.get('total_connections', 0),
                        'active_listeners': snapshot.get('active_listeners', 0),
                        'redis_connected': bool(snapshot.get('redis_connected', False)),
                    }
                except Exception as e:
                    return {"error": str(e)}
            return {"error": "SSE service not available"}

        # Metrics endpoint
        @self.app.get("/metrics")
        async def metrics():
            """Prometheus metrics endpoint"""
            if self.metrics_collector:
                return self.metrics_collector.get_metrics()
            return {"metrics": "not_available"}


# Create service instance
service = NotificationService()

# Create FastAPI app with standardized setup
app = service.create_app(
    docs_url="/docs",
    redoc_url="/redoc"
)

# Setup standard endpoints
service.setup_standard_endpoints()

# Setup custom endpoints
service.setup_custom_endpoints()

# Include routers
service.add_router(notification_router, tags=["notifications"])
service.add_router(sse_router, tags=["sse"])

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)