Add new alert architecture

Urtzi Alfaro
2025-08-23 10:19:58 +02:00
parent 1a9839240e
commit 4b4268d640
45 changed files with 6518 additions and 1590 deletions


@@ -1,321 +0,0 @@
## 🎯 **Complete Notification Service Implementation**
### **📁 File Structure Created**
```
services/notification/
├── app/
│   ├── main.py                       ✅ Complete FastAPI application
│   ├── core/
│   │   ├── config.py                 ✅ Configuration settings
│   │   └── database.py               ✅ Database initialization
│   ├── models/
│   │   ├── notifications.py          ✅ Core notification models
│   │   └── templates.py              ✅ Template-specific models
│   ├── schemas/
│   │   └── notifications.py          ✅ Pydantic schemas
│   ├── services/
│   │   ├── notification_service.py   ✅ Main business logic
│   │   ├── email_service.py          ✅ Email delivery
│   │   ├── whatsapp_service.py       ✅ WhatsApp delivery
│   │   └── messaging.py              ✅ RabbitMQ integration
│   └── api/
│       └── notifications.py          ✅ Complete API routes
├── requirements.txt                  ✅ Python dependencies
├── Dockerfile                        ✅ Container configuration
└── .env.example                      ✅ Environment variables
```
### **🔧 Key Features Implemented**
#### **1. Complete Business Logic**
- **NotificationService**: Core orchestration of all notification operations
- **Multi-channel support**: Email, WhatsApp, Push (extensible)
- **Template processing**: Jinja2-based template rendering
- **Bulk notifications**: Batch processing with rate limiting
- **User preferences**: Granular notification controls
- **Scheduling**: Delayed notification delivery
#### **2. Email Service Integration**
- **SMTP support**: Configurable email providers (Gmail, SendGrid, etc.)
- **HTML + Text emails**: Rich email templates with fallbacks
- **Bulk email processing**: Rate-limited batch sending
- **Template system**: Pre-built Spanish templates for bakeries
- **Health checks**: SMTP connection monitoring
- **Attachment support**: File attachment capabilities
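The HTML-plus-text fallback above is standard MIME `multipart/alternative`; a minimal stdlib sketch (the helper name and content are illustrative, not the service's actual `EmailService` API):

```python
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText

def build_email(subject: str, to_addr: str, text_body: str, html_body: str) -> MIMEMultipart:
    """Build a multipart/alternative message: capable clients render HTML, others fall back to text."""
    msg = MIMEMultipart("alternative")
    msg["Subject"] = subject
    msg["To"] = to_addr
    # Order matters: the last attached part is the preferred representation
    msg.attach(MIMEText(text_body, "plain", "utf-8"))
    msg.attach(MIMEText(html_body, "html", "utf-8"))
    return msg

msg = build_email("Bienvenido", "usuario@panaderia.com",
                  "Bienvenido a la plataforma.",
                  "<h1>Bienvenido</h1><p>a la plataforma.</p>")
```

Attachments would use an outer `multipart/mixed` wrapping this `alternative` part.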
#### **3. WhatsApp Service Integration**
- **Twilio integration**: WhatsApp Business API support
- **Spanish phone formatting**: Automatic +34 country code handling
- **Template messages**: WhatsApp Business template support
- **Bulk WhatsApp**: Rate-limited batch messaging
- **Delivery status**: Webhook handling for delivery confirmations
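The +34 handling can be sketched as a small normalizer (illustrative; the real logic in `whatsapp_service.py` may differ):

```python
def normalize_spanish_phone(raw: str) -> str:
    """Normalize a Spanish phone number to E.164 form with the +34 country code."""
    digits = "".join(ch for ch in raw if ch.isdigit() or ch == "+")
    if digits.startswith("+"):
        return digits                 # already has a country code
    if digits.startswith("0034"):
        return "+" + digits[2:]       # international 00 prefix
    if digits.startswith("34") and len(digits) == 11:
        return "+" + digits           # country code without '+'
    return "+34" + digits             # bare 9-digit national number

print(normalize_spanish_phone("612 34 56 78"))  # +34612345678
```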
#### **4. Database Models & Schemas**
- **Complete data model**: Notifications, templates, preferences, logs
- **Multi-tenant support**: Tenant-scoped notifications
- **Audit trail**: Detailed delivery attempt logging
- **Template management**: System and custom templates
- **User preferences**: Granular notification controls
#### **5. API Integration with Gateway**
- **Gateway authentication**: Uses shared auth decorators
- **Tenant isolation**: Automatic tenant scoping
- **Role-based access**: Admin/manager/user permissions
- **Complete CRUD**: Full notification management API
- **Webhook endpoints**: External delivery status handling
#### **6. RabbitMQ Event Integration**
- **Event consumers**: Listens for user registration, forecasts, training
- **Event publishers**: Publishes notification status events
- **Auto-notifications**: Triggers welcome emails, alerts, reports
- **Error handling**: Robust message processing with retry logic
#### **7. Spanish Bakery Templates**
- **Welcome email**: Professional onboarding email
- **Forecast alerts**: Demand variation notifications
- **Weekly reports**: Performance summary emails
- **Responsive HTML**: Mobile-optimized email designs
- **Spanish localization**: All content in Spanish
### **🚀 Integration with Your Architecture**
#### **Seamless Gateway Integration**
```python
# Gateway already routes to notification service
app.include_router(notification.router, prefix="/api/v1/notifications", tags=["notifications"])
# Authentication handled by gateway middleware
# Tenant isolation automatic
# User context passed via headers
```
#### **Shared Library Usage**
```python
# Uses your existing shared components
from shared.auth.decorators import get_current_user_dep, get_current_tenant_id_dep
from shared.messaging.rabbitmq import RabbitMQClient
from shared.monitoring.metrics import MetricsCollector
from shared.database.base import DatabaseManager
```
#### **Event-Driven Architecture**
```python
# Automatic notifications triggered by:
# - User registration → Welcome email
# - Forecast alerts → Alert emails + WhatsApp
# - Training completion → Status notifications
# - Data imports → Import confirmations
```
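The event-to-notification mapping above can be expressed as a simple routing table consumed by the RabbitMQ handlers (routing keys and template names here are illustrative assumptions, not the service's actual identifiers):

```python
from typing import Optional

# Hypothetical routing-key → notification rules; the real consumer wiring lives in messaging.py
EVENT_RULES = {
    "user.registered":    {"template": "welcome_email",        "channels": ["email"]},
    "forecast.alert":     {"template": "forecast_alert_email", "channels": ["email", "whatsapp"]},
    "training.completed": {"template": "training_status",      "channels": ["email"]},
    "data.imported":      {"template": "import_confirmation",  "channels": ["email"]},
}

def route_event(routing_key: str) -> Optional[dict]:
    """Return the notification rule for an event, or None if no notification applies."""
    return EVENT_RULES.get(routing_key)

print(route_event("user.registered"))  # {'template': 'welcome_email', 'channels': ['email']}
```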
### **📊 Production Features**
#### **Health Monitoring**
- **Database health checks**: Connection monitoring
- **SMTP health checks**: Email service validation
- **WhatsApp health checks**: API connectivity tests
- **Prometheus metrics**: Delivery rates, response times
- **Structured logging**: Comprehensive error tracking
#### **Rate Limiting & Scaling**
- **Email rate limits**: 1000/hour, configurable
- **WhatsApp rate limits**: 100/hour (Twilio limits)
- **Batch processing**: Configurable batch sizes
- **Retry logic**: Automatic retry with exponential backoff
- **Queue management**: Background task processing
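The retry policy can be sketched as a pure delay schedule (the base, factor, and cap values are illustrative defaults, not the service's configured ones):

```python
def backoff_delays(attempts: int, base: float = 1.0, factor: float = 2.0, cap: float = 60.0) -> list:
    """Exponential backoff schedule: base, base*factor, base*factor**2, ... capped at `cap` seconds."""
    return [min(base * factor ** i, cap) for i in range(attempts)]

print(backoff_delays(5))  # [1.0, 2.0, 4.0, 8.0, 16.0]
```

In practice each delay would be slept between delivery attempts, often with jitter added to avoid thundering herds.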
#### **Security & Compliance**
- **User consent**: Preference-based opt-in/out
- **Tenant isolation**: Multi-tenant data separation
- **GDPR compliance**: User data control
- **Rate limiting**: DoS protection
- **Input validation**: Pydantic schema validation
### **🎯 Business-Specific Features**
#### **Bakery Use Cases**
```python
# Forecast alerts when demand varies >20%
# Daily production recommendations
# Weekly performance reports
# Stock shortage notifications
# Weather impact alerts
# Holiday/event notifications
```
#### **Spanish Localization**
- **Spanish templates**: Native Spanish content
- **Madrid timezone**: Europe/Madrid default
- **Spanish phone format**: +34 prefix handling
- **Local business hours**: Quiet hours support
- **Cultural context**: Bakery-specific terminology
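Quiet-hours support amounts to a window check that wraps midnight; a sketch (the 22:00–08:00 defaults match the preferences example later in this document, but the service reads them from user preferences):

```python
from datetime import time

def in_quiet_hours(now: time, start: time = time(22, 0), end: time = time(8, 0)) -> bool:
    """True if `now` falls inside the quiet window; handles windows that wrap midnight."""
    if start <= end:
        return start <= now < end
    return now >= start or now < end  # window wraps past midnight

print(in_quiet_hours(time(23, 30)))  # True
print(in_quiet_hours(time(12, 0)))   # False
```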
### **🔄 How to Deploy**
#### **1. Add to Docker Compose**
```yaml
# Already integrated in your docker-compose.yml
notification-service:
  build: ./services/notification
  ports:
    - "8006:8000"
  environment:
    - DATABASE_URL=postgresql+asyncpg://notification_user:notification_pass123@notification-db:5432/notification_db
  depends_on:
    - notification-db
    - redis
    - rabbitmq
```
#### **2. Environment Setup**
```bash
# Copy the environment template
cp services/notification/.env.example services/notification/.env

# Then edit services/notification/.env:
# Email provider
SMTP_USER=your-email@gmail.com
SMTP_PASSWORD=your-app-password

# WhatsApp (optional)
WHATSAPP_API_KEY=your-twilio-sid:your-twilio-token
```
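The `WHATSAPP_API_KEY` above packs the Twilio SID and auth token into one variable; a parsing sketch (the `sid:token` format is as shown in the template, the helper name is illustrative):

```python
def parse_twilio_credentials(api_key: str) -> tuple:
    """Split 'sid:token' into (sid, token); splits on the first ':' so the token may contain ':'."""
    sid, _, token = api_key.partition(":")
    if not sid or not token:
        raise ValueError("WHATSAPP_API_KEY must be in 'sid:token' format")
    return sid, token

print(parse_twilio_credentials("ACxxxx:secret"))  # ('ACxxxx', 'secret')
```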
#### **3. Start Service**
```bash
# Service starts automatically with
docker-compose up -d
# Check health
curl http://localhost:8006/health
# View API docs
open http://localhost:8006/docs
```
### **📈 API Usage Examples**
#### **Send Welcome Email**
```http
POST /api/v1/notifications/send
{
  "type": "email",
  "recipient_email": "usuario@panaderia.com",
  "template_id": "welcome_email",
  "template_data": {
    "user_name": "Juan Carlos",
    "dashboard_url": "https://app.bakeryforecast.es/dashboard"
  }
}
```
#### **Send Forecast Alert**
```http
POST /api/v1/notifications/send
{
  "type": "email",
  "template_id": "forecast_alert_email",
  "template_data": {
    "bakery_name": "Panadería San Miguel",
    "product_name": "Pan integral",
    "forecast_date": "2025-01-25",
    "predicted_demand": 120,
    "variation_percentage": 35,
    "alert_message": "Aumento significativo esperado. Se recomienda incrementar producción."
  },
  "broadcast": true,
  "priority": "high"
}
```
#### **Update User Preferences**
```http
PATCH /api/v1/notifications/preferences
{
  "email_alerts": true,
  "whatsapp_enabled": false,
  "quiet_hours_start": "22:00",
  "quiet_hours_end": "08:00",
  "language": "es"
}
```
### **🎉 Key Benefits**
#### **✅ Production Ready**
- Complete error handling and logging
- Health checks and monitoring
- Rate limiting and security
- Multi-tenant architecture
- Scalable event-driven design
#### **✅ Business Focused**
- Spanish bakery templates
- Madrid timezone/localization
- Forecast-specific notifications
- Professional email designs
- WhatsApp support for urgent alerts
#### **✅ Developer Friendly**
- Comprehensive API documentation
- Type-safe Pydantic schemas
- Async/await throughout
- Structured logging
- Easy testing and debugging
#### **✅ Seamless Integration**
- Uses your shared libraries
- Integrates with gateway auth
- Follows your architectural patterns
- Maintains tenant isolation
- Publishes events to RabbitMQ
### **🚀 Next Steps**
#### **Immediate (Week 2)**
1. **Deploy the service**: Add to your docker-compose and start
2. **Configure SMTP**: Set up email provider credentials
3. **Test integration**: Send test notifications via API
4. **Event integration**: Verify RabbitMQ event handling
#### **Production Optimization**
1. **Email provider**: Consider SendGrid/Mailgun for production
2. **WhatsApp setup**: Configure Twilio Business API
3. **Template customization**: Add tenant-specific templates
4. **Analytics dashboard**: Add notification analytics to frontend
### **💡 Advanced Features Ready for Extension**
- **Push notifications**: Framework ready for mobile push
- **SMS support**: Easy to add SMS providers
- **A/B testing**: Template variant testing
- **Scheduled campaigns**: Marketing email campaigns
- **Analytics integration**: Detailed delivery analytics
**This notification service is now a complete, production-ready microservice that fully integrates with your bakery forecasting platform! It handles all notification needs from welcome emails to urgent forecast alerts, with proper Spanish localization and bakery-specific templates.** 🎯


@@ -0,0 +1,189 @@
# services/notification/app/api/sse_routes.py
"""
SSE routes for real-time alert and recommendation streaming
"""
import asyncio
import json
from datetime import datetime
from typing import Optional
from fastapi import APIRouter, Request, Depends, HTTPException, BackgroundTasks
from sse_starlette.sse import EventSourceResponse
import structlog
from shared.auth.decorators import get_current_user
router = APIRouter(prefix="/sse", tags=["sse"])
logger = structlog.get_logger()
@router.get("/alerts/stream/{tenant_id}")
async def stream_alerts(
    tenant_id: str,
    request: Request,
    background_tasks: BackgroundTasks,
    current_user = Depends(get_current_user)
):
    """
    SSE endpoint for real-time alert and recommendation streaming.
    Supports both alerts and recommendations through a unified stream.
    """
    # Verify user has access to this tenant
    if not hasattr(current_user, 'has_access_to_tenant') or not current_user.has_access_to_tenant(tenant_id):
        raise HTTPException(403, "Access denied to this tenant")

    # Get SSE service from app state
    sse_service = getattr(request.app.state, 'sse_service', None)
    if not sse_service:
        raise HTTPException(500, "SSE service not available")

    async def event_generator():
        """Generate SSE events for the client"""
        client_queue = asyncio.Queue(maxsize=100)  # Limit queue size
        try:
            # Register client
            await sse_service.add_client(tenant_id, client_queue)
            logger.info("SSE client connected",
                        tenant_id=tenant_id,
                        user_id=getattr(current_user, 'id', 'unknown'))
            # Stream events
            while True:
                # Check if client disconnected
                if await request.is_disconnected():
                    logger.info("SSE client disconnected", tenant_id=tenant_id)
                    break
                try:
                    # Wait for events with timeout for keepalive
                    event = await asyncio.wait_for(client_queue.get(), timeout=30.0)
                    yield event
                except asyncio.TimeoutError:
                    # Send keepalive ping
                    yield {
                        "event": "ping",
                        "data": json.dumps({
                            "timestamp": datetime.utcnow().isoformat(),
                            "status": "keepalive"
                        }),
                        "id": f"ping_{int(datetime.now().timestamp())}"
                    }
                except Exception as e:
                    logger.error("Error in SSE event generator",
                                 tenant_id=tenant_id,
                                 error=str(e))
                    break
        except Exception as e:
            logger.error("SSE connection error",
                         tenant_id=tenant_id,
                         error=str(e))
        finally:
            # Clean up on disconnect
            try:
                await sse_service.remove_client(tenant_id, client_queue)
                logger.info("SSE client cleanup completed", tenant_id=tenant_id)
            except Exception as e:
                logger.error("Error cleaning up SSE client",
                             tenant_id=tenant_id,
                             error=str(e))

    return EventSourceResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",  # Disable nginx buffering
        }
    )
@router.post("/items/{item_id}/acknowledge")
async def acknowledge_item(
    item_id: str,
    current_user = Depends(get_current_user)
):
    """Acknowledge an alert or recommendation"""
    try:
        # This would update the database; for now, just return success
        logger.info("Item acknowledged",
                    item_id=item_id,
                    user_id=getattr(current_user, 'id', 'unknown'))
        return {
            "status": "success",
            "item_id": item_id,
            "acknowledged_by": getattr(current_user, 'id', 'unknown'),
            "acknowledged_at": datetime.utcnow().isoformat()
        }
    except Exception as e:
        logger.error("Failed to acknowledge item", item_id=item_id, error=str(e))
        raise HTTPException(500, "Failed to acknowledge item")
@router.post("/items/{item_id}/resolve")
async def resolve_item(
    item_id: str,
    current_user = Depends(get_current_user)
):
    """Resolve an alert or recommendation"""
    try:
        # This would update the database; for now, just return success
        logger.info("Item resolved",
                    item_id=item_id,
                    user_id=getattr(current_user, 'id', 'unknown'))
        return {
            "status": "success",
            "item_id": item_id,
            "resolved_by": getattr(current_user, 'id', 'unknown'),
            "resolved_at": datetime.utcnow().isoformat()
        }
    except Exception as e:
        logger.error("Failed to resolve item", item_id=item_id, error=str(e))
        raise HTTPException(500, "Failed to resolve item")
@router.get("/status/{tenant_id}")
async def get_sse_status(
    tenant_id: str,
    request: Request,  # needed to reach request.app.state below
    current_user = Depends(get_current_user)
):
    """Get SSE connection status for a tenant"""
    # Verify user has access to this tenant
    if not hasattr(current_user, 'has_access_to_tenant') or not current_user.has_access_to_tenant(tenant_id):
        raise HTTPException(403, "Access denied to this tenant")
    try:
        # Get SSE service from app state
        sse_service = getattr(request.app.state, 'sse_service', None)
        if not sse_service:
            return {"status": "unavailable", "message": "SSE service not initialized"}
        metrics = sse_service.get_metrics()
        tenant_connections = len(sse_service.active_connections.get(tenant_id, set()))
        return {
            "status": "available",
            "tenant_id": tenant_id,
            "connections": tenant_connections,
            "total_connections": metrics["total_connections"],
            "active_tenants": metrics["active_tenants"]
        }
    except Exception as e:
        logger.error("Failed to get SSE status", tenant_id=tenant_id, error=str(e))
        raise HTTPException(500, "Failed to get SSE status")
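The stream endpoint above yields dicts that sse-starlette serializes into `event:`/`data:`/`id:` lines separated by blank lines; a minimal client-side parser sketch for that wire format (a simplification of the SSE spec, which only strips a single leading space after the colon):

```python
import json

def parse_sse_event(raw: str) -> dict:
    """Parse one SSE event block (the text between blank lines) into a field dict."""
    fields = {}
    for line in raw.splitlines():
        if ":" not in line:
            continue
        key, _, value = line.partition(":")
        fields[key.strip()] = value.lstrip()
    return fields

block = 'event: ping\ndata: {"status": "keepalive"}\nid: ping_1700000000'
evt = parse_sse_event(block)
print(evt["event"])                       # ping
print(json.loads(evt["data"])["status"])  # keepalive
```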


@@ -1,9 +1,9 @@
# ================================================================
# services/notification/app/main.py - COMPLETE IMPLEMENTATION
# services/notification/app/main.py - ENHANCED WITH SSE SUPPORT
# ================================================================
"""
Notification Service Main Application
Handles email and WhatsApp notifications with full integration
Handles email, WhatsApp notifications and SSE for real-time alerts/recommendations
"""
import structlog
@@ -15,7 +15,12 @@ from fastapi.responses import JSONResponse
from app.core.config import settings
from app.core.database import init_db
from app.api.notifications import router as notification_router
from app.api.sse_routes import router as sse_router
from app.services.messaging import setup_messaging, cleanup_messaging
from app.services.sse_service import SSEService
from app.services.notification_orchestrator import NotificationOrchestrator
from app.services.email_service import EmailService
from app.services.whatsapp_service import WhatsAppService
from shared.monitoring import setup_logging, HealthChecker
from shared.monitoring.metrics import setup_metrics_early
@@ -30,8 +35,8 @@ health_checker = None
# Create FastAPI app FIRST
app = FastAPI(
title="Bakery Notification Service",
description="Email and WhatsApp notification service for bakery forecasting platform",
version="1.0.0",
description="Email, WhatsApp and SSE notification service for bakery alerts and recommendations",
version="2.0.0",
docs_url="/docs",
redoc_url="/redoc"
)
@@ -56,12 +61,36 @@ async def lifespan(app: FastAPI):
await setup_messaging()
logger.info("Messaging initialized")
# Initialize services
email_service = EmailService()
whatsapp_service = WhatsAppService()
# Initialize SSE service
sse_service = SSEService(settings.REDIS_URL)
await sse_service.initialize()
logger.info("SSE service initialized")
# Create orchestrator
orchestrator = NotificationOrchestrator(
email_service=email_service,
whatsapp_service=whatsapp_service,
sse_service=sse_service
)
# Store services in app state
app.state.orchestrator = orchestrator
app.state.sse_service = sse_service
app.state.email_service = email_service
app.state.whatsapp_service = whatsapp_service
# Register custom metrics (metrics_collector already exists)
metrics_collector.register_counter("notifications_sent_total", "Total notifications sent", labels=["type", "status"])
metrics_collector.register_counter("notifications_sent_total", "Total notifications sent", labels=["type", "status", "channel"])
metrics_collector.register_counter("emails_sent_total", "Total emails sent", labels=["status"])
metrics_collector.register_counter("whatsapp_sent_total", "Total WhatsApp messages sent", labels=["status"])
metrics_collector.register_counter("sse_events_sent_total", "Total SSE events sent", labels=["tenant", "event_type"])
metrics_collector.register_histogram("notification_processing_duration_seconds", "Time spent processing notifications")
metrics_collector.register_gauge("notification_queue_size", "Current notification queue size")
metrics_collector.register_gauge("sse_active_connections", "Number of active SSE connections")
# Setup health checker
health_checker = HealthChecker("notification-service")
@@ -93,14 +122,22 @@ async def lifespan(app: FastAPI):
# Add WhatsApp service health check
async def check_whatsapp_service():
try:
from app.services.whatsapp_service import WhatsAppService
whatsapp_service = WhatsAppService()
return await whatsapp_service.health_check()
except Exception as e:
return f"WhatsApp service error: {e}"
health_checker.add_check("whatsapp_service", check_whatsapp_service, timeout=10.0, critical=False)
# Add SSE service health check
async def check_sse_service():
try:
metrics = sse_service.get_metrics()
return "healthy" if metrics["redis_connected"] else "Redis connection failed"
except Exception as e:
return f"SSE service error: {e}"
health_checker.add_check("sse_service", check_sse_service, timeout=5.0, critical=True)
# Add messaging health check
def check_messaging():
try:
@@ -115,7 +152,7 @@ async def lifespan(app: FastAPI):
# Store health checker in app state
app.state.health_checker = health_checker
logger.info("Notification Service started successfully")
logger.info("Notification Service with SSE support started successfully")
except Exception as e:
logger.error(f"Failed to start Notification Service: {e}")
@@ -126,10 +163,15 @@ async def lifespan(app: FastAPI):
# Shutdown
logger.info("Shutting down Notification Service...")
try:
# Shutdown SSE service
if hasattr(app.state, 'sse_service'):
await app.state.sse_service.shutdown()
logger.info("SSE service shutdown completed")
await cleanup_messaging()
logger.info("Messaging cleanup completed")
except Exception as e:
logger.error(f"Error during messaging cleanup: {e}")
logger.error(f"Error during shutdown: {e}")
# Set lifespan AFTER metrics setup
app.router.lifespan_context = lifespan
@@ -145,18 +187,30 @@ app.add_middleware(
# Include routers
app.include_router(notification_router, prefix="/api/v1", tags=["notifications"])
app.include_router(sse_router, prefix="/api/v1", tags=["sse"])
# Health check endpoint
@app.get("/health")
async def health_check():
"""Comprehensive health check endpoint"""
"""Comprehensive health check endpoint including SSE"""
if health_checker:
return await health_checker.check_health()
health_result = await health_checker.check_health()
# Add SSE metrics to health check
if hasattr(app.state, 'sse_service'):
try:
sse_metrics = app.state.sse_service.get_metrics()
health_result['sse_metrics'] = sse_metrics
except Exception as e:
health_result['sse_error'] = str(e)
return health_result
else:
return {
"service": "notification-service",
"status": "healthy",
"version": "1.0.0"
"version": "2.0.0",
"features": ["email", "whatsapp", "sse", "alerts", "recommendations"]
}
# Metrics endpoint


@@ -276,14 +276,26 @@ class EmailService:
# Test SMTP connection
if self.smtp_ssl:
# Use implicit TLS/SSL connection (port 465 typically)
server = aiosmtplib.SMTP(hostname=self.smtp_host, port=self.smtp_port, use_tls=True)
await server.connect()
# No need for starttls() when using implicit TLS
else:
# Use plain connection, optionally upgrade with STARTTLS
server = aiosmtplib.SMTP(hostname=self.smtp_host, port=self.smtp_port)
await server.connect()
if self.smtp_tls:
await server.starttls()
await server.connect()
if self.smtp_tls:
# Try STARTTLS, but handle case where connection is already secure
try:
await server.starttls()
except Exception as starttls_error:
# If STARTTLS fails because connection is already using TLS, that's okay
if "already using TLS" in str(starttls_error) or "already secure" in str(starttls_error):
logger.debug("SMTP connection already secure, skipping STARTTLS")
else:
# Re-raise other STARTTLS errors
raise starttls_error
await server.login(self.smtp_user, self.smtp_password)
await server.quit()
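The hunk above distinguishes implicit TLS (typically port 465, `use_tls=True`) from a STARTTLS upgrade on a plain connection (typically port 587); the decision can be summarized in a small helper (the port conventions are an illustrative assumption, not the service's actual config logic):

```python
def smtp_security_mode(port: int, force_ssl: bool = False, force_tls: bool = False) -> str:
    """Return 'ssl' (implicit TLS), 'starttls' (plain connect, then upgrade), or 'plain'."""
    if force_ssl or port == 465:
        return "ssl"       # TLS from the first byte; calling starttls() afterwards would fail
    if force_tls or port == 587:
        return "starttls"  # connect in plaintext, then upgrade the channel
    return "plain"

print(smtp_security_mode(465))  # ssl
print(smtp_security_mode(587))  # starttls
```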


@@ -0,0 +1,279 @@
# services/notification/app/services/notification_orchestrator.py
"""
Notification orchestrator for managing delivery across all channels
Includes SSE integration for real-time dashboard updates
"""
from typing import List, Dict, Any
from datetime import datetime
import structlog
from .email_service import EmailService
from .whatsapp_service import WhatsAppService
from .sse_service import SSEService
logger = structlog.get_logger()
class NotificationOrchestrator:
    """
    Orchestrates delivery across all notification channels.
    Now includes SSE for real-time dashboard updates, with support for recommendations.
    """
    def __init__(
        self,
        email_service: EmailService,
        whatsapp_service: WhatsAppService,
        sse_service: SSEService,
        push_service=None  # Optional push service
    ):
        self.email_service = email_service
        self.whatsapp_service = whatsapp_service
        self.sse_service = sse_service
        self.push_service = push_service
    async def send_notification(
        self,
        tenant_id: str,
        notification: Dict[str, Any],
        channels: List[str]
    ) -> Dict[str, Any]:
        """
        Send notification through specified channels.
        Channels can include: email, whatsapp, push, dashboard (SSE).
        """
        results = {}

        # Always send to dashboard for visibility (SSE)
        if 'dashboard' in channels or notification.get('type') in ['alert', 'recommendation']:
            try:
                await self.sse_service.send_item_notification(tenant_id, notification)
                results['dashboard'] = {'status': 'sent', 'timestamp': datetime.utcnow().isoformat()}
                logger.info("Item sent to dashboard via SSE",
                            tenant_id=tenant_id,
                            item_type=notification.get('type'),
                            item_id=notification.get('id'))
            except Exception as e:
                logger.error("Failed to send to dashboard",
                             tenant_id=tenant_id,
                             error=str(e))
                results['dashboard'] = {'status': 'failed', 'error': str(e)}

        # Send to email channel
        if 'email' in channels:
            try:
                email_result = await self.email_service.send_notification_email(
                    to_email=notification.get('email'),
                    subject=notification.get('title'),
                    template_data={
                        'title': notification.get('title'),
                        'message': notification.get('message'),
                        'severity': notification.get('severity'),
                        'item_type': notification.get('type'),
                        'actions': notification.get('actions', []),
                        'metadata': notification.get('metadata', {}),
                        'timestamp': datetime.utcnow().isoformat()
                    },
                    notification_type=notification.get('type', 'alert')
                )
                results['email'] = email_result
            except Exception as e:
                logger.error("Failed to send email",
                             tenant_id=tenant_id,
                             error=str(e))
                results['email'] = {'status': 'failed', 'error': str(e)}

        # Send to WhatsApp channel
        if 'whatsapp' in channels:
            try:
                whatsapp_result = await self.whatsapp_service.send_notification_message(
                    to_phone=notification.get('phone'),
                    message=self._format_whatsapp_message(notification),
                    notification_type=notification.get('type', 'alert')
                )
                results['whatsapp'] = whatsapp_result
            except Exception as e:
                logger.error("Failed to send WhatsApp",
                             tenant_id=tenant_id,
                             error=str(e))
                results['whatsapp'] = {'status': 'failed', 'error': str(e)}

        # Send to push notification channel
        if 'push' in channels and self.push_service:
            try:
                push_result = await self.push_service.send_notification(
                    user_id=notification.get('user_id'),
                    title=notification.get('title'),
                    body=notification.get('message'),
                    data={
                        'item_type': notification.get('type'),
                        'severity': notification.get('severity'),
                        'item_id': notification.get('id'),
                        'metadata': notification.get('metadata', {})
                    }
                )
                results['push'] = push_result
            except Exception as e:
                logger.error("Failed to send push notification",
                             tenant_id=tenant_id,
                             error=str(e))
                results['push'] = {'status': 'failed', 'error': str(e)}

        # Log summary
        successful_channels = [ch for ch, result in results.items() if result.get('status') == 'sent']
        failed_channels = [ch for ch, result in results.items() if result.get('status') == 'failed']
        logger.info("Notification delivery completed",
                    tenant_id=tenant_id,
                    item_type=notification.get('type'),
                    item_id=notification.get('id'),
                    successful_channels=successful_channels,
                    failed_channels=failed_channels,
                    total_channels=len(channels))
        return {
            'status': 'completed',
            'successful_channels': successful_channels,
            'failed_channels': failed_channels,
            'results': results,
            'timestamp': datetime.utcnow().isoformat()
        }
    def _format_whatsapp_message(self, notification: Dict[str, Any]) -> str:
        """Format message for WhatsApp with emojis and structure"""
        item_type = notification.get('type', 'alert')
        severity = notification.get('severity', 'medium')

        # Get appropriate emoji
        type_emoji = '🚨' if item_type == 'alert' else '💡'
        severity_emoji = {
            'urgent': '🔴',
            'high': '🟡',
            'medium': '🔵',
            'low': '🟢'
        }.get(severity, '🔵')

        message = f"{type_emoji} {severity_emoji} *{notification.get('title', 'Notificación')}*\n\n"
        message += f"{notification.get('message', '')}\n"

        # Add actions if available
        actions = notification.get('actions', [])
        if actions:
            message += "\n*Acciones sugeridas:*\n"
            for i, action in enumerate(actions[:3], 1):  # Limit to 3 actions for WhatsApp
                message += f"{i}. {action}\n"

        # Add timestamp
        message += f"\n_Enviado: {datetime.now().strftime('%H:%M, %d/%m/%Y')}_"
        return message
    def get_channels_by_severity(self, severity: str, item_type: str, hour: int = None) -> List[str]:
        """
        Determine notification channels based on severity and item_type.
        Now includes 'dashboard' as a channel.
        """
        if hour is None:
            hour = datetime.now().hour

        # Dashboard always gets all items
        channels = ['dashboard']

        if item_type == 'alert':
            if severity == 'urgent':
                # Urgent alerts: all channels immediately
                channels.extend(['email', 'whatsapp', 'push'])
            elif severity == 'high':
                # High alerts: email and WhatsApp during extended hours
                if 6 <= hour <= 22:
                    channels.extend(['email', 'whatsapp'])
                else:
                    channels.append('email')  # Email only during the night
            elif severity == 'medium':
                # Medium alerts: email during business hours
                if 7 <= hour <= 20:
                    channels.append('email')
        elif item_type == 'recommendation':
            # Recommendations: generally less urgent, respect business hours
            if severity in ['medium', 'high']:
                if 8 <= hour <= 19:  # Stricter business hours for recommendations
                    channels.append('email')
            # Low/urgent: dashboard only (urgent is rare for recommendations)
        return channels
    async def health_check(self) -> Dict[str, Any]:
        """Check health of all notification channels"""
        health_status = {
            'status': 'healthy',
            'channels': {},
            'timestamp': datetime.utcnow().isoformat()
        }

        # Check email service
        try:
            health_status['channels']['email'] = await self.email_service.health_check()
        except Exception as e:
            health_status['channels']['email'] = {'status': 'unhealthy', 'error': str(e)}

        # Check WhatsApp service
        try:
            health_status['channels']['whatsapp'] = await self.whatsapp_service.health_check()
        except Exception as e:
            health_status['channels']['whatsapp'] = {'status': 'unhealthy', 'error': str(e)}

        # Check SSE service
        try:
            sse_metrics = self.sse_service.get_metrics()
            sse_status = 'healthy' if sse_metrics['redis_connected'] else 'unhealthy'
            health_status['channels']['sse'] = {
                'status': sse_status,
                'metrics': sse_metrics
            }
        except Exception as e:
            health_status['channels']['sse'] = {'status': 'unhealthy', 'error': str(e)}

        # Check push service if available
        if self.push_service:
            try:
                health_status['channels']['push'] = await self.push_service.health_check()
            except Exception as e:
                health_status['channels']['push'] = {'status': 'unhealthy', 'error': str(e)}

        # Determine overall status
        unhealthy_channels = [
            ch for ch, status in health_status['channels'].items()
            if status.get('status') != 'healthy'
        ]
        if unhealthy_channels:
            health_status['status'] = (
                'degraded' if len(unhealthy_channels) < len(health_status['channels']) else 'unhealthy'
            )
            health_status['unhealthy_channels'] = unhealthy_channels
        return health_status
    def get_metrics(self) -> Dict[str, Any]:
        """Get aggregated metrics from all services"""
        metrics = {
            'timestamp': datetime.utcnow().isoformat(),
            'channels': {}
        }
        # Get SSE metrics
        try:
            metrics['channels']['sse'] = self.sse_service.get_metrics()
        except Exception as e:
            logger.error("Failed to get SSE metrics", error=str(e))
        # Additional metrics could be added here for other services
        return metrics
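The routing policy in `get_channels_by_severity` above can be exercised in isolation; here is a condensed standalone restatement for experimentation (it mirrors the rules and hour thresholds above, outside the orchestrator class):

```python
def channels_for(severity: str, item_type: str, hour: int) -> list:
    """Condensed restatement of the orchestrator's channel-routing rules."""
    channels = ["dashboard"]  # the dashboard always receives every item
    if item_type == "alert":
        if severity == "urgent":
            channels += ["email", "whatsapp", "push"]
        elif severity == "high":
            channels += ["email", "whatsapp"] if 6 <= hour <= 22 else ["email"]
        elif severity == "medium" and 7 <= hour <= 20:
            channels.append("email")
    elif item_type == "recommendation":
        if severity in ("medium", "high") and 8 <= hour <= 19:
            channels.append("email")
    return channels

print(channels_for("urgent", "alert", 3))  # ['dashboard', 'email', 'whatsapp', 'push']
print(channels_for("high", "alert", 23))   # ['dashboard', 'email']
```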


@@ -0,0 +1,256 @@
# services/notification/app/services/sse_service.py
"""
Server-Sent Events service for real-time notifications
Integrated within the notification service for alerts and recommendations
"""
import asyncio
from redis.asyncio import Redis
import json
from typing import Dict, Set, Any
from datetime import datetime
import structlog
logger = structlog.get_logger()
class SSEService:
    """
    Server-Sent Events service for real-time notifications.
    Handles both alerts and recommendations through unified SSE streams.
    """
    def __init__(self, redis_url: str):
        self.redis_url = redis_url
        self.redis = None
        self.active_connections: Dict[str, Set[asyncio.Queue]] = {}
        self.pubsub_tasks: Dict[str, asyncio.Task] = {}

    async def initialize(self):
        """Initialize Redis connection"""
        try:
            self.redis = Redis.from_url(self.redis_url)
            logger.info("SSE Service initialized with Redis connection")
        except Exception as e:
            logger.error("Failed to initialize SSE service", error=str(e))
            raise
    async def shutdown(self):
        """Clean shutdown"""
        try:
            # Cancel all pubsub tasks
            for task in self.pubsub_tasks.values():
                if not task.done():
                    task.cancel()
                    try:
                        await task
                    except asyncio.CancelledError:
                        pass
            # Close all client connections
            for tenant_id, connections in self.active_connections.items():
                for queue in connections.copy():
                    try:
                        await queue.put({"event": "shutdown", "data": json.dumps({"status": "server_shutdown"})})
                    except Exception:
                        pass
            # Close Redis connection
            if self.redis:
                await self.redis.close()
            logger.info("SSE Service shutdown completed")
        except Exception as e:
            logger.error("Error during SSE shutdown", error=str(e))
    async def add_client(self, tenant_id: str, client_queue: asyncio.Queue):
        """Add a new SSE client connection"""
        try:
            if tenant_id not in self.active_connections:
                self.active_connections[tenant_id] = set()
            # Start a pubsub listener for this tenant if one does not exist
            if tenant_id not in self.pubsub_tasks:
                task = asyncio.create_task(self._listen_to_tenant_channel(tenant_id))
                self.pubsub_tasks[tenant_id] = task

            self.active_connections[tenant_id].add(client_queue)
            client_count = len(self.active_connections[tenant_id])
            logger.info("SSE client added",
                        tenant_id=tenant_id,
                        total_clients=client_count)

            # Send connection confirmation
            await client_queue.put({
                "event": "connected",
                "data": json.dumps({
                    "status": "connected",
                    "tenant_id": tenant_id,
                    "timestamp": datetime.utcnow().isoformat(),
                    "client_count": client_count
                })
            })

            # Send any active items (alerts and recommendations)
            active_items = await self.get_active_items(tenant_id)
            if active_items:
                await client_queue.put({
                    "event": "initial_items",
                    "data": json.dumps(active_items)
                })
        except Exception as e:
            logger.error("Error adding SSE client", tenant_id=tenant_id, error=str(e))
async def remove_client(self, tenant_id: str, client_queue: asyncio.Queue):
"""Remove SSE client connection"""
try:
if tenant_id in self.active_connections:
self.active_connections[tenant_id].discard(client_queue)
# If no more clients for this tenant, stop the pubsub listener
if not self.active_connections[tenant_id]:
del self.active_connections[tenant_id]
if tenant_id in self.pubsub_tasks:
task = self.pubsub_tasks[tenant_id]
if not task.done():
task.cancel()
del self.pubsub_tasks[tenant_id]
logger.info("SSE client removed", tenant_id=tenant_id)
except Exception as e:
logger.error("Error removing SSE client", tenant_id=tenant_id, error=str(e))
async def _listen_to_tenant_channel(self, tenant_id: str):
"""Listen to Redis channel for tenant-specific items"""
try:
# Create a separate Redis connection for pubsub
pubsub_redis = Redis.from_url(self.redis_url)
pubsub = pubsub_redis.pubsub()
channel = f"alerts:{tenant_id}"
await pubsub.subscribe(channel)
logger.info("Started listening to tenant channel",
tenant_id=tenant_id,
channel=channel)
async for message in pubsub.listen():
if message["type"] == "message":
# Broadcast to all connected clients for this tenant
await self.broadcast_to_tenant(tenant_id, message["data"])
except asyncio.CancelledError:
logger.info("Stopped listening to tenant channel", tenant_id=tenant_id)
except Exception as e:
logger.error("Error in pubsub listener", tenant_id=tenant_id, error=str(e))
finally:
try:
await pubsub.unsubscribe(channel)
await pubsub_redis.close()
except Exception:
pass
async def broadcast_to_tenant(self, tenant_id: str, message: str):
"""Broadcast message to all connected clients of a tenant"""
if tenant_id not in self.active_connections:
return
try:
item_data = json.loads(message)
event = {
"event": item_data.get('item_type', 'item'), # 'alert' or 'recommendation'
"data": json.dumps(item_data),
"id": item_data.get("id")
}
# Send to all connected clients
disconnected = []
for client_queue in self.active_connections[tenant_id]:
try:
# Use put_nowait to avoid blocking
client_queue.put_nowait(event)
except asyncio.QueueFull:
logger.warning("Client queue full, dropping message", tenant_id=tenant_id)
disconnected.append(client_queue)
except Exception as e:
logger.warning("Failed to send to client", tenant_id=tenant_id, error=str(e))
disconnected.append(client_queue)
# Clean up disconnected clients
for queue in disconnected:
await self.remove_client(tenant_id, queue)
if disconnected:
logger.info("Cleaned up disconnected clients",
tenant_id=tenant_id,
count=len(disconnected))
except Exception as e:
logger.error("Error broadcasting to tenant", tenant_id=tenant_id, error=str(e))
async def send_item_notification(self, tenant_id: str, item: Dict[str, Any]):
"""
Send alert or recommendation via SSE (called by notification orchestrator)
"""
try:
# Publish to Redis for SSE streaming
channel = f"alerts:{tenant_id}"
item_message = {
'id': item.get('id'),
'item_type': item.get('type'), # 'alert' or 'recommendation'
'type': item.get('alert_type', item.get('type')),
'severity': item.get('severity'),
'title': item.get('title'),
'message': item.get('message'),
'actions': item.get('actions', []),
'metadata': item.get('metadata', {}),
'timestamp': item.get('timestamp', datetime.utcnow().isoformat()),
'status': 'active'
}
await self.redis.publish(channel, json.dumps(item_message))
logger.info("Item published to SSE",
tenant_id=tenant_id,
item_type=item.get('type'),
item_id=item.get('id'))
except Exception as e:
logger.error("Error sending item notification via SSE",
tenant_id=tenant_id,
error=str(e))
async def get_active_items(self, tenant_id: str) -> list:
"""Fetch active alerts and recommendations from database"""
try:
# This would integrate with the actual database
# For now, return empty list as placeholder
# In real implementation, this would query the alerts table
# Example query:
# query = """
# SELECT id, item_type, alert_type, severity, title, message,
# actions, metadata, created_at, status
# FROM alerts
# WHERE tenant_id = $1
# AND status = 'active'
# ORDER BY severity_weight DESC, created_at DESC
# LIMIT 50
# """
return [] # Placeholder
except Exception as e:
logger.error("Error fetching active items", tenant_id=tenant_id, error=str(e))
return []
def get_metrics(self) -> Dict[str, Any]:
"""Get SSE service metrics"""
return {
"active_tenants": len(self.active_connections),
"total_connections": sum(len(connections) for connections in self.active_connections.values()),
"active_listeners": len(self.pubsub_tasks),
"redis_connected": self.redis and not self.redis.closed
}
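The per-tenant fan-out in `broadcast_to_tenant` can be exercised without Redis or a running server. This self-contained sketch reproduces the core pattern from the class above: `put_nowait` so the broadcaster never blocks, and dropping clients whose queues are full (the function and variable names here are illustrative, not part of the service API):

```python
import asyncio
import json

def broadcast(connections, message: str):
    """Fan a JSON message out to every client queue, dropping full ones."""
    item = json.loads(message)
    event = {"event": item.get("item_type", "item"), "data": message}
    dropped = []
    for q in connections:
        try:
            q.put_nowait(event)      # never block the broadcaster
        except asyncio.QueueFull:
            dropped.append(q)        # slow consumer: disconnect it
    for q in dropped:
        connections.discard(q)
    return len(dropped)

fast = asyncio.Queue(maxsize=10)
slow = asyncio.Queue(maxsize=1)
slow.put_nowait({"event": "stale"})  # already full, will be dropped
connections = {fast, slow}

message = json.dumps({"item_type": "alert", "id": "a1", "severity": "high"})
dropped = broadcast(connections, message)
print(dropped, len(connections), fast.get_nowait()["event"])  # 1 1 alert
```

In the real service the dropped queues are additionally passed to `remove_client`, which tears down the Redis pub/sub listener once a tenant has no remaining connections.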


@@ -30,6 +30,17 @@ class WhatsAppService:
self.from_number = settings.WHATSAPP_FROM_NUMBER
self.enabled = settings.ENABLE_WHATSAPP_NOTIFICATIONS
def _parse_api_credentials(self):
"""Parse API key into username and password for Twilio basic auth"""
if not self.api_key or ":" not in self.api_key:
raise ValueError("WhatsApp API key must be in format 'username:password'")
api_parts = self.api_key.split(":", 1)
if len(api_parts) != 2:
raise ValueError("Invalid WhatsApp API key format")
return api_parts[0], api_parts[1]
async def send_message(
self,
to_phone: str,
@@ -181,10 +192,22 @@ class WhatsAppService:
return False
# Test API connectivity with a simple request
# Parse API key (expected format: username:password for Twilio basic auth)
if ":" not in self.api_key:
logger.error("WhatsApp API key must be in format 'username:password'")
return False
api_parts = self.api_key.split(":", 1) # Split on first : only
if len(api_parts) != 2:
logger.error("Invalid WhatsApp API key format")
return False
username, password = api_parts
async with httpx.AsyncClient(timeout=10.0) as client:
response = await client.get(
f"{self.base_url}/v1/Account", # Twilio account info endpoint
auth=(self.api_key.split(":")[0], self.api_key.split(":")[1])
auth=(username, password)
)
if response.status_code == 200:
@@ -206,6 +229,13 @@ class WhatsAppService:
async def _send_text_message(self, to_phone: str, message: str) -> bool:
"""Send regular text message via Twilio"""
try:
# Parse API credentials
try:
username, password = self._parse_api_credentials()
except ValueError as e:
logger.error(f"WhatsApp API key configuration error: {e}")
return False
# Prepare request data
data = {
"From": f"whatsapp:{self.from_number}",
@@ -216,9 +246,9 @@ class WhatsAppService:
# Send via Twilio API
async with httpx.AsyncClient(timeout=30.0) as client:
response = await client.post(
f"{self.base_url}/2010-04-01/Accounts/{self.api_key.split(':')[0]}/Messages.json",
f"{self.base_url}/2010-04-01/Accounts/{username}/Messages.json",
data=data,
auth=(self.api_key.split(":")[0], self.api_key.split(":")[1])
auth=(username, password)
)
if response.status_code == 201:
@@ -245,6 +275,13 @@ class WhatsAppService:
) -> bool:
"""Send WhatsApp template message via Twilio"""
try:
# Parse API credentials
try:
username, password = self._parse_api_credentials()
except ValueError as e:
logger.error(f"WhatsApp API key configuration error: {e}")
return False
# Prepare template data
content_variables = {str(i+1): param for i, param in enumerate(parameters)}
@@ -258,9 +295,9 @@ class WhatsAppService:
# Send via Twilio API
async with httpx.AsyncClient(timeout=30.0) as client:
response = await client.post(
f"{self.base_url}/2010-04-01/Accounts/{self.api_key.split(':')[0]}/Messages.json",
f"{self.base_url}/2010-04-01/Accounts/{username}/Messages.json",
data=data,
auth=(self.api_key.split(":")[0], self.api_key.split(":")[1])
auth=(username, password)
)
if response.status_code == 201:
@@ -315,10 +352,17 @@ class WhatsAppService:
async def _get_message_status(self, message_sid: str) -> Optional[str]:
"""Get message delivery status from Twilio"""
try:
# Parse API credentials
try:
username, password = self._parse_api_credentials()
except ValueError as e:
logger.error(f"WhatsApp API key configuration error: {e}")
return None
async with httpx.AsyncClient(timeout=10.0) as client:
response = await client.get(
f"{self.base_url}/2010-04-01/Accounts/{self.api_key.split(':')[0]}/Messages/{message_sid}.json",
auth=(self.api_key.split(":")[0], self.api_key.split(":")[1])
f"{self.base_url}/2010-04-01/Accounts/{username}/Messages/{message_sid}.json",
auth=(username, password)
)
if response.status_code == 200:
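The credential-parsing rule introduced in this diff splits on the first `:` only, so account passwords that themselves contain colons survive intact. A standalone sketch of the same rule (mirroring `_parse_api_credentials`, outside the service class for illustration):

```python
def parse_api_credentials(api_key: str):
    """Split a 'username:password' key on the first ':' only."""
    if not api_key or ":" not in api_key:
        raise ValueError("WhatsApp API key must be in format 'username:password'")
    username, password = api_key.split(":", 1)  # maxsplit=1 keeps colons in the password
    return username, password

print(parse_api_credentials("ACxxxx:s3cr:et"))  # ('ACxxxx', 's3cr:et')
```

This is why the refactor replaces the repeated `self.api_key.split(":")[0]` / `split(":")[1]` pairs: naive indexing would silently truncate a password containing `:`, while the helper fails loudly on a malformed key.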


@@ -3,6 +3,7 @@ fastapi==0.104.1
uvicorn[standard]==0.24.0
pydantic==2.5.0
pydantic-settings==2.1.0
sse-starlette==1.6.5
# Database
sqlalchemy==2.0.23
@@ -22,8 +23,9 @@ aiofiles==23.2.1
aiosmtplib==3.0.1
email-validator==2.1.0
-# Messaging
+# Messaging & Redis
aio-pika==9.3.1
redis==5.0.1
# Template Engine
jinja2==3.1.2