Files
bakery-ia/services/notification/app/repositories/whatsapp_message_repository.py
2025-11-13 16:01:08 +01:00

380 lines
12 KiB
Python

"""
WhatsApp Message Repository
"""
from typing import Optional, List, Dict, Any
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import text, select, and_
from datetime import datetime, timedelta
import structlog
from app.repositories.base import NotificationBaseRepository
from app.models.whatsapp_messages import WhatsAppMessage, WhatsAppMessageStatus, WhatsAppTemplate
from shared.database.exceptions import DatabaseError
# Module-level structlog logger used by the repository classes defined below.
logger = structlog.get_logger()
class WhatsAppMessageRepository(NotificationBaseRepository):
    """Repository for WhatsApp message operations.

    Wraps the generic ``create`` / ``update`` / ``get_multi`` helpers
    inherited from ``NotificationBaseRepository`` with WhatsApp-specific
    lookups and a raw-SQL delivery statistics query.

    Error policy, as implemented below: ``create_message`` raises
    ``DatabaseError``; every other method logs the failure and returns a
    neutral value (``None``, ``[]`` or ``{}``) instead of raising.
    """

    # (status, column) pairs: the timestamp column stamped when a message
    # enters that status. Compared with ``==`` so matching behaves exactly
    # like an explicit if/elif chain.
    _STATUS_TIMESTAMP_COLUMNS = (
        (WhatsAppMessageStatus.SENT, "sent_at"),
        (WhatsAppMessageStatus.DELIVERED, "delivered_at"),
        (WhatsAppMessageStatus.READ, "read_at"),
        (WhatsAppMessageStatus.FAILED, "failed_at"),
    )

    # Aggregate columns returned by the delivery stats query, in output order.
    _DELIVERY_STAT_COLUMNS = (
        "total_messages",
        "sent",
        "delivered",
        "read",
        "failed",
        "pending",
        "unique_recipients",
        "total_conversations",
    )

    def __init__(self, session: AsyncSession):
        super().__init__(WhatsAppMessage, session, cache_ttl=60)  # 1 minute cache

    async def create_message(self, message_data: Dict[str, Any]) -> WhatsAppMessage:
        """Create a new WhatsApp message record.

        Args:
            message_data: Column values for the new row. Must contain
                ``tenant_id``, ``recipient_phone`` and ``message_type``.

        Returns:
            The created ``WhatsAppMessage``.

        Raises:
            DatabaseError: If validation fails or the insert (or any later
                step) raises; the original exception is chained as the cause.
        """
        try:
            # Validate required fields
            validation = self._validate_notification_data(
                message_data,
                ["tenant_id", "recipient_phone", "message_type"],
            )
            if not validation["is_valid"]:
                raise DatabaseError(
                    f"Validation failed: {', '.join(validation['errors'])}"
                )

            message = await self.create(message_data)

            # Fall back to the raw value if message_type is not an enum
            # member, so a log line cannot turn a successful insert into a
            # reported failure.
            message_type = message.message_type
            logger.info(
                "WhatsApp message created",
                message_id=str(message.id),
                recipient=message.recipient_phone,
                message_type=getattr(message_type, "value", message_type),
            )
            return message
        except Exception as e:
            logger.error("Failed to create WhatsApp message", error=str(e))
            # Chain the cause so the original traceback is preserved.
            raise DatabaseError(f"Failed to create message: {str(e)}") from e

    async def update_message_status(
        self,
        message_id: str,
        status: WhatsAppMessageStatus,
        whatsapp_message_id: Optional[str] = None,
        error_message: Optional[str] = None,
        provider_response: Optional[Dict] = None
    ) -> Optional[WhatsAppMessage]:
        """Update message status and related fields.

        Sets ``status`` and ``updated_at``; for SENT / DELIVERED / READ /
        FAILED it also stamps the matching ``*_at`` column. The optional
        arguments are written only when truthy.

        Args:
            message_id: Identifier passed to the inherited ``update``.
            status: New message status.
            whatsapp_message_id: Provider-side message ID, if known.
            error_message: Failure description, if any.
            provider_response: Raw provider payload, if any.

        Returns:
            Whatever the inherited ``update`` returns, or ``None`` if any
            step raises (the error is logged, not propagated).
        """
        try:
            # One timestamp for the whole update so ``updated_at`` and the
            # status-specific column are exactly equal.
            now = datetime.utcnow()
            update_data: Dict[str, Any] = {
                "status": status,
                "updated_at": now,
            }

            # Update timestamps based on status
            for candidate, column in self._STATUS_TIMESTAMP_COLUMNS:
                if status == candidate:
                    update_data[column] = now
                    break

            if whatsapp_message_id:
                update_data["whatsapp_message_id"] = whatsapp_message_id
            if error_message:
                update_data["error_message"] = error_message
            if provider_response:
                update_data["provider_response"] = provider_response

            message = await self.update(message_id, update_data)
            logger.info(
                "WhatsApp message status updated",
                message_id=message_id,
                status=status.value,
                whatsapp_message_id=whatsapp_message_id,
            )
            return message
        except Exception as e:
            logger.error(
                "Failed to update message status",
                message_id=message_id,
                error=str(e),
            )
            return None

    async def get_by_whatsapp_id(self, whatsapp_message_id: str) -> Optional[WhatsAppMessage]:
        """Get message by WhatsApp's message ID.

        Returns:
            The first match, or ``None`` when nothing matches or the lookup
            raises (the error is logged).
        """
        try:
            messages = await self.get_multi(
                filters={"whatsapp_message_id": whatsapp_message_id},
                limit=1,
            )
            return messages[0] if messages else None
        except Exception as e:
            logger.error(
                "Failed to get message by WhatsApp ID",
                whatsapp_message_id=whatsapp_message_id,
                error=str(e),
            )
            return None

    async def get_by_notification_id(self, notification_id: str) -> Optional[WhatsAppMessage]:
        """Get message by notification ID.

        Returns:
            The first match, or ``None`` when nothing matches or the lookup
            raises (the error is logged).
        """
        try:
            messages = await self.get_multi(
                filters={"notification_id": notification_id},
                limit=1,
            )
            return messages[0] if messages else None
        except Exception as e:
            logger.error(
                "Failed to get message by notification ID",
                notification_id=notification_id,
                error=str(e),
            )
            return None

    async def get_messages_by_phone(
        self,
        tenant_id: str,
        phone: str,
        skip: int = 0,
        limit: int = 50
    ) -> List[WhatsAppMessage]:
        """Get messages for a specific phone number, newest first.

        Args:
            tenant_id: Tenant whose messages are queried.
            phone: Value matched against ``recipient_phone``.
            skip: Pagination offset forwarded to ``get_multi``.
            limit: Page size forwarded to ``get_multi``.

        Returns:
            The matching messages, or ``[]`` if the query raises.
        """
        try:
            return await self.get_multi(
                filters={"tenant_id": tenant_id, "recipient_phone": phone},
                skip=skip,
                limit=limit,
                order_by="created_at",
                order_desc=True,
            )
        except Exception as e:
            logger.error(
                "Failed to get messages by phone",
                phone=phone,
                error=str(e),
            )
            return []

    async def get_pending_messages(
        self,
        tenant_id: str,
        limit: int = 100
    ) -> List[WhatsAppMessage]:
        """Get pending messages for retry processing, oldest first.

        Args:
            tenant_id: Tenant whose PENDING messages are queried.
            limit: Maximum number of rows forwarded to ``get_multi``.

        Returns:
            The matching messages, or ``[]`` if the query raises.
        """
        try:
            return await self.get_multi(
                filters={
                    "tenant_id": tenant_id,
                    "status": WhatsAppMessageStatus.PENDING,
                },
                limit=limit,
                order_by="created_at",
                order_desc=False,  # Oldest first
            )
        except Exception as e:
            logger.error(
                "Failed to get pending messages",
                tenant_id=tenant_id,
                error=str(e),
            )
            return []

    async def get_conversation_messages(
        self,
        conversation_id: str,
        skip: int = 0,
        limit: int = 50
    ) -> List[WhatsAppMessage]:
        """Get messages in a conversation in chronological order.

        Args:
            conversation_id: Value matched against ``conversation_id``.
            skip: Pagination offset forwarded to ``get_multi``.
            limit: Page size forwarded to ``get_multi``.

        Returns:
            The matching messages, or ``[]`` if the query raises.
        """
        try:
            return await self.get_multi(
                filters={"conversation_id": conversation_id},
                skip=skip,
                limit=limit,
                order_by="created_at",
                order_desc=False,  # Chronological order
            )
        except Exception as e:
            logger.error(
                "Failed to get conversation messages",
                conversation_id=conversation_id,
                error=str(e),
            )
            return []

    @classmethod
    def _format_delivery_stats(
        cls,
        row: Any,
        start_date: datetime,
        end_date: datetime
    ) -> Dict[str, Any]:
        """Build the stats payload from an aggregate row (or zeros if no row)."""
        stats: Dict[str, Any] = {
            column: (getattr(row, column) or 0) if row else 0
            for column in cls._DELIVERY_STAT_COLUMNS
        }
        total = stats["total_messages"]
        delivered = stats["delivered"]
        # Percentage of all messages in the window whose status is DELIVERED.
        stats["delivery_rate"] = (
            round((delivered / total * 100), 2) if total > 0 else 0
        )
        stats["period"] = {
            "start": start_date.isoformat(),
            "end": end_date.isoformat(),
        }
        return stats

    async def get_delivery_stats(
        self,
        tenant_id: str,
        start_date: Optional[datetime] = None,
        end_date: Optional[datetime] = None
    ) -> Dict[str, Any]:
        """Get delivery statistics for WhatsApp messages.

        Args:
            tenant_id: Tenant whose messages are aggregated.
            start_date: Inclusive window start. Defaults to 30 days before
                ``end_date``.
            end_date: Inclusive window end. Defaults to the current UTC time.

        Returns:
            A dict with per-status counts, ``unique_recipients``,
            ``total_conversations``, ``delivery_rate`` (percent, two
            decimals) and the ISO-formatted ``period``. Returns ``{}`` if
            the query raises (the error is logged).
        """
        try:
            # Resolve the end first and anchor the default start on it.
            # Otherwise a caller-supplied end_date more than 30 days in the
            # past would yield an inverted (always empty) BETWEEN window.
            if end_date is None:
                end_date = datetime.utcnow()
            if start_date is None:
                start_date = end_date - timedelta(days=30)

            query = text("""
                SELECT
                    COUNT(*) as total_messages,
                    COUNT(CASE WHEN status = 'SENT' THEN 1 END) as sent,
                    COUNT(CASE WHEN status = 'DELIVERED' THEN 1 END) as delivered,
                    COUNT(CASE WHEN status = 'READ' THEN 1 END) as read,
                    COUNT(CASE WHEN status = 'FAILED' THEN 1 END) as failed,
                    COUNT(CASE WHEN status = 'PENDING' THEN 1 END) as pending,
                    COUNT(DISTINCT recipient_phone) as unique_recipients,
                    COUNT(DISTINCT conversation_id) as total_conversations
                FROM whatsapp_messages
                WHERE tenant_id = :tenant_id
                AND created_at BETWEEN :start_date AND :end_date
            """)
            result = await self.session.execute(
                query,
                {
                    "tenant_id": tenant_id,
                    "start_date": start_date,
                    "end_date": end_date,
                },
            )
            return self._format_delivery_stats(
                result.fetchone(), start_date, end_date
            )
        except Exception as e:
            logger.error(
                "Failed to get delivery stats",
                tenant_id=tenant_id,
                error=str(e),
            )
            return {}
class WhatsAppTemplateRepository(NotificationBaseRepository):
    """Data-access helper for WhatsApp template records.

    All lookups delegate to the inherited ``get_multi`` / ``get`` /
    ``update`` helpers. Every method logs failures and returns a neutral
    value (``None`` or ``[]``) rather than raising.
    """

    def __init__(self, session: AsyncSession):
        # cache_ttl=300 -> 5 minute cache
        super().__init__(WhatsAppTemplate, session, cache_ttl=300)

    async def get_by_template_name(
        self,
        template_name: str,
        language: str = "es"
    ) -> Optional[WhatsAppTemplate]:
        """Look up an active template by its name and language.

        Returns the first match, or ``None`` when there is no match or the
        lookup raises.
        """
        criteria = {
            "template_name": template_name,
            "language": language,
            "is_active": True,
        }
        try:
            matches = await self.get_multi(filters=criteria, limit=1)
            if matches:
                return matches[0]
            return None
        except Exception as e:
            logger.error(
                "Failed to get template by name",
                template_name=template_name,
                error=str(e),
            )
            return None

    async def get_by_template_key(self, template_key: str) -> Optional[WhatsAppTemplate]:
        """Look up a template by its internal key.

        Returns the first match, or ``None`` when there is no match or the
        lookup raises. Unlike the name lookup, no ``is_active`` filter is
        applied here.
        """
        criteria = {"template_key": template_key}
        try:
            matches = await self.get_multi(filters=criteria, limit=1)
            if matches:
                return matches[0]
            return None
        except Exception as e:
            logger.error(
                "Failed to get template by key",
                template_key=template_key,
                error=str(e),
            )
            return None

    async def get_active_templates(
        self,
        tenant_id: Optional[str] = None,
        category: Optional[str] = None
    ) -> List[WhatsAppTemplate]:
        """List active, APPROVED templates, newest first (at most 1000).

        ``tenant_id`` and ``category`` narrow the result only when truthy.
        Returns ``[]`` if the query raises.
        """
        try:
            filters = {"is_active": True, "status": "APPROVED"}
            optional_filters = {"tenant_id": tenant_id, "category": category}
            # Only truthy optional values become filters.
            filters.update(
                {key: value for key, value in optional_filters.items() if value}
            )
            return await self.get_multi(
                filters=filters,
                limit=1000,
                order_by="created_at",
                order_desc=True,
            )
        except Exception as e:
            logger.error("Failed to get active templates", error=str(e))
            return []

    async def increment_usage(self, template_id: str) -> None:
        """Bump a template's ``sent_count`` and refresh ``last_used_at``.

        Does nothing when the template is not found; failures are logged
        and swallowed.
        """
        # NOTE(review): this is a read-then-write sequence, so concurrent
        # callers could lose increments unless the base repository
        # serializes it. Confirm against NotificationBaseRepository.
        try:
            template = await self.get(template_id)
            if not template:
                return
            new_values = {
                "sent_count": (template.sent_count or 0) + 1,
                "last_used_at": datetime.utcnow(),
            }
            await self.update(template_id, new_values)
        except Exception as e:
            logger.error(
                "Failed to increment template usage",
                template_id=template_id,
                error=str(e),
            )