# ================================================================
# services/notification/app/api/whatsapp_webhooks.py
# ================================================================
"""
WhatsApp Business Cloud API Webhook Endpoints
Handles verification, message delivery status updates, and incoming messages
"""
|
||
|
|
|
||
|
|
import hmac
from datetime import datetime
from datetime import timezone
from typing import Dict, Any

import structlog
from fastapi import APIRouter, Request, Response, HTTPException, Depends, Query
from fastapi.responses import PlainTextResponse
from sqlalchemy.ext.asyncio import AsyncSession

from app.core.config import settings
from app.repositories.whatsapp_message_repository import WhatsAppMessageRepository
from app.models.whatsapp_messages import WhatsAppMessageStatus
from app.core.database import get_db
from shared.monitoring.metrics import MetricsCollector
|
||
|
|
|
||
|
|
# Module-wide structured logger used by every handler below.
logger = structlog.get_logger()

# Metrics collector created with the "notification-service" identifier.
metrics = MetricsCollector("notification-service")

# Every route declared in this module is served under /api/v1/whatsapp.
router = APIRouter(prefix="/api/v1/whatsapp", tags=["whatsapp-webhooks"])
|
||
|
|
|
||
|
|
|
||
|
|
@router.get("/webhook")
async def verify_webhook(
    request: Request,
    hub_mode: str = Query(None, alias="hub.mode"),
    hub_token: str = Query(None, alias="hub.verify_token"),
    hub_challenge: str = Query(None, alias="hub.challenge")
) -> PlainTextResponse:
    """
    Webhook verification endpoint for WhatsApp Cloud API

    Meta sends a GET request with hub.mode, hub.verify_token, and hub.challenge
    to verify the webhook URL when you configure it in the Meta Business Suite.

    Args:
        request: Incoming request (not read here; kept for interface compatibility)
        hub_mode: Should be "subscribe"
        hub_token: Verify token; must equal settings.WHATSAPP_WEBHOOK_VERIFY_TOKEN
        hub_challenge: Challenge string to echo back

    Returns:
        PlainTextResponse with challenge if verification succeeds

    Raises:
        HTTPException: 403 when the mode or the token does not match,
            500 when the check itself fails unexpectedly
    """
    try:
        logger.info(
            "WhatsApp webhook verification request received",
            mode=hub_mode,
            token_provided=bool(hub_token),
            challenge_provided=bool(hub_challenge)
        )

        expected_token = settings.WHATSAPP_WEBHOOK_VERIFY_TOKEN

        # Both sides must be non-empty: otherwise an unset setting combined
        # with a missing query parameter would compare equal and let anyone
        # pass verification. compare_digest keeps the comparison constant-time;
        # encoding to bytes lets it accept non-ASCII tokens.
        token_match = (
            bool(expected_token)
            and bool(hub_token)
            and hmac.compare_digest(
                hub_token.encode("utf-8"),
                expected_token.encode("utf-8")
            )
        )
    except Exception as e:
        logger.error("WhatsApp webhook verification error", error=str(e))
        raise HTTPException(status_code=500, detail="Verification failed") from e

    # The accept/reject decision lives outside the try block so the 403 below
    # is not caught by the generic handler and turned into a 500.
    if hub_mode == "subscribe" and token_match:
        logger.info("WhatsApp webhook verification successful")

        # Respond with the challenge token
        return PlainTextResponse(content=hub_challenge, status_code=200)

    logger.warning(
        "WhatsApp webhook verification failed",
        mode=hub_mode,
        token_match=token_match
    )
    raise HTTPException(status_code=403, detail="Verification token mismatch")
|
||
|
|
|
||
|
|
|
||
|
|
@router.post("/webhook")
async def handle_webhook(
    request: Request,
    session: AsyncSession = Depends(get_db)
) -> Dict[str, str]:
    """
    Webhook endpoint for WhatsApp Cloud API events

    Receives notifications about:
    - Message delivery status (sent, delivered, read, failed)
    - Incoming messages from users
    - Errors and other events

    Only changes whose field is "messages" are processed; every other field
    is logged at debug level and skipped. The handler never raises: failures
    are logged and reported in the (HTTP 200) response body.

    Args:
        request: FastAPI request with webhook payload
        session: Database session

    Returns:
        {"status": "success"} when the payload was processed,
        {"status": "ignored"} when it is not a WhatsApp Business Account event,
        {"status": "error", "message": ...} when processing failed
    """
    # NOTE(review): no payload signature check (X-Hub-Signature-256 header) is
    # visible in this handler — confirm it is enforced elsewhere (middleware,
    # gateway) before trusting the body.
    try:
        # Parse webhook payload
        payload = await request.json()

        # A JSON body that is not an object cannot be a Meta event envelope.
        if not isinstance(payload, dict):
            logger.warning(
                "Unknown webhook object type",
                object_type=type(payload).__name__
            )
            return {"status": "ignored"}

        object_type = payload.get("object")
        # "or []" also covers an explicit JSON null, which .get's default does not.
        entries = payload.get("entry") or []

        logger.info(
            "WhatsApp webhook received",
            object_type=object_type,
            entries_count=len(entries)
        )

        # Verify it's a WhatsApp webhook
        if object_type != "whatsapp_business_account":
            logger.warning("Unknown webhook object type", object_type=object_type)
            return {"status": "ignored"}

        # Process each entry
        for entry in entries:
            for change in entry.get("changes") or []:
                field = change.get("field")

                if field == "messages":
                    # Handle incoming messages or status updates
                    await _handle_message_change(change.get("value") or {}, session)
                else:
                    logger.debug("Unhandled webhook field", field=field)

        # Record metric
        metrics.increment_counter("whatsapp_webhooks_received")

        # Always return 200 OK to acknowledge receipt
        return {"status": "success"}

    except Exception as e:
        logger.error("WhatsApp webhook processing error", error=str(e))
        # Still return 200 to avoid Meta retrying. The exception text stays in
        # the log only; the response carries a generic message so internal
        # details are not exposed to the caller.
        return {"status": "error", "message": "Webhook processing failed"}
|
||
|
|
|
||
|
|
|
||
|
|
async def _handle_message_change(value: Dict[str, Any], session: AsyncSession) -> None:
    """
    Handle message-related webhook events

    Dispatches the "statuses" list to _handle_status_updates and the
    "messages" list to _handle_incoming_messages, in that order. Any exception
    is logged and swallowed so the caller can still acknowledge the webhook.

    Args:
        value: Webhook value containing message data
        session: Database session
    """
    try:
        # "or" fallbacks also cover keys that are present but null.

        # Handle status updates
        statuses = value.get("statuses") or []
        if statuses:
            await _handle_status_updates(statuses, session)

        # Handle incoming messages
        messages = value.get("messages") or []
        if messages:
            metadata = value.get("metadata") or {}
            await _handle_incoming_messages(messages, metadata, session)

    except Exception as e:
        logger.error("Error handling message change", error=str(e))
|
||
|
|
|
||
|
|
|
||
|
|
async def _handle_status_updates(
    statuses: list,
    session: AsyncSession
) -> None:
    """
    Handle message delivery status updates

    For each status object the matching stored message is looked up by its
    WhatsApp id and updated through WhatsAppMessageRepository. Statuses with
    no id, no stored message or an unrecognised status value are logged and
    skipped. A failure on one status is logged and does not stop the
    remaining ones from being processed.

    Args:
        statuses: List of status update objects
        session: Database session
    """
    try:
        message_repo = WhatsAppMessageRepository(session)

        # Map WhatsApp status to our enum (built once, not per status)
        status_mapping = {
            "sent": WhatsAppMessageStatus.SENT,
            "delivered": WhatsAppMessageStatus.DELIVERED,
            "read": WhatsAppMessageStatus.READ,
            "failed": WhatsAppMessageStatus.FAILED
        }
    except Exception as e:
        logger.error("Error handling status updates", error=str(e))
        return

    for status in statuses:
        whatsapp_message_id = None
        try:
            whatsapp_message_id = status.get("id")
            status_value = status.get("status")  # sent, delivered, read, failed
            timestamp = status.get("timestamp")
            errors = status.get("errors") or []

            logger.info(
                "WhatsApp message status update",
                message_id=whatsapp_message_id,
                status=status_value,
                timestamp=timestamp
            )

            # Without an id there is nothing to look up; skip the query.
            if not whatsapp_message_id:
                logger.warning(
                    "Received status for unknown message",
                    whatsapp_message_id=whatsapp_message_id
                )
                continue

            # Find message in database
            db_message = await message_repo.get_by_whatsapp_id(whatsapp_message_id)

            if not db_message:
                logger.warning(
                    "Received status for unknown message",
                    whatsapp_message_id=whatsapp_message_id
                )
                continue

            new_status = status_mapping.get(status_value)
            if not new_status:
                logger.warning("Unknown status value", status=status_value)
                continue

            # Extract error information if failed
            error_message = None
            if errors:
                error = errors[0]
                error_code = error.get("code")
                # "or" falls back to "message" when "title" is missing or null.
                error_message = error.get("title") or error.get("message")
                # The code is not passed to the repository call below, so it
                # is recorded in the log to keep it available for diagnosis.
                logger.warning(
                    "WhatsApp message delivery error",
                    message_id=whatsapp_message_id,
                    error_code=error_code,
                    error_message=error_message
                )

            # Update message status
            # NOTE(review): webhooks may arrive out of order; whether the
            # repository guards against status regressions is not visible here.
            await message_repo.update_message_status(
                message_id=str(db_message.id),
                status=new_status,
                error_message=error_message,
                provider_response=status
            )

            # Record metric
            metrics.increment_counter(
                "whatsapp_status_updates",
                labels={"status": status_value}
            )

        except Exception as e:
            # Isolate failures per status so the rest of the batch still runs.
            logger.error(
                "Error handling status updates",
                error=str(e),
                whatsapp_message_id=whatsapp_message_id
            )
|
||
|
|
|
||
|
|
|
||
|
|
async def _handle_incoming_messages(
    messages: list,
    metadata: Dict[str, Any],
    session: AsyncSession
) -> None:
    """
    Handle incoming messages from users

    This is for future use if you want to implement two-way messaging.
    For now, we just log incoming messages and count them by type. A failure
    on one message is logged and does not stop the remaining ones.

    Args:
        messages: List of message objects
        metadata: Metadata about the phone number (currently unused)
        session: Database session (currently unused)
    """
    for message in messages:
        try:
            message_id = message.get("id")
            from_number = message.get("from")
            message_type = message.get("type")
            timestamp = message.get("timestamp")

            # Extract message content based on type.
            # "or {}" also covers a sub-object that is present but null.
            content = None
            if message_type == "text":
                content = (message.get("text") or {}).get("body")
            elif message_type == "image":
                content = (message.get("image") or {}).get("caption")

            # NOTE(review): this logs the sender's number and up to 100
            # characters of message text — confirm that is acceptable under
            # the project's logging/privacy policy.
            logger.info(
                "Incoming WhatsApp message",
                message_id=message_id,
                from_number=from_number,
                message_type=message_type,
                timestamp=timestamp,
                content=content[:100] if content else None
            )

            # Record metric
            metrics.increment_counter(
                "whatsapp_incoming_messages",
                labels={"type": message_type}
            )

            # TODO: Implement incoming message handling logic
            # For example:
            # - Create a new conversation session
            # - Route to customer support
            # - Auto-reply with acknowledgment

        except Exception as e:
            # Isolate failures per message so the rest of the batch still runs.
            logger.error("Error handling incoming messages", error=str(e))
|
||
|
|
|
||
|
|
|
||
|
|
@router.get("/health")
async def webhook_health() -> Dict[str, str]:
    """Health check for webhook endpoint

    Returns:
        A fixed status/service payload plus the current UTC time as an
        ISO-8601 string without a UTC offset suffix.
    """
    # datetime.utcnow() is deprecated since Python 3.12. Take an aware UTC
    # "now" and drop tzinfo so the serialized value keeps the same format
    # (no "+00:00" suffix) that existing consumers receive.
    now_utc = datetime.now(timezone.utc).replace(tzinfo=None)

    return {
        "status": "healthy",
        "service": "whatsapp-webhooks",
        "timestamp": now_utc.isoformat()
    }
|