Add notification service

This commit is contained in:
Urtzi Alfaro
2025-07-21 22:44:11 +02:00
parent d029630c8e
commit d9affc950a
11 changed files with 3680 additions and 57 deletions

View File

@@ -0,0 +1,547 @@
# ================================================================
# services/notification/app/services/email_service.py
# ================================================================
"""
Email service for sending notifications
Handles SMTP configuration and email delivery
"""
import structlog
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email.mime.base import MIMEBase
from email import encoders
from email.utils import formataddr
from typing import Optional, List, Dict, Any
import aiosmtplib
from jinja2 import Template
import asyncio
from app.core.config import settings
from shared.monitoring.metrics import MetricsCollector
logger = structlog.get_logger()
metrics = MetricsCollector("notification-service")
class EmailService:
"""
Email service for sending notifications via SMTP
Supports both plain text and HTML emails
"""
def __init__(self):
self.smtp_host = settings.SMTP_HOST
self.smtp_port = settings.SMTP_PORT
self.smtp_user = settings.SMTP_USER
self.smtp_password = settings.SMTP_PASSWORD
self.smtp_tls = settings.SMTP_TLS
self.smtp_ssl = settings.SMTP_SSL
self.default_from_email = settings.DEFAULT_FROM_EMAIL
self.default_from_name = settings.DEFAULT_FROM_NAME
async def send_email(
self,
to_email: str,
subject: str,
text_content: str,
html_content: Optional[str] = None,
from_email: Optional[str] = None,
from_name: Optional[str] = None,
reply_to: Optional[str] = None,
attachments: Optional[List[Dict[str, Any]]] = None
) -> bool:
"""
Send an email notification
Args:
to_email: Recipient email address
subject: Email subject
text_content: Plain text content
html_content: HTML content (optional)
from_email: Sender email (optional, uses default)
from_name: Sender name (optional, uses default)
reply_to: Reply-to address (optional)
attachments: List of attachments (optional)
Returns:
bool: True if email was sent successfully
"""
try:
if not settings.ENABLE_EMAIL_NOTIFICATIONS:
logger.info("Email notifications disabled")
return True # Return success to avoid blocking workflow
if not self.smtp_user or not self.smtp_password:
logger.error("SMTP credentials not configured")
return False
# Validate email address
if not to_email or "@" not in to_email:
logger.error("Invalid recipient email", email=to_email)
return False
# Create message
message = MIMEMultipart('alternative')
message['Subject'] = subject
message['To'] = to_email
# Set From header
sender_email = from_email or self.default_from_email
sender_name = from_name or self.default_from_name
message['From'] = formataddr((sender_name, sender_email))
# Set Reply-To if provided
if reply_to:
message['Reply-To'] = reply_to
# Add text content
text_part = MIMEText(text_content, 'plain', 'utf-8')
message.attach(text_part)
# Add HTML content if provided
if html_content:
html_part = MIMEText(html_content, 'html', 'utf-8')
message.attach(html_part)
# Add attachments if provided
if attachments:
for attachment in attachments:
await self._add_attachment(message, attachment)
# Send email
await self._send_smtp_email(message, sender_email, to_email)
logger.info("Email sent successfully",
to=to_email,
subject=subject,
from_email=sender_email)
# Record success metrics
metrics.increment_counter("emails_sent_total", labels={"status": "success"})
return True
except Exception as e:
logger.error("Failed to send email",
to=to_email,
subject=subject,
error=str(e))
# Record failure metrics
metrics.increment_counter("emails_sent_total", labels={"status": "failed"})
return False
async def send_bulk_emails(
self,
recipients: List[str],
subject: str,
text_content: str,
html_content: Optional[str] = None,
batch_size: int = 50
) -> Dict[str, Any]:
"""
Send bulk emails with rate limiting
Args:
recipients: List of recipient email addresses
subject: Email subject
text_content: Plain text content
html_content: HTML content (optional)
batch_size: Number of emails to send per batch
Returns:
Dict containing success/failure counts
"""
results = {
"total": len(recipients),
"sent": 0,
"failed": 0,
"errors": []
}
try:
# Process in batches to respect rate limits
for i in range(0, len(recipients), batch_size):
batch = recipients[i:i + batch_size]
# Send emails concurrently within batch
tasks = [
self.send_email(
to_email=email,
subject=subject,
text_content=text_content,
html_content=html_content
)
for email in batch
]
batch_results = await asyncio.gather(*tasks, return_exceptions=True)
for email, result in zip(batch, batch_results):
if isinstance(result, Exception):
results["failed"] += 1
results["errors"].append({"email": email, "error": str(result)})
elif result:
results["sent"] += 1
else:
results["failed"] += 1
results["errors"].append({"email": email, "error": "Unknown error"})
# Rate limiting delay between batches
if i + batch_size < len(recipients):
await asyncio.sleep(1.0) # 1 second delay between batches
logger.info("Bulk email completed",
total=results["total"],
sent=results["sent"],
failed=results["failed"])
return results
except Exception as e:
logger.error("Bulk email failed", error=str(e))
results["errors"].append({"error": str(e)})
return results
async def send_template_email(
self,
to_email: str,
template_name: str,
template_data: Dict[str, Any],
subject_template: Optional[str] = None
) -> bool:
"""
Send email using a template
Args:
to_email: Recipient email address
template_name: Name of the email template
template_data: Data for template rendering
subject_template: Subject template string (optional)
Returns:
bool: True if email was sent successfully
"""
try:
# Load template (simplified - in production, load from database)
template_content = await self._load_email_template(template_name)
if not template_content:
logger.error("Template not found", template=template_name)
return False
# Render subject
subject = template_name.replace("_", " ").title()
if subject_template:
subject_tmpl = Template(subject_template)
subject = subject_tmpl.render(**template_data)
# Render content
text_template = Template(template_content.get("text", ""))
text_content = text_template.render(**template_data)
html_content = None
if template_content.get("html"):
html_template = Template(template_content["html"])
html_content = html_template.render(**template_data)
return await self.send_email(
to_email=to_email,
subject=subject,
text_content=text_content,
html_content=html_content
)
except Exception as e:
logger.error("Failed to send template email",
template=template_name,
error=str(e))
return False
async def health_check(self) -> bool:
"""
Check if email service is healthy
Returns:
bool: True if service is healthy
"""
try:
if not settings.ENABLE_EMAIL_NOTIFICATIONS:
return True # Service is "healthy" if disabled
if not self.smtp_user or not self.smtp_password:
logger.warning("SMTP credentials not configured")
return False
# Test SMTP connection
if self.smtp_ssl:
server = aiosmtplib.SMTP(hostname=self.smtp_host, port=self.smtp_port, use_tls=True)
else:
server = aiosmtplib.SMTP(hostname=self.smtp_host, port=self.smtp_port)
await server.connect()
if self.smtp_tls:
await server.starttls()
await server.login(self.smtp_user, self.smtp_password)
await server.quit()
logger.info("Email service health check passed")
return True
except Exception as e:
logger.error("Email service health check failed", error=str(e))
return False
# ================================================================
# PRIVATE HELPER METHODS
# ================================================================
async def _send_smtp_email(self, message: MIMEMultipart, from_email: str, to_email: str):
"""Send email via SMTP"""
try:
# Create SMTP connection
if self.smtp_ssl:
server = aiosmtplib.SMTP(
hostname=self.smtp_host,
port=self.smtp_port,
use_tls=True,
timeout=30
)
else:
server = aiosmtplib.SMTP(
hostname=self.smtp_host,
port=self.smtp_port,
timeout=30
)
await server.connect()
# Start TLS if required
if self.smtp_tls and not self.smtp_ssl:
await server.starttls()
# Login
await server.login(self.smtp_user, self.smtp_password)
# Send email
await server.send_message(message, from_addr=from_email, to_addrs=[to_email])
# Close connection
await server.quit()
except Exception as e:
logger.error("SMTP send failed", error=str(e))
raise
async def _add_attachment(self, message: MIMEMultipart, attachment: Dict[str, Any]):
"""Add attachment to email message"""
try:
filename = attachment.get("filename", "attachment")
content = attachment.get("content", b"")
content_type = attachment.get("content_type", "application/octet-stream")
# Create attachment part
part = MIMEBase(*content_type.split("/"))
part.set_payload(content)
encoders.encode_base64(part)
part.add_header(
'Content-Disposition',
f'attachment; filename= {filename}'
)
message.attach(part)
except Exception as e:
logger.error("Failed to add attachment", filename=attachment.get("filename"), error=str(e))
async def _load_email_template(self, template_name: str) -> Optional[Dict[str, str]]:
"""Load email template from storage"""
# Simplified template loading - in production, load from database
templates = {
"welcome": {
"text": """
¡Bienvenido a Bakery Forecast, {{user_name}}!
Gracias por registrarte en nuestra plataforma de pronóstico para panaderías.
Tu cuenta ha sido creada exitosamente y ya puedes comenzar a:
- Subir datos de ventas
- Generar pronósticos de demanda
- Optimizar tu producción
Para comenzar, visita: {{dashboard_url}}
Si tienes alguna pregunta, no dudes en contactarnos.
Saludos,
El equipo de Bakery Forecast
""",
"html": """
<html>
<body style="font-family: Arial, sans-serif; max-width: 600px; margin: 0 auto;">
<div style="background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); padding: 20px; text-align: center;">
<h1 style="color: white; margin: 0;">¡Bienvenido a Bakery Forecast!</h1>
</div>
<div style="padding: 20px;">
<p>Hola <strong>{{user_name}}</strong>,</p>
<p>Gracias por registrarte en nuestra plataforma de pronóstico para panaderías.</p>
<p>Tu cuenta ha sido creada exitosamente y ya puedes comenzar a:</p>
<ul style="color: #333;">
<li>📊 Subir datos de ventas</li>
<li>🔮 Generar pronósticos de demanda</li>
<li>⚡ Optimizar tu producción</li>
</ul>
<div style="text-align: center; margin: 30px 0;">
<a href="{{dashboard_url}}"
style="background: #667eea; color: white; padding: 12px 30px;
text-decoration: none; border-radius: 5px; font-weight: bold;">
Ir al Dashboard
</a>
</div>
<p>Si tienes alguna pregunta, no dudes en contactarnos.</p>
<p>Saludos,<br>
<strong>El equipo de Bakery Forecast</strong></p>
</div>
<div style="background: #f8f9fa; padding: 15px; text-align: center; font-size: 12px; color: #666;">
© 2025 Bakery Forecast. Todos los derechos reservados.
</div>
</body>
</html>
"""
},
"forecast_alert": {
"text": """
Alerta de Pronóstico - {{bakery_name}}
Se ha detectado una variación significativa en la demanda prevista:
Producto: {{product_name}}
Fecha: {{forecast_date}}
Demanda prevista: {{predicted_demand}} unidades
Variación: {{variation_percentage}}%
{{alert_message}}
Revisa los pronósticos en: {{dashboard_url}}
Saludos,
El equipo de Bakery Forecast
""",
"html": """
<html>
<body style="font-family: Arial, sans-serif; max-width: 600px; margin: 0 auto;">
<div style="background: #ff6b6b; padding: 20px; text-align: center;">
<h1 style="color: white; margin: 0;">🚨 Alerta de Pronóstico</h1>
</div>
<div style="padding: 20px;">
<h2 style="color: #333;">{{bakery_name}}</h2>
<p>Se ha detectado una variación significativa en la demanda prevista:</p>
<div style="background: #f8f9fa; padding: 15px; border-radius: 5px; margin: 20px 0;">
<p><strong>Producto:</strong> {{product_name}}</p>
<p><strong>Fecha:</strong> {{forecast_date}}</p>
<p><strong>Demanda prevista:</strong> {{predicted_demand}} unidades</p>
<p><strong>Variación:</strong> <span style="color: #ff6b6b; font-weight: bold;">{{variation_percentage}}%</span></p>
</div>
<div style="background: #fff3cd; border: 1px solid #ffeaa7; padding: 15px; border-radius: 5px; margin: 20px 0;">
<p style="margin: 0; color: #856404;">{{alert_message}}</p>
</div>
<div style="text-align: center; margin: 30px 0;">
<a href="{{dashboard_url}}"
style="background: #ff6b6b; color: white; padding: 12px 30px;
text-decoration: none; border-radius: 5px; font-weight: bold;">
Ver Pronósticos
</a>
</div>
</div>
</body>
</html>
"""
},
"weekly_report": {
"text": """
Reporte Semanal - {{bakery_name}}
Resumen de la semana del {{week_start}} al {{week_end}}:
Ventas Totales: {{total_sales}} unidades
Precisión del Pronóstico: {{forecast_accuracy}}%
Productos más vendidos:
{{#top_products}}
- {{name}}: {{quantity}} unidades
{{/top_products}}
Recomendaciones:
{{recommendations}}
Ver reporte completo: {{report_url}}
Saludos,
El equipo de Bakery Forecast
""",
"html": """
<html>
<body style="font-family: Arial, sans-serif; max-width: 600px; margin: 0 auto;">
<div style="background: #74b9ff; padding: 20px; text-align: center;">
<h1 style="color: white; margin: 0;">📊 Reporte Semanal</h1>
</div>
<div style="padding: 20px;">
<h2 style="color: #333;">{{bakery_name}}</h2>
<p style="color: #666;">Semana del {{week_start}} al {{week_end}}</p>
<div style="display: flex; gap: 20px; margin: 20px 0;">
<div style="background: #dff0d8; padding: 15px; border-radius: 5px; flex: 1; text-align: center;">
<h3 style="margin: 0; color: #3c763d;">{{total_sales}}</h3>
<p style="margin: 5px 0; color: #3c763d;">Ventas Totales</p>
</div>
<div style="background: #d1ecf1; padding: 15px; border-radius: 5px; flex: 1; text-align: center;">
<h3 style="margin: 0; color: #0c5460;">{{forecast_accuracy}}%</h3>
<p style="margin: 5px 0; color: #0c5460;">Precisión</p>
</div>
</div>
<h3 style="color: #333;">Productos más vendidos:</h3>
<ul style="color: #333;">
{{#top_products}}
<li><strong>{{name}}</strong>: {{quantity}} unidades</li>
{{/top_products}}
</ul>
<div style="background: #e7f3ff; padding: 15px; border-radius: 5px; margin: 20px 0;">
<h4 style="margin-top: 0; color: #004085;">Recomendaciones:</h4>
<p style="margin-bottom: 0; color: #004085;">{{recommendations}}</p>
</div>
<div style="text-align: center; margin: 30px 0;">
<a href="{{report_url}}"
style="background: #74b9ff; color: white; padding: 12px 30px;
text-decoration: none; border-radius: 5px; font-weight: bold;">
Ver Reporte Completo
</a>
</div>
</div>
</body>
</html>
"""
}
}
return templates.get(template_name)

View File

@@ -0,0 +1,499 @@
# ================================================================
# services/notification/app/services/messaging.py
# ================================================================
"""
Messaging service for notification events
Handles RabbitMQ integration for the notification service
"""
import structlog
from typing import Dict, Any
import asyncio
from shared.messaging.rabbitmq import RabbitMQClient
from shared.messaging.events import BaseEvent
from app.core.config import settings
logger = structlog.get_logger()
# Global messaging instance
notification_publisher = None
async def setup_messaging():
"""Initialize messaging services for notification service"""
global notification_publisher
try:
notification_publisher = RabbitMQClient(settings.RABBITMQ_URL, service_name="notification-service")
await notification_publisher.connect()
# Set up event consumers
await _setup_event_consumers()
logger.info("Notification service messaging setup completed")
except Exception as e:
logger.error("Failed to setup notification messaging", error=str(e))
raise
async def cleanup_messaging():
"""Cleanup messaging services"""
global notification_publisher
try:
if notification_publisher:
await notification_publisher.disconnect()
logger.info("Notification service messaging cleanup completed")
except Exception as e:
logger.error("Error during notification messaging cleanup", error=str(e))
async def _setup_event_consumers():
"""Setup event consumers for other service events"""
try:
# Listen for user registration events (from auth service)
await notification_publisher.consume_events(
exchange_name="user.events",
queue_name="notification_user_registered_queue",
routing_key="user.registered",
callback=handle_user_registered
)
# Listen for forecast alert events (from forecasting service)
await notification_publisher.consume_events(
exchange_name="forecast.events",
queue_name="notification_forecast_alert_queue",
routing_key="forecast.alert_generated",
callback=handle_forecast_alert
)
# Listen for training completion events (from training service)
await notification_publisher.consume_events(
exchange_name="training.events",
queue_name="notification_training_completed_queue",
routing_key="training.completed",
callback=handle_training_completed
)
# Listen for data import events (from data service)
await notification_publisher.consume_events(
exchange_name="data.events",
queue_name="notification_data_imported_queue",
routing_key="data.imported",
callback=handle_data_imported
)
logger.info("Notification event consumers setup completed")
except Exception as e:
logger.error("Failed to setup notification event consumers", error=str(e))
# ================================================================
# EVENT HANDLERS
# ================================================================
async def handle_user_registered(message):
"""Handle user registration events"""
try:
import json
from app.services.notification_service import NotificationService
from app.schemas.notifications import NotificationCreate, NotificationType
# Parse message
data = json.loads(message.body.decode())
user_data = data.get("data", {})
logger.info("Handling user registration event", user_id=user_data.get("user_id"))
# Send welcome email
notification_service = NotificationService()
welcome_notification = NotificationCreate(
type=NotificationType.EMAIL,
recipient_email=user_data.get("email"),
template_id="welcome",
template_data={
"user_name": user_data.get("full_name", "Usuario"),
"dashboard_url": f"{settings.FRONTEND_API_URL}/dashboard"
},
tenant_id=user_data.get("tenant_id"),
sender_id="system"
)
await notification_service.send_notification(welcome_notification)
# Acknowledge message
await message.ack()
except Exception as e:
logger.error("Failed to handle user registration event", error=str(e))
await message.nack(requeue=False)
async def handle_forecast_alert(message):
"""Handle forecast alert events"""
try:
import json
from app.services.notification_service import NotificationService
from app.schemas.notifications import NotificationCreate, NotificationType
# Parse message
data = json.loads(message.body.decode())
alert_data = data.get("data", {})
logger.info("Handling forecast alert event",
tenant_id=alert_data.get("tenant_id"),
product=alert_data.get("product_name"))
# Send alert notification to tenant users
notification_service = NotificationService()
# Email alert
email_notification = NotificationCreate(
type=NotificationType.EMAIL,
template_id="forecast_alert",
template_data={
"bakery_name": alert_data.get("bakery_name", "Tu Panadería"),
"product_name": alert_data.get("product_name"),
"forecast_date": alert_data.get("forecast_date"),
"predicted_demand": alert_data.get("predicted_demand"),
"variation_percentage": alert_data.get("variation_percentage"),
"alert_message": alert_data.get("message"),
"dashboard_url": f"{settings.FRONTEND_API_URL}/forecasts"
},
tenant_id=alert_data.get("tenant_id"),
sender_id="system",
broadcast=True,
priority="high"
)
await notification_service.send_notification(email_notification)
# WhatsApp alert for urgent cases
if alert_data.get("severity") == "urgent":
whatsapp_notification = NotificationCreate(
type=NotificationType.WHATSAPP,
message=f"🚨 ALERTA: {alert_data.get('product_name')} - Variación del {alert_data.get('variation_percentage')}% para {alert_data.get('forecast_date')}. Revisar pronósticos.",
tenant_id=alert_data.get("tenant_id"),
sender_id="system",
broadcast=True,
priority="urgent"
)
await notification_service.send_notification(whatsapp_notification)
# Acknowledge message
await message.ack()
except Exception as e:
logger.error("Failed to handle forecast alert event", error=str(e))
await message.nack(requeue=False)
async def handle_training_completed(message):
"""Handle training completion events"""
try:
import json
from app.services.notification_service import NotificationService
from app.schemas.notifications import NotificationCreate, NotificationType
# Parse message
data = json.loads(message.body.decode())
training_data = data.get("data", {})
logger.info("Handling training completion event",
tenant_id=training_data.get("tenant_id"),
job_id=training_data.get("job_id"))
# Send training completion notification
notification_service = NotificationService()
success = training_data.get("success", False)
template_data = {
"bakery_name": training_data.get("bakery_name", "Tu Panadería"),
"job_id": training_data.get("job_id"),
"model_name": training_data.get("model_name"),
"accuracy": training_data.get("accuracy"),
"completion_time": training_data.get("completion_time"),
"dashboard_url": f"{settings.FRONTEND_API_URL}/models"
}
if success:
subject = "✅ Entrenamiento de Modelo Completado"
template_data["status"] = "exitoso"
template_data["message"] = f"El modelo {training_data.get('model_name')} se ha entrenado correctamente con una precisión del {training_data.get('accuracy')}%."
else:
subject = "❌ Error en Entrenamiento de Modelo"
template_data["status"] = "fallido"
template_data["message"] = f"El entrenamiento del modelo {training_data.get('model_name')} ha fallado. Error: {training_data.get('error_message', 'Error desconocido')}"
notification = NotificationCreate(
type=NotificationType.EMAIL,
subject=subject,
message=template_data["message"],
template_data=template_data,
tenant_id=training_data.get("tenant_id"),
sender_id="system",
broadcast=True
)
await notification_service.send_notification(notification)
# Acknowledge message
await message.ack()
except Exception as e:
logger.error("Failed to handle training completion event", error=str(e))
await message.nack(requeue=False)
async def handle_data_imported(message):
"""Handle data import events"""
try:
import json
from app.services.notification_service import NotificationService
from app.schemas.notifications import NotificationCreate, NotificationType
# Parse message
data = json.loads(message.body.decode())
import_data = data.get("data", {})
logger.info("Handling data import event",
tenant_id=import_data.get("tenant_id"),
data_type=import_data.get("data_type"))
# Only send notifications for significant data imports
records_count = import_data.get("records_count", 0)
if records_count < 100: # Skip notification for small imports
await message.ack()
return
# Send data import notification
notification_service = NotificationService()
template_data = {
"bakery_name": import_data.get("bakery_name", "Tu Panadería"),
"data_type": import_data.get("data_type"),
"records_count": records_count,
"import_date": import_data.get("import_date"),
"source": import_data.get("source", "Manual"),
"dashboard_url": f"{settings.FRONTEND_API_URL}/data"
}
notification = NotificationCreate(
type=NotificationType.EMAIL,
subject=f"📊 Datos Importados: {import_data.get('data_type')}",
message=f"Se han importado {records_count} registros de {import_data.get('data_type')} desde {import_data.get('source')}.",
template_data=template_data,
tenant_id=import_data.get("tenant_id"),
sender_id="system"
)
await notification_service.send_notification(notification)
# Acknowledge message
await message.ack()
except Exception as e:
logger.error("Failed to handle data import event", error=str(e))
await message.nack(requeue=False)
# ================================================================
# NOTIFICATION EVENT PUBLISHERS
# ================================================================
async def publish_notification_sent(notification_data: Dict[str, Any]) -> bool:
"""Publish notification sent event"""
try:
if notification_publisher:
return await notification_publisher.publish_event(
"notification.events",
"notification.sent",
notification_data
)
else:
logger.warning("Notification publisher not initialized")
return False
except Exception as e:
logger.error("Failed to publish notification sent event", error=str(e))
return False
async def publish_notification_failed(notification_data: Dict[str, Any]) -> bool:
"""Publish notification failed event"""
try:
if notification_publisher:
return await notification_publisher.publish_event(
"notification.events",
"notification.failed",
notification_data
)
else:
logger.warning("Notification publisher not initialized")
return False
except Exception as e:
logger.error("Failed to publish notification failed event", error=str(e))
return False
async def publish_notification_delivered(notification_data: Dict[str, Any]) -> bool:
"""Publish notification delivered event"""
try:
if notification_publisher:
return await notification_publisher.publish_event(
"notification.events",
"notification.delivered",
notification_data
)
else:
logger.warning("Notification publisher not initialized")
return False
except Exception as e:
logger.error("Failed to publish notification delivered event", error=str(e))
return False
async def publish_bulk_notification_completed(bulk_data: Dict[str, Any]) -> bool:
"""Publish bulk notification completion event"""
try:
if notification_publisher:
return await notification_publisher.publish_event(
"notification.events",
"notification.bulk_completed",
bulk_data
)
else:
logger.warning("Notification publisher not initialized")
return False
except Exception as e:
logger.error("Failed to publish bulk notification event", error=str(e))
return False
# ================================================================
# WEBHOOK HANDLERS (for external delivery status updates)
# ================================================================
async def handle_email_delivery_webhook(webhook_data: Dict[str, Any]):
"""Handle email delivery status webhooks (e.g., from SendGrid, Mailgun)"""
try:
notification_id = webhook_data.get("notification_id")
status = webhook_data.get("status")
logger.info("Received email delivery webhook",
notification_id=notification_id,
status=status)
# Update notification status in database
from app.services.notification_service import NotificationService
notification_service = NotificationService()
# This would require additional method in NotificationService
# await notification_service.update_delivery_status(notification_id, status)
# Publish delivery event
await publish_notification_delivered({
"notification_id": notification_id,
"status": status,
"delivery_time": webhook_data.get("timestamp"),
"provider": webhook_data.get("provider")
})
except Exception as e:
logger.error("Failed to handle email delivery webhook", error=str(e))
async def handle_whatsapp_delivery_webhook(webhook_data: Dict[str, Any]):
"""Handle WhatsApp delivery status webhooks (from Twilio)"""
try:
message_sid = webhook_data.get("MessageSid")
status = webhook_data.get("MessageStatus")
logger.info("Received WhatsApp delivery webhook",
message_sid=message_sid,
status=status)
# Map Twilio status to our status
status_mapping = {
"sent": "sent",
"delivered": "delivered",
"read": "read",
"failed": "failed",
"undelivered": "failed"
}
mapped_status = status_mapping.get(status, status)
# Publish delivery event
await publish_notification_delivered({
"provider_message_id": message_sid,
"status": mapped_status,
"delivery_time": webhook_data.get("timestamp"),
"provider": "twilio"
})
except Exception as e:
logger.error("Failed to handle WhatsApp delivery webhook", error=str(e))
# ================================================================
# SCHEDULED NOTIFICATION PROCESSING
# ================================================================
async def process_scheduled_notifications():
"""Process scheduled notifications (called by background task)"""
try:
from datetime import datetime
from app.core.database import get_db
from app.models.notifications import Notification, NotificationStatus
from app.services.notification_service import NotificationService
from sqlalchemy import select, and_
logger.info("Processing scheduled notifications")
async for db in get_db():
# Get notifications scheduled for now or earlier
now = datetime.utcnow()
result = await db.execute(
select(Notification).where(
and_(
Notification.status == NotificationStatus.PENDING,
Notification.scheduled_at <= now,
Notification.scheduled_at.isnot(None)
)
).limit(100) # Process in batches
)
scheduled_notifications = result.scalars().all()
if not scheduled_notifications:
return
logger.info("Found scheduled notifications to process",
count=len(scheduled_notifications))
notification_service = NotificationService()
for notification in scheduled_notifications:
try:
# Convert to schema for processing
from app.schemas.notifications import NotificationCreate, NotificationType
notification_create = NotificationCreate(
type=NotificationType(notification.type.value),
recipient_id=str(notification.recipient_id) if notification.recipient_id else None,
recipient_email=notification.recipient_email,
recipient_phone=notification.recipient_phone,
subject=notification.subject,
message=notification.message,
html_content=notification.html_content,
template_id=notification.template_id,
template_data=notification.template_data,
priority=notification.priority,
tenant_id=str(notification.tenant_id),
sender_id=str(notification.sender_id),
broadcast=notification.broadcast
)
# Process the scheduled notification
await notification_service.send_notification(notification_create)
except Exception as e:
logger.error("Failed to process scheduled notification",
notification_id=str(notification.id),
error=str(e))
await db.commit()
except Exception as e:
logger.error("Failed to process scheduled notifications", error=str(e))

View File

@@ -0,0 +1,672 @@
# ================================================================
# services/notification/app/services/notification_service.py
# ================================================================
"""
Main notification service business logic
Orchestrates notification delivery across multiple channels
"""
import structlog
from typing import Dict, List, Any, Optional, Tuple
from datetime import datetime, timedelta
import asyncio
import uuid
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, and_, desc, func, update
from jinja2 import Template
from app.models.notifications import (
Notification, NotificationTemplate, NotificationPreference,
NotificationLog, NotificationType, NotificationStatus, NotificationPriority
)
from app.schemas.notifications import (
NotificationCreate, NotificationResponse, NotificationHistory,
NotificationStats, BulkNotificationCreate
)
from app.services.email_service import EmailService
from app.services.whatsapp_service import WhatsAppService
from app.services.messaging import publish_notification_sent, publish_notification_failed
from app.core.config import settings
from app.core.database import get_db
from shared.monitoring.metrics import MetricsCollector
logger = structlog.get_logger()
metrics = MetricsCollector("notification-service")
class NotificationService:
"""
Main service class for managing notification operations.
Handles email, WhatsApp, and other notification channels.
"""
def __init__(self):
self.email_service = EmailService()
self.whatsapp_service = WhatsAppService()
async def send_notification(self, notification: NotificationCreate) -> NotificationResponse:
"""Send a single notification"""
try:
start_time = datetime.utcnow()
# Create notification record
async for db in get_db():
# Check user preferences if recipient specified
if notification.recipient_id:
preferences = await self._get_user_preferences(
db, notification.recipient_id, notification.tenant_id
)
# Check if user allows this type of notification
if not self._is_notification_allowed(notification.type, preferences):
logger.info("Notification blocked by user preferences",
recipient=notification.recipient_id,
type=notification.type.value)
# Still create record but mark as cancelled
db_notification = await self._create_notification_record(
db, notification, NotificationStatus.CANCELLED
)
await db.commit()
return NotificationResponse.from_orm(db_notification)
# Create pending notification
db_notification = await self._create_notification_record(
db, notification, NotificationStatus.PENDING
)
await db.commit()
# Process template if specified
if notification.template_id:
notification = await self._process_template(
db, notification, notification.template_id
)
# Send based on type
success = False
error_message = None
try:
if notification.type == NotificationType.EMAIL:
success = await self._send_email(notification)
elif notification.type == NotificationType.WHATSAPP:
success = await self._send_whatsapp(notification)
elif notification.type == NotificationType.PUSH:
success = await self._send_push(notification)
else:
error_message = f"Unsupported notification type: {notification.type}"
except Exception as e:
logger.error("Failed to send notification", error=str(e))
error_message = str(e)
# Update notification status
new_status = NotificationStatus.SENT if success else NotificationStatus.FAILED
await self._update_notification_status(
db, db_notification.id, new_status, error_message
)
# Log attempt
await self._log_delivery_attempt(
db, db_notification.id, 1, new_status, error_message
)
await db.commit()
# Publish event
if success:
await publish_notification_sent({
"notification_id": str(db_notification.id),
"type": notification.type.value,
"tenant_id": notification.tenant_id,
"recipient_id": notification.recipient_id
})
else:
await publish_notification_failed({
"notification_id": str(db_notification.id),
"type": notification.type.value,
"error": error_message,
"tenant_id": notification.tenant_id
})
# Record metrics
processing_time = (datetime.utcnow() - start_time).total_seconds()
metrics.observe_histogram(
"notification_processing_duration_seconds",
processing_time
)
metrics.increment_counter(
"notifications_sent_total",
labels={
"type": notification.type.value,
"status": "success" if success else "failed"
}
)
# Refresh the object to get updated data
await db.refresh(db_notification)
return NotificationResponse.from_orm(db_notification)
except Exception as e:
logger.error("Failed to process notification", error=str(e))
metrics.increment_counter("notification_errors_total")
raise
async def send_bulk_notifications(self, bulk_request: BulkNotificationCreate) -> Dict[str, Any]:
"""Send notifications to multiple recipients"""
try:
results = {
"total": len(bulk_request.recipients),
"sent": 0,
"failed": 0,
"notification_ids": []
}
# Process in batches to avoid overwhelming the system
batch_size = settings.BATCH_SIZE
for i in range(0, len(bulk_request.recipients), batch_size):
batch = bulk_request.recipients[i:i + batch_size]
# Create individual notifications for each recipient
tasks = []
for recipient in batch:
individual_notification = NotificationCreate(
type=bulk_request.type,
recipient_id=recipient if not "@" in recipient else None,
recipient_email=recipient if "@" in recipient else None,
subject=bulk_request.subject,
message=bulk_request.message,
html_content=bulk_request.html_content,
template_id=bulk_request.template_id,
template_data=bulk_request.template_data,
priority=bulk_request.priority,
scheduled_at=bulk_request.scheduled_at,
broadcast=True
)
tasks.append(self.send_notification(individual_notification))
# Process batch concurrently
batch_results = await asyncio.gather(*tasks, return_exceptions=True)
for result in batch_results:
if isinstance(result, Exception):
results["failed"] += 1
logger.error("Bulk notification failed", error=str(result))
else:
results["sent"] += 1
results["notification_ids"].append(result.id)
# Small delay between batches to prevent rate limiting
if i + batch_size < len(bulk_request.recipients):
await asyncio.sleep(0.1)
logger.info("Bulk notification completed",
total=results["total"],
sent=results["sent"],
failed=results["failed"])
return results
except Exception as e:
logger.error("Failed to send bulk notifications", error=str(e))
raise
async def get_notification_history(
self,
user_id: str,
tenant_id: str,
page: int = 1,
per_page: int = 50,
type_filter: Optional[NotificationType] = None,
status_filter: Optional[NotificationStatus] = None
) -> NotificationHistory:
"""Get notification history for a user"""
try:
async for db in get_db():
# Build query
query = select(Notification).where(
and_(
Notification.tenant_id == tenant_id,
Notification.recipient_id == user_id
)
)
if type_filter:
query = query.where(Notification.type == type_filter)
if status_filter:
query = query.where(Notification.status == status_filter)
# Get total count
count_query = select(func.count()).select_from(query.subquery())
total = await db.scalar(count_query)
# Get paginated results
offset = (page - 1) * per_page
query = query.order_by(desc(Notification.created_at)).offset(offset).limit(per_page)
result = await db.execute(query)
notifications = result.scalars().all()
# Convert to response objects
notification_responses = [
NotificationResponse.from_orm(notification)
for notification in notifications
]
return NotificationHistory(
notifications=notification_responses,
total=total,
page=page,
per_page=per_page,
has_next=offset + per_page < total,
has_prev=page > 1
)
except Exception as e:
logger.error("Failed to get notification history", error=str(e))
raise
async def get_notification_stats(self, tenant_id: str, days: int = 30) -> NotificationStats:
"""Get notification statistics for a tenant"""
try:
async for db in get_db():
# Date range
start_date = datetime.utcnow() - timedelta(days=days)
# Basic counts
base_query = select(Notification).where(
and_(
Notification.tenant_id == tenant_id,
Notification.created_at >= start_date
)
)
# Total sent
sent_query = base_query.where(Notification.status == NotificationStatus.SENT)
total_sent = await db.scalar(select(func.count()).select_from(sent_query.subquery()))
# Total delivered
delivered_query = base_query.where(Notification.status == NotificationStatus.DELIVERED)
total_delivered = await db.scalar(select(func.count()).select_from(delivered_query.subquery()))
# Total failed
failed_query = base_query.where(Notification.status == NotificationStatus.FAILED)
total_failed = await db.scalar(select(func.count()).select_from(failed_query.subquery()))
# Delivery rate
delivery_rate = (total_delivered / max(total_sent, 1)) * 100
# Average delivery time
avg_delivery_time = None
if total_delivered > 0:
delivery_time_query = select(
func.avg(
func.extract('epoch', Notification.delivered_at - Notification.sent_at) / 60
)
).where(
and_(
Notification.tenant_id == tenant_id,
Notification.status == NotificationStatus.DELIVERED,
Notification.sent_at.isnot(None),
Notification.delivered_at.isnot(None),
Notification.created_at >= start_date
)
)
avg_delivery_time = await db.scalar(delivery_time_query)
# By type
type_query = select(
Notification.type,
func.count(Notification.id)
).where(
and_(
Notification.tenant_id == tenant_id,
Notification.created_at >= start_date
)
).group_by(Notification.type)
type_results = await db.execute(type_query)
by_type = {str(row[0].value): row[1] for row in type_results}
# By status
status_query = select(
Notification.status,
func.count(Notification.id)
).where(
and_(
Notification.tenant_id == tenant_id,
Notification.created_at >= start_date
)
).group_by(Notification.status)
status_results = await db.execute(status_query)
by_status = {str(row[0].value): row[1] for row in status_results}
# Recent activity (last 10 notifications)
recent_query = base_query.order_by(desc(Notification.created_at)).limit(10)
recent_result = await db.execute(recent_query)
recent_notifications = recent_result.scalars().all()
recent_activity = [
{
"id": str(notification.id),
"type": notification.type.value,
"status": notification.status.value,
"created_at": notification.created_at.isoformat(),
"recipient_email": notification.recipient_email
}
for notification in recent_notifications
]
return NotificationStats(
total_sent=total_sent or 0,
total_delivered=total_delivered or 0,
total_failed=total_failed or 0,
delivery_rate=round(delivery_rate, 2),
avg_delivery_time_minutes=round(avg_delivery_time, 2) if avg_delivery_time else None,
by_type=by_type,
by_status=by_status,
recent_activity=recent_activity
)
except Exception as e:
logger.error("Failed to get notification stats", error=str(e))
raise
async def get_user_preferences(
self,
user_id: str,
tenant_id: str
) -> Dict[str, Any]:
"""Get user notification preferences"""
try:
async for db in get_db():
result = await db.execute(
select(NotificationPreference).where(
and_(
NotificationPreference.user_id == user_id,
NotificationPreference.tenant_id == tenant_id
)
)
)
preferences = result.scalar_one_or_none()
if not preferences:
# Create default preferences
preferences = NotificationPreference(
user_id=user_id,
tenant_id=tenant_id
)
db.add(preferences)
await db.commit()
await db.refresh(preferences)
return {
"user_id": str(preferences.user_id),
"tenant_id": str(preferences.tenant_id),
"email_enabled": preferences.email_enabled,
"email_alerts": preferences.email_alerts,
"email_marketing": preferences.email_marketing,
"email_reports": preferences.email_reports,
"whatsapp_enabled": preferences.whatsapp_enabled,
"whatsapp_alerts": preferences.whatsapp_alerts,
"whatsapp_reports": preferences.whatsapp_reports,
"push_enabled": preferences.push_enabled,
"push_alerts": preferences.push_alerts,
"push_reports": preferences.push_reports,
"quiet_hours_start": preferences.quiet_hours_start,
"quiet_hours_end": preferences.quiet_hours_end,
"timezone": preferences.timezone,
"digest_frequency": preferences.digest_frequency,
"max_emails_per_day": preferences.max_emails_per_day,
"language": preferences.language,
"created_at": preferences.created_at,
"updated_at": preferences.updated_at
}
except Exception as e:
logger.error("Failed to get user preferences", error=str(e))
raise
async def update_user_preferences(
self,
user_id: str,
tenant_id: str,
updates: Dict[str, Any]
) -> Dict[str, Any]:
"""Update user notification preferences"""
try:
async for db in get_db():
# Get existing preferences or create new
result = await db.execute(
select(NotificationPreference).where(
and_(
NotificationPreference.user_id == user_id,
NotificationPreference.tenant_id == tenant_id
)
)
)
preferences = result.scalar_one_or_none()
if not preferences:
preferences = NotificationPreference(
user_id=user_id,
tenant_id=tenant_id
)
db.add(preferences)
# Update fields
for field, value in updates.items():
if hasattr(preferences, field) and value is not None:
setattr(preferences, field, value)
preferences.updated_at = datetime.utcnow()
await db.commit()
await db.refresh(preferences)
logger.info("Updated user preferences",
user_id=user_id,
tenant_id=tenant_id,
updates=list(updates.keys()))
return await self.get_user_preferences(user_id, tenant_id)
except Exception as e:
logger.error("Failed to update user preferences", error=str(e))
raise
# ================================================================
# PRIVATE HELPER METHODS
# ================================================================
async def _create_notification_record(
self,
db: AsyncSession,
notification: NotificationCreate,
status: NotificationStatus
) -> Notification:
"""Create a notification record in the database"""
db_notification = Notification(
tenant_id=notification.tenant_id,
sender_id=notification.sender_id,
recipient_id=notification.recipient_id,
type=notification.type,
status=status,
priority=notification.priority,
subject=notification.subject,
message=notification.message,
html_content=notification.html_content,
template_id=notification.template_id,
template_data=notification.template_data,
recipient_email=notification.recipient_email,
recipient_phone=notification.recipient_phone,
scheduled_at=notification.scheduled_at,
broadcast=notification.broadcast
)
db.add(db_notification)
await db.flush() # Get the ID without committing
return db_notification
async def _update_notification_status(
self,
db: AsyncSession,
notification_id: uuid.UUID,
status: NotificationStatus,
error_message: Optional[str] = None
):
"""Update notification status"""
update_data = {
"status": status,
"updated_at": datetime.utcnow()
}
if status == NotificationStatus.SENT:
update_data["sent_at"] = datetime.utcnow()
elif status == NotificationStatus.DELIVERED:
update_data["delivered_at"] = datetime.utcnow()
elif status == NotificationStatus.FAILED and error_message:
update_data["error_message"] = error_message
await db.execute(
update(Notification)
.where(Notification.id == notification_id)
.values(**update_data)
)
async def _log_delivery_attempt(
self,
db: AsyncSession,
notification_id: uuid.UUID,
attempt_number: int,
status: NotificationStatus,
error_message: Optional[str] = None
):
"""Log a delivery attempt"""
log_entry = NotificationLog(
notification_id=notification_id,
attempt_number=attempt_number,
status=status,
attempted_at=datetime.utcnow(),
error_message=error_message
)
db.add(log_entry)
async def _get_user_preferences(
self,
db: AsyncSession,
user_id: str,
tenant_id: str
) -> Optional[NotificationPreference]:
"""Get user preferences from database"""
result = await db.execute(
select(NotificationPreference).where(
and_(
NotificationPreference.user_id == user_id,
NotificationPreference.tenant_id == tenant_id
)
)
)
return result.scalar_one_or_none()
def _is_notification_allowed(
self,
notification_type: NotificationType,
preferences: Optional[NotificationPreference]
) -> bool:
"""Check if notification is allowed based on user preferences"""
if not preferences:
return True # Default to allow if no preferences set
if notification_type == NotificationType.EMAIL:
return preferences.email_enabled
elif notification_type == NotificationType.WHATSAPP:
return preferences.whatsapp_enabled
elif notification_type == NotificationType.PUSH:
return preferences.push_enabled
return True # Default to allow for unknown types
async def _process_template(
self,
db: AsyncSession,
notification: NotificationCreate,
template_id: str
) -> NotificationCreate:
"""Process notification template"""
try:
# Get template
result = await db.execute(
select(NotificationTemplate).where(
and_(
NotificationTemplate.template_key == template_id,
NotificationTemplate.is_active == True,
NotificationTemplate.type == notification.type
)
)
)
template = result.scalar_one_or_none()
if not template:
logger.warning("Template not found", template_id=template_id)
return notification
# Process template variables
template_data = notification.template_data or {}
# Render subject
if template.subject_template:
subject_template = Template(template.subject_template)
notification.subject = subject_template.render(**template_data)
# Render body
body_template = Template(template.body_template)
notification.message = body_template.render(**template_data)
# Render HTML if available
if template.html_template:
html_template = Template(template.html_template)
notification.html_content = html_template.render(**template_data)
logger.info("Template processed successfully", template_id=template_id)
return notification
except Exception as e:
logger.error("Failed to process template", template_id=template_id, error=str(e))
return notification # Return original if template processing fails
async def _send_email(self, notification: NotificationCreate) -> bool:
"""Send email notification"""
try:
return await self.email_service.send_email(
to_email=notification.recipient_email,
subject=notification.subject or "Notification",
text_content=notification.message,
html_content=notification.html_content
)
except Exception as e:
logger.error("Failed to send email", error=str(e))
return False
async def _send_whatsapp(self, notification: NotificationCreate) -> bool:
"""Send WhatsApp notification"""
try:
return await self.whatsapp_service.send_message(
to_phone=notification.recipient_phone,
message=notification.message
)
except Exception as e:
logger.error("Failed to send WhatsApp", error=str(e))
return False
async def _send_push(self, notification: NotificationCreate) -> bool:
"""Send push notification (placeholder)"""
logger.info("Push notifications not yet implemented")
return False

View File

@@ -0,0 +1,337 @@
# ================================================================
# services/notification/app/services/whatsapp_service.py
# ================================================================
"""
WhatsApp service for sending notifications
Integrates with WhatsApp Business API via Twilio
"""
import structlog
import httpx
from typing import Optional, Dict, Any, List
import asyncio
from urllib.parse import quote
from app.core.config import settings
from shared.monitoring.metrics import MetricsCollector
logger = structlog.get_logger()
metrics = MetricsCollector("notification-service")
class WhatsAppService:
"""
WhatsApp service for sending notifications via Twilio WhatsApp API
Supports text messages and template messages
"""
def __init__(self):
self.api_key = settings.WHATSAPP_API_KEY
self.base_url = settings.WHATSAPP_BASE_URL
self.from_number = settings.WHATSAPP_FROM_NUMBER
self.enabled = settings.ENABLE_WHATSAPP_NOTIFICATIONS
async def send_message(
self,
to_phone: str,
message: str,
template_name: Optional[str] = None,
template_params: Optional[List[str]] = None
) -> bool:
"""
Send WhatsApp message
Args:
to_phone: Recipient phone number (with country code)
message: Message text
template_name: WhatsApp template name (optional)
template_params: Template parameters (optional)
Returns:
bool: True if message was sent successfully
"""
try:
if not self.enabled:
logger.info("WhatsApp notifications disabled")
return True # Return success to avoid blocking workflow
if not self.api_key:
logger.error("WhatsApp API key not configured")
return False
# Validate phone number
phone = self._format_phone_number(to_phone)
if not phone:
logger.error("Invalid phone number", phone=to_phone)
return False
# Send template message if template specified
if template_name:
success = await self._send_template_message(
phone, template_name, template_params or []
)
else:
# Send regular text message
success = await self._send_text_message(phone, message)
if success:
logger.info("WhatsApp message sent successfully",
to=phone,
template=template_name)
# Record success metrics
metrics.increment_counter("whatsapp_sent_total", labels={"status": "success"})
else:
# Record failure metrics
metrics.increment_counter("whatsapp_sent_total", labels={"status": "failed"})
return success
except Exception as e:
logger.error("Failed to send WhatsApp message",
to=to_phone,
error=str(e))
# Record failure metrics
metrics.increment_counter("whatsapp_sent_total", labels={"status": "failed"})
return False
async def send_bulk_messages(
self,
recipients: List[str],
message: str,
template_name: Optional[str] = None,
batch_size: int = 20
) -> Dict[str, Any]:
"""
Send bulk WhatsApp messages with rate limiting
Args:
recipients: List of recipient phone numbers
message: Message text
template_name: WhatsApp template name (optional)
batch_size: Number of messages to send per batch
Returns:
Dict containing success/failure counts
"""
results = {
"total": len(recipients),
"sent": 0,
"failed": 0,
"errors": []
}
try:
# Process in batches to respect WhatsApp rate limits
for i in range(0, len(recipients), batch_size):
batch = recipients[i:i + batch_size]
# Send messages concurrently within batch
tasks = [
self.send_message(
to_phone=phone,
message=message,
template_name=template_name
)
for phone in batch
]
batch_results = await asyncio.gather(*tasks, return_exceptions=True)
for phone, result in zip(batch, batch_results):
if isinstance(result, Exception):
results["failed"] += 1
results["errors"].append({"phone": phone, "error": str(result)})
elif result:
results["sent"] += 1
else:
results["failed"] += 1
results["errors"].append({"phone": phone, "error": "Unknown error"})
# Rate limiting delay between batches (WhatsApp has strict limits)
if i + batch_size < len(recipients):
await asyncio.sleep(2.0) # 2 second delay between batches
logger.info("Bulk WhatsApp completed",
total=results["total"],
sent=results["sent"],
failed=results["failed"])
return results
except Exception as e:
logger.error("Bulk WhatsApp failed", error=str(e))
results["errors"].append({"error": str(e)})
return results
async def health_check(self) -> bool:
"""
Check if WhatsApp service is healthy
Returns:
bool: True if service is healthy
"""
try:
if not self.enabled:
return True # Service is "healthy" if disabled
if not self.api_key:
logger.warning("WhatsApp API key not configured")
return False
# Test API connectivity with a simple request
async with httpx.AsyncClient(timeout=10.0) as client:
response = await client.get(
f"{self.base_url}/v1/Account", # Twilio account info endpoint
auth=(self.api_key.split(":")[0], self.api_key.split(":")[1])
)
if response.status_code == 200:
logger.info("WhatsApp service health check passed")
return True
else:
logger.error("WhatsApp service health check failed",
status_code=response.status_code)
return False
except Exception as e:
logger.error("WhatsApp service health check failed", error=str(e))
return False
# ================================================================
# PRIVATE HELPER METHODS
# ================================================================
async def _send_text_message(self, to_phone: str, message: str) -> bool:
"""Send regular text message via Twilio"""
try:
# Prepare request data
data = {
"From": f"whatsapp:{self.from_number}",
"To": f"whatsapp:{to_phone}",
"Body": message
}
# Send via Twilio API
async with httpx.AsyncClient(timeout=30.0) as client:
response = await client.post(
f"{self.base_url}/2010-04-01/Accounts/{self.api_key.split(':')[0]}/Messages.json",
data=data,
auth=(self.api_key.split(":")[0], self.api_key.split(":")[1])
)
if response.status_code == 201:
response_data = response.json()
logger.debug("WhatsApp message sent",
message_sid=response_data.get("sid"),
status=response_data.get("status"))
return True
else:
logger.error("WhatsApp API error",
status_code=response.status_code,
response=response.text)
return False
except Exception as e:
logger.error("Failed to send WhatsApp text message", error=str(e))
return False
async def _send_template_message(
self,
to_phone: str,
template_name: str,
parameters: List[str]
) -> bool:
"""Send WhatsApp template message via Twilio"""
try:
# Prepare template data
content_variables = {str(i+1): param for i, param in enumerate(parameters)}
data = {
"From": f"whatsapp:{self.from_number}",
"To": f"whatsapp:{to_phone}",
"ContentSid": template_name, # Template SID in Twilio
"ContentVariables": str(content_variables) if content_variables else "{}"
}
# Send via Twilio API
async with httpx.AsyncClient(timeout=30.0) as client:
response = await client.post(
f"{self.base_url}/2010-04-01/Accounts/{self.api_key.split(':')[0]}/Messages.json",
data=data,
auth=(self.api_key.split(":")[0], self.api_key.split(":")[1])
)
if response.status_code == 201:
response_data = response.json()
logger.debug("WhatsApp template message sent",
message_sid=response_data.get("sid"),
template=template_name)
return True
else:
logger.error("WhatsApp template API error",
status_code=response.status_code,
response=response.text,
template=template_name)
return False
except Exception as e:
logger.error("Failed to send WhatsApp template message",
template=template_name,
error=str(e))
return False
def _format_phone_number(self, phone: str) -> Optional[str]:
"""
Format phone number for WhatsApp (Spanish format)
Args:
phone: Input phone number
Returns:
Formatted phone number or None if invalid
"""
if not phone:
return None
# Remove spaces, dashes, and other non-digit characters
clean_phone = "".join(filter(str.isdigit, phone.replace("+", "")))
# Handle Spanish phone numbers
if clean_phone.startswith("34"):
# Already has country code
return f"+{clean_phone}"
elif clean_phone.startswith(("6", "7", "9")) and len(clean_phone) == 9:
# Spanish mobile/landline without country code
return f"+34{clean_phone}"
elif len(clean_phone) == 9 and clean_phone[0] in "679":
# Likely Spanish mobile
return f"+34{clean_phone}"
else:
logger.warning("Unrecognized phone format", phone=phone)
return None
async def _get_message_status(self, message_sid: str) -> Optional[str]:
"""Get message delivery status from Twilio"""
try:
async with httpx.AsyncClient(timeout=10.0) as client:
response = await client.get(
f"{self.base_url}/2010-04-01/Accounts/{self.api_key.split(':')[0]}/Messages/{message_sid}.json",
auth=(self.api_key.split(":")[0], self.api_key.split(":")[1])
)
if response.status_code == 200:
data = response.json()
return data.get("status")
else:
logger.error("Failed to get message status",
message_sid=message_sid,
status_code=response.status_code)
return None
except Exception as e:
logger.error("Failed to check message status",
message_sid=message_sid,
error=str(e))
return None