# ================================================================
# services/notification/app/services/messaging.py
# ================================================================
"""
Messaging service for notification events.

Handles RabbitMQ integration for the notification service.
"""
|
|
|
|
import structlog
|
|
from typing import Dict, Any
|
|
import asyncio
|
|
|
|
from shared.messaging.rabbitmq import RabbitMQClient
|
|
from shared.messaging.events import BaseEvent
|
|
from app.core.config import settings
|
|
|
|
# Structured logger used by every function in this module.
logger = structlog.get_logger()

# Shared RabbitMQ client. It is None until setup_messaging() assigns it, and
# the publish_* helpers check it before publishing.
notification_publisher = None
|
|
|
|
async def setup_messaging():
    """Create the shared RabbitMQ client, connect it and register consumers.

    The client is stored in the module-level ``notification_publisher``
    before ``connect()`` is awaited, so the global stays populated even when
    connecting fails.

    Raises:
        Exception: whatever is raised while building the client, connecting
            or registering consumers is logged and re-raised unchanged.
    """
    global notification_publisher

    try:
        notification_publisher = RabbitMQClient(
            settings.RABBITMQ_URL,
            service_name="notification-service",
        )
        await notification_publisher.connect()

        # Subscribe to the events emitted by the other services.
        await _setup_event_consumers()

        logger.info("Notification service messaging setup completed")
    except Exception as exc:
        logger.error("Failed to setup notification messaging", error=str(exc))
        raise
|
|
|
|
async def cleanup_messaging():
    """Disconnect the shared RabbitMQ client, if one was created.

    Errors raised while disconnecting are logged and not propagated. The
    module-level ``notification_publisher`` is left pointing at the (now
    disconnected) client; it is not reset to ``None`` here.
    """
    global notification_publisher

    try:
        publisher = notification_publisher
        if publisher:
            await publisher.disconnect()
        logger.info("Notification service messaging cleanup completed")
    except Exception as exc:
        logger.error("Error during notification messaging cleanup", error=str(exc))
|
|
|
|
async def _setup_event_consumers():
    """Register one consumer per upstream event this service reacts to.

    Subscriptions are registered sequentially in the order listed below. If
    any registration raises, the remaining ones are skipped, the error is
    logged, and nothing is re-raised, so the caller cannot observe the
    failure.
    """
    try:
        # (exchange, queue, routing key, handler)
        subscriptions = (
            # User registration events (from auth service)
            ("user.events", "notification_user_registered_queue",
             "user.registered", handle_user_registered),
            # Forecast alert events (from forecasting service)
            ("forecast.events", "notification_forecast_alert_queue",
             "forecast.alert_generated", handle_forecast_alert),
            # Training completion events (from training service)
            ("training.events", "notification_training_completed_queue",
             "training.completed", handle_training_completed),
            # Data import events (from data service)
            ("data.events", "notification_data_imported_queue",
             "data.imported", handle_data_imported),
        )

        for exchange, queue, routing_key, handler in subscriptions:
            await notification_publisher.consume_events(
                exchange_name=exchange,
                queue_name=queue,
                routing_key=routing_key,
                callback=handler,
            )

        logger.info("Notification event consumers setup completed")
    except Exception as exc:
        logger.error("Failed to setup notification event consumers", error=str(exc))
|
|
|
|
# ================================================================
|
|
# EVENT HANDLERS
|
|
# ================================================================
|
|
|
|
async def handle_user_registered(message):
    """Send a welcome email in response to a user registration event.

    The message body is decoded as JSON and the user details are read from
    its ``"data"`` object. After the notification has been sent the message
    is acknowledged; on any exception the error is logged and the message is
    rejected without requeue.

    Args:
        message: incoming broker message; this handler uses its ``body``,
            ``ack()`` and ``nack()`` members.
    """
    try:
        import json
        from app.services.notification_service import NotificationService
        from app.schemas.notifications import NotificationCreate, NotificationType

        payload = json.loads(message.body.decode())
        user = payload.get("data", {})

        logger.info("Handling user registration event", user_id=user.get("user_id"))

        service = NotificationService()

        # Values made available to the "welcome" email template.
        welcome_context = {
            "user_name": user.get("full_name", "Usuario"),
            "dashboard_url": f"{settings.FRONTEND_API_URL}/dashboard",
        }
        welcome = NotificationCreate(
            type=NotificationType.EMAIL,
            recipient_email=user.get("email"),
            template_id="welcome",
            template_data=welcome_context,
            tenant_id=user.get("tenant_id"),
            sender_id="system",
        )
        await service.send_notification(welcome)

        # Only acknowledge once the send call has returned without raising.
        await message.ack()
    except Exception as exc:
        logger.error("Failed to handle user registration event", error=str(exc))
        await message.nack(requeue=False)
|
|
|
|
async def handle_forecast_alert(message):
    """Broadcast a forecast alert to a tenant by email, plus WhatsApp if urgent.

    The alert details are read from the ``"data"`` object of the JSON message
    body. A high-priority broadcast email is always sent. When the alert's
    ``severity`` equals ``"urgent"`` an additional urgent-priority WhatsApp
    broadcast is sent. The message is acknowledged after all sends return;
    on any exception the error is logged and the message is rejected without
    requeue.

    Args:
        message: incoming broker message; this handler uses its ``body``,
            ``ack()`` and ``nack()`` members.
    """
    try:
        import json
        from app.services.notification_service import NotificationService
        from app.schemas.notifications import NotificationCreate, NotificationType

        payload = json.loads(message.body.decode())
        alert = payload.get("data", {})

        tenant_id = alert.get("tenant_id")
        product_name = alert.get("product_name")
        forecast_date = alert.get("forecast_date")
        variation = alert.get("variation_percentage")

        logger.info("Handling forecast alert event",
                    tenant_id=tenant_id,
                    product=product_name)

        service = NotificationService()

        # Email alert, always sent.
        email_context = {
            "bakery_name": alert.get("bakery_name", "Tu Panadería"),
            "product_name": product_name,
            "forecast_date": forecast_date,
            "predicted_demand": alert.get("predicted_demand"),
            "variation_percentage": variation,
            "alert_message": alert.get("message"),
            "dashboard_url": f"{settings.FRONTEND_API_URL}/forecasts",
        }
        email_alert = NotificationCreate(
            type=NotificationType.EMAIL,
            template_id="forecast_alert",
            template_data=email_context,
            tenant_id=tenant_id,
            sender_id="system",
            broadcast=True,
            priority="high",
        )
        await service.send_notification(email_alert)

        # WhatsApp alert, only for urgent severity.
        if alert.get("severity") == "urgent":
            whatsapp_text = (
                f"🚨 ALERTA: {product_name} - Variación del {variation}% "
                f"para {forecast_date}. Revisar pronósticos."
            )
            whatsapp_alert = NotificationCreate(
                type=NotificationType.WHATSAPP,
                message=whatsapp_text,
                tenant_id=tenant_id,
                sender_id="system",
                broadcast=True,
                priority="urgent",
            )
            await service.send_notification(whatsapp_alert)

        await message.ack()
    except Exception as exc:
        logger.error("Failed to handle forecast alert event", error=str(exc))
        await message.nack(requeue=False)
|
|
|
|
async def handle_training_completed(message):
    """Email a tenant about the outcome of a model training job.

    The job details are read from the ``"data"`` object of the JSON message
    body. A truthy ``success`` value selects the success subject and text;
    anything else (including a missing key) selects the failure variant. The
    message is acknowledged after the send returns; on any exception the
    error is logged and the message is rejected without requeue.

    Args:
        message: incoming broker message; this handler uses its ``body``,
            ``ack()`` and ``nack()`` members.
    """
    try:
        import json
        from app.services.notification_service import NotificationService
        from app.schemas.notifications import NotificationCreate, NotificationType

        payload = json.loads(message.body.decode())
        training = payload.get("data", {})

        tenant_id = training.get("tenant_id")
        job_id = training.get("job_id")
        model_name = training.get("model_name")
        accuracy = training.get("accuracy")

        logger.info("Handling training completion event",
                    tenant_id=tenant_id,
                    job_id=job_id)

        service = NotificationService()

        if training.get("success", False):
            subject = "✅ Entrenamiento de Modelo Completado"
            status = "exitoso"
            body = (
                f"El modelo {model_name} se ha entrenado correctamente "
                f"con una precisión del {accuracy}%."
            )
        else:
            subject = "❌ Error en Entrenamiento de Modelo"
            status = "fallido"
            body = (
                f"El entrenamiento del modelo {model_name} ha fallado. "
                f"Error: {training.get('error_message', 'Error desconocido')}"
            )

        # "status" and "message" stay last so the key order matches the
        # order in which the original code inserted them.
        template_data = {
            "bakery_name": training.get("bakery_name", "Tu Panadería"),
            "job_id": job_id,
            "model_name": model_name,
            "accuracy": accuracy,
            "completion_time": training.get("completion_time"),
            "dashboard_url": f"{settings.FRONTEND_API_URL}/models",
            "status": status,
            "message": body,
        }

        outcome = NotificationCreate(
            type=NotificationType.EMAIL,
            subject=subject,
            message=body,
            template_data=template_data,
            tenant_id=tenant_id,
            sender_id="system",
            broadcast=True,
        )
        await service.send_notification(outcome)

        await message.ack()
    except Exception as exc:
        logger.error("Failed to handle training completion event", error=str(exc))
        await message.nack(requeue=False)
|
|
|
|
async def handle_data_imported(message):
    """Email a tenant when a sizeable data import has finished.

    The import details are read from the ``"data"`` object of the JSON
    message body. Imports of fewer than 100 records are acknowledged without
    sending anything. Larger imports trigger one email, after which the
    message is acknowledged. On any exception the error is logged and the
    message is rejected without requeue.

    Args:
        message: incoming broker message; this handler uses its ``body``,
            ``ack()`` and ``nack()`` members.
    """
    try:
        import json
        from app.services.notification_service import NotificationService
        from app.schemas.notifications import NotificationCreate, NotificationType

        # Parse message
        data = json.loads(message.body.decode())
        import_data = data.get("data", {})

        data_type = import_data.get("data_type")

        logger.info("Handling data import event",
                    tenant_id=import_data.get("tenant_id"),
                    data_type=data_type)

        # Only send notifications for significant data imports. An explicit
        # null count is treated like a missing one (0); comparing None with
        # an int would raise and dead-letter an otherwise harmless event.
        records_count = import_data.get("records_count") or 0
        if records_count < 100:  # Skip notification for small imports
            await message.ack()
            return

        # Resolve the source once so the email text and the template data
        # agree; previously the text rendered "None" when the key was absent
        # while the template fell back to "Manual".
        source = import_data.get("source", "Manual")

        notification_service = NotificationService()

        template_data = {
            "bakery_name": import_data.get("bakery_name", "Tu Panadería"),
            "data_type": data_type,
            "records_count": records_count,
            "import_date": import_data.get("import_date"),
            "source": source,
            "dashboard_url": f"{settings.FRONTEND_API_URL}/data",
        }

        notification = NotificationCreate(
            type=NotificationType.EMAIL,
            subject=f"📊 Datos Importados: {data_type}",
            message=f"Se han importado {records_count} registros de {data_type} desde {source}.",
            template_data=template_data,
            tenant_id=import_data.get("tenant_id"),
            sender_id="system",
        )

        await notification_service.send_notification(notification)

        # Acknowledge message
        await message.ack()

    except Exception as e:
        logger.error("Failed to handle data import event", error=str(e))
        await message.nack(requeue=False)
|
|
|
|
# ================================================================
|
|
# NOTIFICATION EVENT PUBLISHERS
|
|
# ================================================================
|
|
|
|
async def publish_notification_sent(notification_data: Dict[str, Any]) -> bool:
    """Publish ``notification.sent`` on the ``notification.events`` exchange.

    Args:
        notification_data: event payload handed to the publisher unchanged.

    Returns:
        Whatever ``publish_event`` returns, or ``False`` when the shared
        publisher is not set or publishing raises (both cases are logged).
    """
    try:
        if not notification_publisher:
            logger.warning("Notification publisher not initialized")
            return False

        return await notification_publisher.publish_event(
            "notification.events",
            "notification.sent",
            notification_data,
        )
    except Exception as exc:
        logger.error("Failed to publish notification sent event", error=str(exc))
        return False
|
|
|
|
async def publish_notification_failed(notification_data: Dict[str, Any]) -> bool:
    """Publish ``notification.failed`` on the ``notification.events`` exchange.

    Args:
        notification_data: event payload handed to the publisher unchanged.

    Returns:
        Whatever ``publish_event`` returns, or ``False`` when the shared
        publisher is not set or publishing raises (both cases are logged).
    """
    try:
        if not notification_publisher:
            logger.warning("Notification publisher not initialized")
            return False

        return await notification_publisher.publish_event(
            "notification.events",
            "notification.failed",
            notification_data,
        )
    except Exception as exc:
        logger.error("Failed to publish notification failed event", error=str(exc))
        return False
|
|
|
|
async def publish_notification_delivered(notification_data: Dict[str, Any]) -> bool:
    """Publish ``notification.delivered`` on the ``notification.events`` exchange.

    Args:
        notification_data: event payload handed to the publisher unchanged.

    Returns:
        Whatever ``publish_event`` returns, or ``False`` when the shared
        publisher is not set or publishing raises (both cases are logged).
    """
    try:
        if not notification_publisher:
            logger.warning("Notification publisher not initialized")
            return False

        return await notification_publisher.publish_event(
            "notification.events",
            "notification.delivered",
            notification_data,
        )
    except Exception as exc:
        logger.error("Failed to publish notification delivered event", error=str(exc))
        return False
|
|
|
|
async def publish_bulk_notification_completed(bulk_data: Dict[str, Any]) -> bool:
    """Publish ``notification.bulk_completed`` on the ``notification.events`` exchange.

    Args:
        bulk_data: event payload handed to the publisher unchanged.

    Returns:
        Whatever ``publish_event`` returns, or ``False`` when the shared
        publisher is not set or publishing raises (both cases are logged).
    """
    try:
        if not notification_publisher:
            logger.warning("Notification publisher not initialized")
            return False

        return await notification_publisher.publish_event(
            "notification.events",
            "notification.bulk_completed",
            bulk_data,
        )
    except Exception as exc:
        logger.error("Failed to publish bulk notification event", error=str(exc))
        return False
|
|
|
|
# ================================================================
|
|
# WEBHOOK HANDLERS (for external delivery status updates)
|
|
# ================================================================
|
|
|
|
async def handle_email_delivery_webhook(webhook_data: Dict[str, Any]):
    """Relay an email provider's delivery-status webhook as an event.

    Reads ``notification_id``, ``status``, ``timestamp`` and ``provider``
    from the payload and forwards them through
    ``publish_notification_delivered``. The event is published whatever the
    status value is. Any exception is logged and not propagated.

    Args:
        webhook_data: webhook payload; missing keys are forwarded as
            ``None``.
    """
    try:
        notification_id = webhook_data.get("notification_id")
        status = webhook_data.get("status")

        logger.info("Received email delivery webhook",
                    notification_id=notification_id,
                    status=status)

        # TODO: persist the new status once NotificationService exposes an
        # update_delivery_status(notification_id, status) method. Until then
        # no service instance is created here, because nothing would use it.

        # Publish delivery event
        await publish_notification_delivered({
            "notification_id": notification_id,
            "status": status,
            "delivery_time": webhook_data.get("timestamp"),
            "provider": webhook_data.get("provider"),
        })

    except Exception as e:
        logger.error("Failed to handle email delivery webhook", error=str(e))
|
|
|
|
async def handle_whatsapp_delivery_webhook(webhook_data: Dict[str, Any]):
    """Relay a Twilio WhatsApp delivery-status webhook as an event.

    Reads ``MessageSid`` and ``MessageStatus`` from the payload, translates
    the status through a fixed table (``undelivered`` becomes ``failed``;
    statuses not in the table pass through unchanged) and forwards the result
    through ``publish_notification_delivered``. Any exception is logged and
    not propagated.

    Args:
        webhook_data: webhook payload; missing keys are forwarded as
            ``None``.
    """
    try:
        message_sid = webhook_data.get("MessageSid")
        provider_status = webhook_data.get("MessageStatus")

        logger.info("Received WhatsApp delivery webhook",
                    message_sid=message_sid,
                    status=provider_status)

        # Provider status -> internal status.
        internal_status_by_provider = {
            "sent": "sent",
            "delivered": "delivered",
            "read": "read",
            "failed": "failed",
            "undelivered": "failed",
        }
        internal_status = internal_status_by_provider.get(
            provider_status, provider_status
        )

        delivery_event = {
            "provider_message_id": message_sid,
            "status": internal_status,
            "delivery_time": webhook_data.get("timestamp"),
            "provider": "twilio",
        }
        await publish_notification_delivered(delivery_event)
    except Exception as exc:
        logger.error("Failed to handle WhatsApp delivery webhook", error=str(exc))
|
|
|
|
# ================================================================
|
|
# SCHEDULED NOTIFICATION PROCESSING
|
|
# ================================================================
|
|
|
|
async def process_scheduled_notifications():
    """Send notifications whose scheduled time has arrived.

    For each session yielded by ``get_db()``, selects up to 100 rows that are
    PENDING and have a non-null ``scheduled_at`` at or before the current
    ``datetime.utcnow()``. Each row is rebuilt as a ``NotificationCreate``
    and passed to ``NotificationService.send_notification``. A failure on one
    row is logged and does not stop the others. The session is committed
    after the batch; the function returns early when no rows are due. Any
    other exception is logged and not propagated.

    NOTE(review): nothing in this function changes the selected rows' status.
    Unless ``send_notification`` does so, the same rows will match again on
    the next run — confirm.
    """
    try:
        from datetime import datetime
        from app.core.database import get_db
        from app.models.notifications import Notification, NotificationStatus
        from app.services.notification_service import NotificationService
        from sqlalchemy import select, and_

        logger.info("Processing scheduled notifications")

        async for db in get_db():
            # Naive UTC timestamp; presumably scheduled_at is stored the same
            # way — TODO confirm.
            now = datetime.utcnow()

            is_due = and_(
                Notification.status == NotificationStatus.PENDING,
                Notification.scheduled_at <= now,
                Notification.scheduled_at.isnot(None),
            )
            # Process in batches of at most 100 rows per call.
            due_query = select(Notification).where(is_due).limit(100)

            query_result = await db.execute(due_query)
            due_notifications = query_result.scalars().all()

            if not due_notifications:
                return

            logger.info("Found scheduled notifications to process",
                        count=len(due_notifications))

            service = NotificationService()

            for pending in due_notifications:
                try:
                    from app.schemas.notifications import NotificationCreate, NotificationType

                    # Rebuild the stored row as a creation request.
                    request = NotificationCreate(
                        type=NotificationType(pending.type.value),
                        recipient_id=str(pending.recipient_id) if pending.recipient_id else None,
                        recipient_email=pending.recipient_email,
                        recipient_phone=pending.recipient_phone,
                        subject=pending.subject,
                        message=pending.message,
                        html_content=pending.html_content,
                        template_id=pending.template_id,
                        template_data=pending.template_data,
                        priority=pending.priority,
                        tenant_id=str(pending.tenant_id),
                        sender_id=str(pending.sender_id),
                        broadcast=pending.broadcast,
                    )
                    await service.send_notification(request)
                except Exception as exc:
                    # One bad row must not abort the rest of the batch.
                    logger.error("Failed to process scheduled notification",
                                 notification_id=str(pending.id),
                                 error=str(exc))

            await db.commit()
    except Exception as exc:
        logger.error("Failed to process scheduled notifications", error=str(exc))