# ================================================================ # services/notification/app/api/whatsapp_webhooks.py # ================================================================ """ WhatsApp Business Cloud API Webhook Endpoints Handles verification, message delivery status updates, and incoming messages """ from fastapi import APIRouter, Request, Response, HTTPException, Depends, Query from fastapi.responses import PlainTextResponse from sqlalchemy.ext.asyncio import AsyncSession import structlog from typing import Dict, Any from datetime import datetime from app.core.config import settings from app.repositories.whatsapp_message_repository import WhatsAppMessageRepository from app.models.whatsapp_messages import WhatsAppMessageStatus from app.core.database import get_db from shared.monitoring.metrics import MetricsCollector logger = structlog.get_logger() metrics = MetricsCollector("notification-service") router = APIRouter(prefix="/api/v1/whatsapp", tags=["whatsapp-webhooks"]) @router.get("/webhook") async def verify_webhook( request: Request, hub_mode: str = Query(None, alias="hub.mode"), hub_token: str = Query(None, alias="hub.verify_token"), hub_challenge: str = Query(None, alias="hub.challenge") ) -> PlainTextResponse: """ Webhook verification endpoint for WhatsApp Cloud API Meta sends a GET request with hub.mode, hub.verify_token, and hub.challenge to verify the webhook URL when you configure it in the Meta Business Suite. Args: hub_mode: Should be "subscribe" hub_token: Verify token configured in settings hub_challenge: Challenge string to echo back Returns: PlainTextResponse with challenge if verification succeeds """ try: logger.info( "WhatsApp webhook verification request received", mode=hub_mode, token_provided=bool(hub_token), challenge_provided=bool(hub_challenge) ) # Verify the mode and token if hub_mode == "subscribe" and hub_token == settings.WHATSAPP_WEBHOOK_VERIFY_TOKEN: logger.info("WhatsApp webhook verification successful") # Respond with the challenge token return PlainTextResponse(content=hub_challenge, status_code=200) else: logger.warning( "WhatsApp webhook verification failed", mode=hub_mode, token_match=hub_token == settings.WHATSAPP_WEBHOOK_VERIFY_TOKEN ) raise HTTPException(status_code=403, detail="Verification token mismatch") except Exception as e: logger.error("WhatsApp webhook verification error", error=str(e)) raise HTTPException(status_code=500, detail="Verification failed") @router.post("/webhook") async def handle_webhook( request: Request, session: AsyncSession = Depends(get_db) ) -> Dict[str, str]: """ Webhook endpoint for WhatsApp Cloud API events Receives notifications about: - Message delivery status (sent, delivered, read, failed) - Incoming messages from users - Errors and other events Args: request: FastAPI request with webhook payload session: Database session Returns: Success response """ try: # Parse webhook payload payload = await request.json() logger.info( "WhatsApp webhook received", object_type=payload.get("object"), entries_count=len(payload.get("entry", [])) ) # Verify it's a WhatsApp webhook if payload.get("object") != "whatsapp_business_account": logger.warning("Unknown webhook object type", object_type=payload.get("object")) return {"status": "ignored"} # Process each entry for entry in payload.get("entry", []): entry_id = entry.get("id") for change in entry.get("changes", []): field = change.get("field") value = change.get("value", {}) if field == "messages": # Handle incoming messages or status updates await _handle_message_change(value, session) else: logger.debug("Unhandled webhook field", field=field) # Record metric metrics.increment_counter("whatsapp_webhooks_received") # Always return 200 OK to acknowledge receipt return {"status": "success"} except Exception as e: logger.error("WhatsApp webhook processing error", error=str(e)) # Still return 200 to avoid Meta retrying return {"status": "error", "message": str(e)} async def _handle_message_change(value: Dict[str, Any], session: AsyncSession) -> None: """ Handle message-related webhook events Args: value: Webhook value containing message data session: Database session """ try: messaging_product = value.get("messaging_product") metadata = value.get("metadata", {}) # Handle status updates statuses = value.get("statuses", []) if statuses: await _handle_status_updates(statuses, session) # Handle incoming messages messages = value.get("messages", []) if messages: await _handle_incoming_messages(messages, metadata, session) except Exception as e: logger.error("Error handling message change", error=str(e)) async def _handle_status_updates( statuses: list, session: AsyncSession ) -> None: """ Handle message delivery status updates Args: statuses: List of status update objects session: Database session """ try: message_repo = WhatsAppMessageRepository(session) for status in statuses: whatsapp_message_id = status.get("id") status_value = status.get("status") # sent, delivered, read, failed timestamp = status.get("timestamp") errors = status.get("errors", []) logger.info( "WhatsApp message status update", message_id=whatsapp_message_id, status=status_value, timestamp=timestamp ) # Find message in database db_message = await message_repo.get_by_whatsapp_id(whatsapp_message_id) if not db_message: logger.warning( "Received status for unknown message", whatsapp_message_id=whatsapp_message_id ) continue # Map WhatsApp status to our enum status_mapping = { "sent": WhatsAppMessageStatus.SENT, "delivered": WhatsAppMessageStatus.DELIVERED, "read": WhatsAppMessageStatus.READ, "failed": WhatsAppMessageStatus.FAILED } new_status = status_mapping.get(status_value) if not new_status: logger.warning("Unknown status value", status=status_value) continue # Extract error information if failed error_message = None error_code = None if errors: error = errors[0] error_code = error.get("code") error_message = error.get("title", error.get("message")) # Update message status await message_repo.update_message_status( message_id=str(db_message.id), status=new_status, error_message=error_message, provider_response=status ) # Record metric metrics.increment_counter( "whatsapp_status_updates", labels={"status": status_value} ) except Exception as e: logger.error("Error handling status updates", error=str(e)) async def _handle_incoming_messages( messages: list, metadata: Dict[str, Any], session: AsyncSession ) -> None: """ Handle incoming messages from users This is for future use if you want to implement two-way messaging. For now, we just log incoming messages. Args: messages: List of message objects metadata: Metadata about the phone number session: Database session """ try: for message in messages: message_id = message.get("id") from_number = message.get("from") message_type = message.get("type") timestamp = message.get("timestamp") # Extract message content based on type content = None if message_type == "text": content = message.get("text", {}).get("body") elif message_type == "image": content = message.get("image", {}).get("caption") logger.info( "Incoming WhatsApp message", message_id=message_id, from_number=from_number, message_type=message_type, content=content[:100] if content else None ) # Record metric metrics.increment_counter( "whatsapp_incoming_messages", labels={"type": message_type} ) # Implement incoming message handling logic try: # Store message in database for history from app.models.whatsapp_message import WhatsAppMessage from sqlalchemy.ext.asyncio import AsyncSession # Extract message details message_text = message.get("text", {}).get("body", "") media_url = None if message_type == "image": media_url = message.get("image", {}).get("id") elif message_type == "document": media_url = message.get("document", {}).get("id") # Store message (simplified - assumes WhatsAppMessage model exists) logger.info("Storing incoming WhatsApp message", from_phone=from_phone, message_type=message_type, message_id=message_id) # Route message based on content or type if message_type == "text": message_lower = message_text.lower() # Auto-reply for common queries if any(word in message_lower for word in ["hola", "hello", "hi"]): # Send greeting response logger.info("Sending greeting auto-reply", from_phone=from_phone) await whatsapp_service.send_message( to_phone=from_phone, message="¡Hola! Gracias por contactarnos. ¿En qué podemos ayudarte?", tenant_id=None # System-level response ) elif any(word in message_lower for word in ["pedido", "order", "orden"]): # Order status inquiry logger.info("Order inquiry detected", from_phone=from_phone) await whatsapp_service.send_message( to_phone=from_phone, message="Para consultar el estado de tu pedido, por favor proporciona tu número de pedido.", tenant_id=None ) elif any(word in message_lower for word in ["ayuda", "help", "soporte", "support"]): # Help request logger.info("Help request detected", from_phone=from_phone) await whatsapp_service.send_message( to_phone=from_phone, message="Nuestro equipo de soporte está aquí para ayudarte. Responderemos lo antes posible.", tenant_id=None ) else: # Generic acknowledgment logger.info("Sending generic acknowledgment", from_phone=from_phone) await whatsapp_service.send_message( to_phone=from_phone, message="Hemos recibido tu mensaje. Te responderemos pronto.", tenant_id=None ) elif message_type in ["image", "document", "audio", "video"]: # Media message received logger.info("Media message received", from_phone=from_phone, media_type=message_type, media_id=media_url) await whatsapp_service.send_message( to_phone=from_phone, message="Hemos recibido tu archivo. Lo revisaremos pronto.", tenant_id=None ) # Publish event for further processing (CRM, ticketing, etc.) from shared.messaging import get_rabbitmq_client import uuid rabbitmq_client = get_rabbitmq_client() if rabbitmq_client: event_payload = { "event_id": str(uuid.uuid4()), "event_type": "whatsapp.message.received", "timestamp": datetime.utcnow().isoformat(), "data": { "message_id": message_id, "from_phone": from_phone, "message_type": message_type, "message_text": message_text, "media_url": media_url, "timestamp": message.get("timestamp") } } await rabbitmq_client.publish_event( exchange_name="notification.events", routing_key="whatsapp.message.received", event_data=event_payload ) logger.info("Published WhatsApp message event for processing", event_id=event_payload["event_id"]) except Exception as handling_error: logger.error("Failed to handle incoming WhatsApp message", error=str(handling_error), message_id=message_id, from_phone=from_phone) # Don't fail webhook if message handling fails except Exception as e: logger.error("Error handling incoming messages", error=str(e)) @router.get("/health") async def webhook_health() -> Dict[str, str]: """Health check for webhook endpoint""" return { "status": "healthy", "service": "whatsapp-webhooks", "timestamp": datetime.utcnow().isoformat() }