# ================================================================
# services/notification/app/services/whatsapp_business_service.py
# ================================================================
"""
WhatsApp Business Cloud API Service

Direct integration with Meta's WhatsApp Business Cloud API.
Supports template messages for proactive notifications.
"""
|
|
|
|
import structlog
|
|
import httpx
|
|
from typing import Optional, Dict, Any, List
|
|
import asyncio
|
|
from datetime import datetime
|
|
import uuid
|
|
|
|
from app.core.config import settings
|
|
from app.schemas.whatsapp import (
|
|
SendWhatsAppMessageRequest,
|
|
SendWhatsAppMessageResponse,
|
|
TemplateComponent,
|
|
WhatsAppMessageStatus,
|
|
WhatsAppMessageType
|
|
)
|
|
from app.repositories.whatsapp_message_repository import (
|
|
WhatsAppMessageRepository,
|
|
WhatsAppTemplateRepository
|
|
)
|
|
from app.models.whatsapp_messages import WhatsAppMessage
|
|
from shared.monitoring.metrics import MetricsCollector
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
# Module-level structlog logger shared by every method of the service below.
logger = structlog.get_logger()
|
|
# Metrics collector created with the name "notification-service"; the service
# uses it to count send outcomes (the "whatsapp_sent_total" counter).
metrics = MetricsCollector("notification-service")
|
|
|
|
|
|
class WhatsAppBusinessService:
    """
    WhatsApp Business Cloud API Service

    Direct integration with Meta/Facebook WhatsApp Business Cloud API.

    All tenants share the global ("master") account credentials taken from
    ``settings``; a tenant may only override the sending phone number id
    through its notification settings. When a database session is supplied,
    each outgoing message is persisted before sending and its status is
    updated afterwards.
    """

    # Graph API version used when settings do not provide one.
    DEFAULT_API_VERSION = "v18.0"
    # HTTP timeouts (seconds) for message sends and for the health probe.
    SEND_TIMEOUT_SECONDS = 30.0
    HEALTH_CHECK_TIMEOUT_SECONDS = 10.0
    # Batch size used by send_bulk_messages by default / on invalid input.
    DEFAULT_BULK_BATCH_SIZE = 20
    # E.164 numbers carry at most 15 digits (country code included).
    E164_MAX_DIGITS = 15

    def __init__(self, session: Optional["AsyncSession"] = None, tenant_client=None):
        """
        Build the service.

        Args:
            session: Optional async DB session. Without it, messages are not
                persisted and template usage is not tracked.
            tenant_client: Optional client exposing an awaitable
                ``get_notification_settings(tenant_id)`` used to look up the
                tenant-specific phone number id.
        """
        # Global configuration (fallback)
        self.global_access_token = settings.WHATSAPP_ACCESS_TOKEN
        self.global_phone_number_id = settings.WHATSAPP_PHONE_NUMBER_ID
        self.global_business_account_id = settings.WHATSAPP_BUSINESS_ACCOUNT_ID
        self.api_version = settings.WHATSAPP_API_VERSION or self.DEFAULT_API_VERSION
        self.base_url = f"https://graph.facebook.com/{self.api_version}"
        self.enabled = settings.ENABLE_WHATSAPP_NOTIFICATIONS

        # Tenant client for fetching per-tenant settings
        self.tenant_client = tenant_client

        # Repository dependencies (only available when a session is injected)
        self.session = session
        self.message_repo = WhatsAppMessageRepository(session) if session else None
        self.template_repo = WhatsAppTemplateRepository(session) if session else None

    async def _get_whatsapp_credentials(self, tenant_id: str) -> Dict[str, str]:
        """
        Get WhatsApp credentials for a tenant (Shared Account Model)

        Uses global master account credentials with tenant-specific phone number.
        Any failure while reading tenant settings falls back to the global
        phone number id instead of failing the send.

        Args:
            tenant_id: Tenant ID

        Returns:
            Dictionary with access_token, phone_number_id, business_account_id
        """
        # Always use global master account credentials
        access_token = self.global_access_token
        business_account_id = self.global_business_account_id
        phone_number_id = self.global_phone_number_id  # Default fallback

        # Try to fetch tenant-specific phone number
        if self.tenant_client:
            try:
                notification_settings = await self.tenant_client.get_notification_settings(tenant_id)

                if notification_settings and notification_settings.get('whatsapp_enabled'):
                    # The key may be present with a None value, so coalesce
                    # before stripping instead of relying on the .get default.
                    tenant_phone_id = str(
                        notification_settings.get('whatsapp_phone_number_id') or ''
                    ).strip()

                    # Use tenant's assigned phone number if configured
                    if tenant_phone_id:
                        phone_number_id = tenant_phone_id
                        logger.info(
                            "Using tenant-assigned WhatsApp phone number with shared account",
                            tenant_id=tenant_id,
                            phone_number_id=phone_number_id
                        )
                    else:
                        logger.info(
                            "Tenant WhatsApp enabled but no phone number assigned, using default",
                            tenant_id=tenant_id
                        )
                else:
                    logger.info(
                        "Tenant WhatsApp not enabled, using default phone number",
                        tenant_id=tenant_id
                    )
            except Exception as e:
                # Best effort: tenant settings are optional for sending.
                logger.warning(
                    "Failed to fetch tenant notification settings, using default phone number",
                    error=str(e),
                    tenant_id=tenant_id
                )

        logger.info(
            "Using shared WhatsApp account",
            tenant_id=tenant_id,
            phone_number_id=phone_number_id
        )
        return {
            'access_token': access_token,
            'phone_number_id': phone_number_id,
            'business_account_id': business_account_id
        }

    async def send_message(
        self,
        request: "SendWhatsAppMessageRequest"
    ) -> "SendWhatsAppMessageResponse":
        """
        Send WhatsApp message via Cloud API

        The message is stored as PENDING (when a repository is available),
        sent, and then updated to SENT or FAILED. This method never raises:
        every failure is reported through the returned response.

        Args:
            request: Message request with all details

        Returns:
            SendWhatsAppMessageResponse with status
        """
        # Tracked outside the try block so the error path can reuse the id of
        # an already-persisted record instead of inventing a new one.
        message_id: Optional[str] = None
        record_created = False
        result: Optional[Dict[str, Any]] = None

        try:
            if not self.enabled:
                logger.info("WhatsApp notifications disabled")
                return self._failure_response("WhatsApp notifications are disabled")

            # Get tenant-specific or global credentials
            credentials = await self._get_whatsapp_credentials(request.tenant_id)
            access_token = credentials['access_token']
            phone_number_id = credentials['phone_number_id']

            # Validate configuration
            if not access_token or not phone_number_id:
                logger.error("WhatsApp Cloud API not configured properly", tenant_id=request.tenant_id)
                return self._failure_response("WhatsApp Cloud API credentials not configured")

            # Create message record in database
            if self.message_repo:
                db_message = await self.message_repo.create_message(
                    self._build_message_record(request)
                )
                message_id = str(db_message.id)
                record_created = True
            else:
                message_id = str(uuid.uuid4())

            # Send message via Cloud API
            if request.message_type == WhatsAppMessageType.TEMPLATE:
                result = await self._send_template_message(
                    recipient_phone=request.recipient_phone,
                    template=request.template,
                    message_id=message_id,
                    access_token=access_token,
                    phone_number_id=phone_number_id
                )
            elif request.message_type == WhatsAppMessageType.TEXT:
                result = await self._send_text_message(
                    recipient_phone=request.recipient_phone,
                    text=request.text,
                    message_id=message_id,
                    access_token=access_token,
                    phone_number_id=phone_number_id
                )
            else:
                logger.error(f"Unsupported message type: {request.message_type}")
                result = {
                    "success": False,
                    "error_message": f"Unsupported message type: {request.message_type}"
                }

            # Update database with result (never overrides the send outcome)
            await self._persist_send_result(message_id, result)

            # Record metrics
            status = "success" if result.get("success") else "failed"
            metrics.increment_counter("whatsapp_sent_total", labels={"status": status})

            return SendWhatsAppMessageResponse(
                success=result.get("success", False),
                message_id=message_id,
                whatsapp_message_id=result.get("whatsapp_message_id"),
                status=WhatsAppMessageStatus.SENT if result.get("success") else WhatsAppMessageStatus.FAILED,
                error_message=result.get("error_message")
            )

        except Exception as e:
            logger.error("Failed to send WhatsApp message", error=str(e))
            metrics.increment_counter("whatsapp_sent_total", labels={"status": "failed"})

            # If a record exists and the send never produced an outcome, do not
            # leave it stuck in PENDING.
            if record_created and result is None:
                try:
                    await self.message_repo.update_message_status(
                        message_id=message_id,
                        status=WhatsAppMessageStatus.FAILED,
                        error_message=str(e),
                        provider_response=None
                    )
                except Exception as update_error:
                    logger.warning(
                        "Failed to mark WhatsApp message as failed",
                        message_id=message_id,
                        error=str(update_error)
                    )

            return self._failure_response(str(e), message_id=message_id)

    @staticmethod
    def _failure_response(
        error_message: str,
        message_id: Optional[str] = None
    ) -> "SendWhatsAppMessageResponse":
        """Build a FAILED response, generating a random id when none is known."""
        return SendWhatsAppMessageResponse(
            success=False,
            message_id=message_id or str(uuid.uuid4()),
            status=WhatsAppMessageStatus.FAILED,
            error_message=error_message
        )

    @staticmethod
    def _build_message_record(request: "SendWhatsAppMessageRequest") -> Dict[str, Any]:
        """Translate a send request into the dict stored as a PENDING message."""
        # One timestamp for both columns so a new row has created_at == updated_at.
        # Kept as naive UTC to match what this service has always stored.
        now = datetime.utcnow()
        message_data = {
            "tenant_id": request.tenant_id,
            "notification_id": request.notification_id,
            "recipient_phone": request.recipient_phone,
            "recipient_name": request.recipient_name,
            "message_type": request.message_type,
            "status": WhatsAppMessageStatus.PENDING,
            "metadata": request.metadata,
            "created_at": now,
            "updated_at": now
        }

        # Add template details if template message
        if request.message_type == WhatsAppMessageType.TEMPLATE and request.template:
            message_data["template_name"] = request.template.template_name
            message_data["template_language"] = request.template.language
            message_data["template_parameters"] = [
                comp.model_dump() for comp in (request.template.components or [])
            ]

        # Add text details if text message
        if request.message_type == WhatsAppMessageType.TEXT and request.text:
            message_data["message_body"] = request.text

        return message_data

    async def _persist_send_result(self, message_id: str, result: Dict[str, Any]) -> None:
        """
        Store the send outcome on the message record (no-op without a repository).

        A persistence failure is logged but not raised: the message may already
        have been delivered to the provider, and reporting the send as failed
        would invite a duplicate send on retry.
        """
        if not self.message_repo:
            return

        try:
            if result.get("success"):
                await self.message_repo.update_message_status(
                    message_id=message_id,
                    status=WhatsAppMessageStatus.SENT,
                    whatsapp_message_id=result.get("whatsapp_message_id"),
                    provider_response=result.get("provider_response")
                )
            else:
                await self.message_repo.update_message_status(
                    message_id=message_id,
                    status=WhatsAppMessageStatus.FAILED,
                    error_message=result.get("error_message"),
                    provider_response=result.get("provider_response")
                )
        except Exception as e:
            logger.error(
                "Failed to persist WhatsApp message status",
                message_id=message_id,
                error=str(e)
            )

    @staticmethod
    def _extract_error(response_data: Any) -> Dict[str, Any]:
        """Return ``{"code", "message"}`` from a provider error body, tolerating odd shapes."""
        error = response_data.get("error") if isinstance(response_data, dict) else None
        if not isinstance(error, dict):
            error = {}
        return {
            "code": error.get("code"),
            "message": error.get("message", "Unknown error")
        }

    @staticmethod
    def _build_send_result(status_code: int, response_data: Any) -> Dict[str, Any]:
        """
        Normalise a Cloud API response into the internal result dict.

        Args:
            status_code: HTTP status code of the response
            response_data: Decoded response body (non-dict bodies are wrapped
                under ``raw_body``)

        Returns:
            On HTTP 200: ``success=True`` with ``whatsapp_message_id`` (None if
            the body lists no message) and ``provider_response``.
            Otherwise: ``success=False`` with ``error_message`` formatted as
            ``"<code>: <message>"`` and ``provider_response``.
        """
        if not isinstance(response_data, dict):
            response_data = {"raw_body": response_data}

        if status_code == 200:
            # An absent or empty "messages" list must not turn an accepted
            # send into a failure.
            messages = response_data.get("messages")
            first = messages[0] if isinstance(messages, list) and messages else {}
            if not isinstance(first, dict):
                first = {}
            return {
                "success": True,
                "whatsapp_message_id": first.get("id"),
                "provider_response": response_data
            }

        error = WhatsAppBusinessService._extract_error(response_data)
        return {
            "success": False,
            "error_message": f"{error['code']}: {error['message']}",
            "provider_response": response_data
        }

    async def _post_message(
        self,
        payload: Dict[str, Any],
        access_token: str,
        phone_number_id: str,
        **log_context: Any
    ) -> Dict[str, Any]:
        """
        POST a message payload to the Cloud API and return the normalised result.

        Args:
            payload: JSON body for the ``/messages`` endpoint
            access_token: Bearer token for the request
            phone_number_id: Sender phone number id used in the URL
            **log_context: Extra fields attached to the error log entry

        Returns:
            Result dict as produced by ``_build_send_result``

        Raises:
            Whatever the HTTP client raises for transport problems; callers
            convert it into a failed result.
        """
        async with httpx.AsyncClient(timeout=self.SEND_TIMEOUT_SECONDS) as client:
            response = await client.post(
                f"{self.base_url}/{phone_number_id}/messages",
                headers={
                    "Authorization": f"Bearer {access_token}",
                    "Content-Type": "application/json"
                },
                json=payload
            )

        # Gateways can answer with non-JSON bodies; keep the raw text so the
        # HTTP status is still reported instead of a JSON decoding error.
        try:
            response_data = response.json()
        except ValueError:
            response_data = {"raw_body": response.text}

        result = self._build_send_result(response.status_code, response_data)
        if not result["success"]:
            error = self._extract_error(response_data)
            logger.error(
                "WhatsApp Cloud API error",
                status_code=response.status_code,
                error_code=error["code"],
                error_message=error["message"],
                **log_context
            )
        return result

    async def _record_template_usage(self, template: Any) -> None:
        """Increment the template usage counter; failures are logged, not raised."""
        if not self.template_repo:
            return

        try:
            template_obj = await self.template_repo.get_by_template_name(
                template.template_name,
                template.language
            )
            if template_obj:
                await self.template_repo.increment_usage(str(template_obj.id))
        except Exception as e:
            # Usage tracking is bookkeeping; the message itself was sent.
            logger.warning(
                "Failed to increment WhatsApp template usage",
                template=getattr(template, "template_name", None),
                error=str(e)
            )

    async def _send_template_message(
        self,
        recipient_phone: str,
        template: Any,
        message_id: str,
        access_token: str,
        phone_number_id: str
    ) -> Dict[str, Any]:
        """
        Send template message via WhatsApp Cloud API

        Args:
            recipient_phone: Destination phone number
            template: Object exposing template_name, language and components
            message_id: Internal message id (used for logging)
            access_token: Bearer token for the request
            phone_number_id: Sender phone number id

        Returns:
            Result dict with "success" plus either "whatsapp_message_id" or
            "error_message", and "provider_response" when the API answered
        """
        # Resolved up front so the error handler never has to touch `template`
        # (which may itself be the reason for the failure).
        template_name = getattr(template, "template_name", None)

        if template is None:
            error_text = "Template details are required for template messages"
            logger.error(
                "Failed to send template message",
                template=template_name,
                error=error_text
            )
            return {
                "success": False,
                "error_message": error_text
            }

        try:
            # Build template payload
            payload = {
                "messaging_product": "whatsapp",
                "to": recipient_phone,
                "type": "template",
                "template": {
                    "name": template.template_name,
                    "language": {
                        "code": template.language
                    },
                    "components": [
                        {
                            "type": comp.type,
                            "parameters": [
                                param.model_dump() for param in (comp.parameters or [])
                            ]
                        }
                        for comp in (template.components or [])
                    ]
                }
            }

            # Send request to WhatsApp Cloud API
            result = await self._post_message(
                payload,
                access_token,
                phone_number_id,
                template=template_name
            )
        except Exception as e:
            logger.error(
                "Failed to send template message",
                template=template_name,
                error=str(e)
            )
            return {
                "success": False,
                "error_message": str(e)
            }

        if result["success"]:
            logger.info(
                "WhatsApp template message sent successfully",
                message_id=message_id,
                whatsapp_message_id=result["whatsapp_message_id"],
                template=template_name,
                recipient=recipient_phone
            )
            # Increment template usage count (best effort)
            await self._record_template_usage(template)

        return result

    async def _send_text_message(
        self,
        recipient_phone: str,
        text: str,
        message_id: str,
        access_token: str,
        phone_number_id: str
    ) -> Dict[str, Any]:
        """
        Send text message via WhatsApp Cloud API

        Args:
            recipient_phone: Destination phone number
            text: Message body
            message_id: Internal message id (used for logging)
            access_token: Bearer token for the request
            phone_number_id: Sender phone number id

        Returns:
            Result dict with "success" plus either "whatsapp_message_id" or
            "error_message", and "provider_response" when the API answered
        """
        payload = {
            "messaging_product": "whatsapp",
            "to": recipient_phone,
            "type": "text",
            "text": {
                "body": text
            }
        }

        try:
            result = await self._post_message(payload, access_token, phone_number_id)
        except Exception as e:
            logger.error("Failed to send text message", error=str(e))
            return {
                "success": False,
                "error_message": str(e)
            }

        if result["success"]:
            logger.info(
                "WhatsApp text message sent successfully",
                message_id=message_id,
                whatsapp_message_id=result["whatsapp_message_id"],
                recipient=recipient_phone
            )

        return result

    async def send_bulk_messages(
        self,
        requests: List["SendWhatsAppMessageRequest"],
        batch_size: int = DEFAULT_BULK_BATCH_SIZE,
        batch_delay: float = 2.0
    ) -> Dict[str, Any]:
        """
        Send bulk WhatsApp messages with rate limiting

        Messages inside a batch are sent concurrently; batches are separated
        by a pause.

        Args:
            requests: List of message requests
            batch_size: Number of messages to send per batch (values below 1
                fall back to the default)
            batch_delay: Seconds to wait between batches

        Returns:
            Dict containing success/failure counts
        """
        results = {
            "total": len(requests),
            "sent": 0,
            "failed": 0,
            "errors": []
        }

        # A zero/negative step would either raise or silently send nothing.
        if not isinstance(batch_size, int) or batch_size < 1:
            logger.warning(
                "Invalid bulk batch size, using default",
                batch_size=batch_size,
                default=self.DEFAULT_BULK_BATCH_SIZE
            )
            batch_size = self.DEFAULT_BULK_BATCH_SIZE

        try:
            # Process in batches to respect WhatsApp rate limits
            for start in range(0, len(requests), batch_size):
                batch = requests[start:start + batch_size]

                # Send messages concurrently within batch
                outcomes = await asyncio.gather(
                    *(self.send_message(req) for req in batch),
                    return_exceptions=True
                )

                for req, outcome in zip(batch, outcomes):
                    # BaseException also covers CancelledError, which gather
                    # returns as a value when return_exceptions=True.
                    if isinstance(outcome, BaseException):
                        results["failed"] += 1
                        results["errors"].append({
                            "phone": req.recipient_phone,
                            "error": str(outcome)
                        })
                    elif outcome.success:
                        results["sent"] += 1
                    else:
                        results["failed"] += 1
                        results["errors"].append({
                            "phone": req.recipient_phone,
                            "error": outcome.error_message
                        })

                # Rate limiting delay between batches
                if batch_delay > 0 and start + batch_size < len(requests):
                    await asyncio.sleep(batch_delay)

            logger.info(
                "Bulk WhatsApp completed",
                total=results["total"],
                sent=results["sent"],
                failed=results["failed"]
            )

            return results

        except Exception as e:
            logger.error("Bulk WhatsApp failed", error=str(e))
            results["errors"].append({"error": str(e)})
            # Everything not confirmed as sent counts as failed so the totals
            # still add up for the caller.
            results["failed"] = results["total"] - results["sent"]
            return results

    async def health_check(self) -> bool:
        """
        Check if WhatsApp Cloud API is healthy

        Performs a GET on the global phone number resource using the global
        access token.

        Returns:
            bool: True if service is healthy
        """
        try:
            if not self.enabled:
                return True  # Service is "healthy" if disabled

            if not self.global_access_token or not self.global_phone_number_id:
                logger.warning("WhatsApp Cloud API not configured")
                return False

            # Test API connectivity
            async with httpx.AsyncClient(timeout=self.HEALTH_CHECK_TIMEOUT_SECONDS) as client:
                response = await client.get(
                    f"{self.base_url}/{self.global_phone_number_id}",
                    headers={
                        "Authorization": f"Bearer {self.global_access_token}"
                    },
                    params={
                        "fields": "verified_name,code_verification_status"
                    }
                )

            if response.status_code == 200:
                logger.info("WhatsApp Cloud API health check passed")
                return True

            logger.error(
                "WhatsApp Cloud API health check failed",
                status_code=response.status_code
            )
            return False

        except Exception as e:
            logger.error("WhatsApp Cloud API health check failed", error=str(e))
            return False

    def _format_phone_number(self, phone: str) -> Optional[str]:
        """
        Format phone number for WhatsApp (E.164 format)

        Formatting characters (spaces, dashes, parentheses) are removed. A
        leading "+" or "00" marks the number as already international; bare
        9-digit numbers starting with 6, 7 or 9 are treated as Spanish.

        Args:
            phone: Input phone number

        Returns:
            Formatted phone number ("+<digits>") or None if invalid
        """
        if not phone:
            return None

        candidate = phone.strip()
        is_international = candidate.startswith('+')

        # Keep ASCII digits only; str.isdigit would also accept other
        # Unicode digit characters that the API cannot interpret.
        digits = "".join(ch for ch in candidate if ch in "0123456789")

        # "00" is the international call prefix, equivalent to "+"
        if not is_international and digits.startswith("00"):
            digits = digits[2:]
            is_international = True

        formatted = None
        if digits and len(digits) <= self.E164_MAX_DIGITS:
            if is_international or digits.startswith("34"):
                formatted = f"+{digits}"
            elif len(digits) == 9 and digits.startswith(("6", "7", "9")):
                # Handle Spanish national numbers
                formatted = f"+34{digits}"
            elif len(digits) > 10:
                # Looks like a complete international number without prefix
                formatted = f"+{digits}"

        if formatted is None:
            logger.warning("Unrecognized phone format", phone=phone)
        return formatted
|