515 lines
20 KiB
Python
515 lines
20 KiB
Python
"""
|
|
Notification Repository
|
|
Repository for notification operations
|
|
"""
|
|
|
|
from typing import Optional, List, Dict, Any
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
from sqlalchemy import select, text, and_, or_
|
|
from datetime import datetime, timedelta
|
|
import structlog
|
|
import json
|
|
|
|
from .base import NotificationBaseRepository
|
|
from app.models.notifications import Notification, NotificationStatus, NotificationType, NotificationPriority
|
|
from shared.database.exceptions import DatabaseError, ValidationError, DuplicateRecordError
|
|
|
|
logger = structlog.get_logger()
|
|
|
|
|
|
class NotificationRepository(NotificationBaseRepository):
|
|
"""Repository for notification operations"""
|
|
|
|
def __init__(self, session: AsyncSession, cache_ttl: Optional[int] = 300):
|
|
# Notifications are very dynamic, short cache time (5 minutes)
|
|
super().__init__(Notification, session, cache_ttl)
|
|
|
|
async def create_notification(self, notification_data: Dict[str, Any]) -> Notification:
|
|
"""Create a new notification with validation"""
|
|
try:
|
|
# Validate notification data
|
|
validation_result = self._validate_notification_data(
|
|
notification_data,
|
|
["tenant_id", "sender_id", "type", "message"]
|
|
)
|
|
|
|
if not validation_result["is_valid"]:
|
|
raise ValidationError(f"Invalid notification data: {validation_result['errors']}")
|
|
|
|
# Set default values
|
|
if "status" not in notification_data:
|
|
notification_data["status"] = NotificationStatus.PENDING
|
|
if "priority" not in notification_data:
|
|
notification_data["priority"] = NotificationPriority.NORMAL
|
|
if "retry_count" not in notification_data:
|
|
notification_data["retry_count"] = 0
|
|
if "max_retries" not in notification_data:
|
|
notification_data["max_retries"] = 3
|
|
if "broadcast" not in notification_data:
|
|
notification_data["broadcast"] = False
|
|
if "read" not in notification_data:
|
|
notification_data["read"] = False
|
|
|
|
# Create notification
|
|
notification = await self.create(notification_data)
|
|
|
|
logger.info("Notification created successfully",
|
|
notification_id=notification.id,
|
|
tenant_id=notification.tenant_id,
|
|
type=notification.type.value,
|
|
recipient_id=notification.recipient_id,
|
|
priority=notification.priority.value)
|
|
|
|
return notification
|
|
|
|
except ValidationError:
|
|
raise
|
|
except Exception as e:
|
|
logger.error("Failed to create notification",
|
|
tenant_id=notification_data.get("tenant_id"),
|
|
type=notification_data.get("type"),
|
|
error=str(e))
|
|
raise DatabaseError(f"Failed to create notification: {str(e)}")
|
|
|
|
async def get_pending_notifications(self, limit: int = 100) -> List[Notification]:
|
|
"""Get pending notifications ready for processing"""
|
|
try:
|
|
# Get notifications that are pending and either not scheduled or scheduled for now/past
|
|
now = datetime.utcnow()
|
|
|
|
query_text = """
|
|
SELECT * FROM notifications
|
|
WHERE status = 'pending'
|
|
AND (scheduled_at IS NULL OR scheduled_at <= :now)
|
|
AND retry_count < max_retries
|
|
ORDER BY priority DESC, created_at ASC
|
|
LIMIT :limit
|
|
"""
|
|
|
|
result = await self.session.execute(text(query_text), {
|
|
"now": now,
|
|
"limit": limit
|
|
})
|
|
|
|
notifications = []
|
|
for row in result.fetchall():
|
|
record_dict = dict(row._mapping)
|
|
# Convert enum strings back to enum objects
|
|
record_dict["status"] = NotificationStatus(record_dict["status"])
|
|
record_dict["type"] = NotificationType(record_dict["type"])
|
|
record_dict["priority"] = NotificationPriority(record_dict["priority"])
|
|
notification = self.model(**record_dict)
|
|
notifications.append(notification)
|
|
|
|
return notifications
|
|
|
|
except Exception as e:
|
|
logger.error("Failed to get pending notifications", error=str(e))
|
|
return []
|
|
|
|
async def get_notifications_by_recipient(
|
|
self,
|
|
recipient_id: str,
|
|
tenant_id: str = None,
|
|
status: NotificationStatus = None,
|
|
notification_type: NotificationType = None,
|
|
unread_only: bool = False,
|
|
skip: int = 0,
|
|
limit: int = 50
|
|
) -> List[Notification]:
|
|
"""Get notifications for a specific recipient with filters"""
|
|
try:
|
|
filters = {"recipient_id": recipient_id}
|
|
|
|
if tenant_id:
|
|
filters["tenant_id"] = tenant_id
|
|
|
|
if status:
|
|
filters["status"] = status
|
|
|
|
if notification_type:
|
|
filters["type"] = notification_type
|
|
|
|
if unread_only:
|
|
filters["read"] = False
|
|
|
|
return await self.get_multi(
|
|
filters=filters,
|
|
skip=skip,
|
|
limit=limit,
|
|
order_by="created_at",
|
|
order_desc=True
|
|
)
|
|
|
|
except Exception as e:
|
|
logger.error("Failed to get notifications by recipient",
|
|
recipient_id=recipient_id,
|
|
error=str(e))
|
|
return []
|
|
|
|
async def get_broadcast_notifications(
|
|
self,
|
|
tenant_id: str,
|
|
skip: int = 0,
|
|
limit: int = 50
|
|
) -> List[Notification]:
|
|
"""Get broadcast notifications for a tenant"""
|
|
try:
|
|
return await self.get_multi(
|
|
filters={
|
|
"tenant_id": tenant_id,
|
|
"broadcast": True
|
|
},
|
|
skip=skip,
|
|
limit=limit,
|
|
order_by="created_at",
|
|
order_desc=True
|
|
)
|
|
|
|
except Exception as e:
|
|
logger.error("Failed to get broadcast notifications",
|
|
tenant_id=tenant_id,
|
|
error=str(e))
|
|
return []
|
|
|
|
async def update_notification_status(
|
|
self,
|
|
notification_id: str,
|
|
new_status: NotificationStatus,
|
|
error_message: str = None,
|
|
provider_message_id: str = None,
|
|
metadata: Dict[str, Any] = None
|
|
) -> Optional[Notification]:
|
|
"""Update notification status and related fields"""
|
|
try:
|
|
update_data = {
|
|
"status": new_status,
|
|
"updated_at": datetime.utcnow()
|
|
}
|
|
|
|
# Set timestamp based on status
|
|
if new_status == NotificationStatus.SENT:
|
|
update_data["sent_at"] = datetime.utcnow()
|
|
elif new_status == NotificationStatus.DELIVERED:
|
|
update_data["delivered_at"] = datetime.utcnow()
|
|
if "sent_at" not in update_data:
|
|
update_data["sent_at"] = datetime.utcnow()
|
|
|
|
# Add error message if provided
|
|
if error_message:
|
|
update_data["error_message"] = error_message
|
|
|
|
# Add metadata if provided
|
|
if metadata:
|
|
update_data["log_metadata"] = json.dumps(metadata)
|
|
|
|
updated_notification = await self.update(notification_id, update_data)
|
|
|
|
logger.info("Notification status updated",
|
|
notification_id=notification_id,
|
|
new_status=new_status.value,
|
|
provider_message_id=provider_message_id)
|
|
|
|
return updated_notification
|
|
|
|
except Exception as e:
|
|
logger.error("Failed to update notification status",
|
|
notification_id=notification_id,
|
|
new_status=new_status.value,
|
|
error=str(e))
|
|
raise DatabaseError(f"Failed to update status: {str(e)}")
|
|
|
|
async def increment_retry_count(self, notification_id: str) -> Optional[Notification]:
|
|
"""Increment retry count for a notification"""
|
|
try:
|
|
notification = await self.get_by_id(notification_id)
|
|
if not notification:
|
|
return None
|
|
|
|
new_retry_count = notification.retry_count + 1
|
|
update_data = {
|
|
"retry_count": new_retry_count,
|
|
"updated_at": datetime.utcnow()
|
|
}
|
|
|
|
# If max retries exceeded, mark as failed
|
|
if new_retry_count >= notification.max_retries:
|
|
update_data["status"] = NotificationStatus.FAILED
|
|
update_data["error_message"] = "Maximum retry attempts exceeded"
|
|
|
|
updated_notification = await self.update(notification_id, update_data)
|
|
|
|
logger.info("Notification retry count incremented",
|
|
notification_id=notification_id,
|
|
retry_count=new_retry_count,
|
|
max_retries=notification.max_retries)
|
|
|
|
return updated_notification
|
|
|
|
except Exception as e:
|
|
logger.error("Failed to increment retry count",
|
|
notification_id=notification_id,
|
|
error=str(e))
|
|
raise DatabaseError(f"Failed to increment retry count: {str(e)}")
|
|
|
|
async def mark_as_read(self, notification_id: str) -> Optional[Notification]:
|
|
"""Mark notification as read"""
|
|
try:
|
|
updated_notification = await self.update(notification_id, {
|
|
"read": True,
|
|
"read_at": datetime.utcnow()
|
|
})
|
|
|
|
logger.info("Notification marked as read",
|
|
notification_id=notification_id)
|
|
|
|
return updated_notification
|
|
|
|
except Exception as e:
|
|
logger.error("Failed to mark notification as read",
|
|
notification_id=notification_id,
|
|
error=str(e))
|
|
raise DatabaseError(f"Failed to mark as read: {str(e)}")
|
|
|
|
async def mark_multiple_as_read(
|
|
self,
|
|
recipient_id: str,
|
|
notification_ids: List[str] = None,
|
|
tenant_id: str = None
|
|
) -> int:
|
|
"""Mark multiple notifications as read"""
|
|
try:
|
|
conditions = ["recipient_id = :recipient_id", "read = false"]
|
|
params = {"recipient_id": recipient_id}
|
|
|
|
if notification_ids:
|
|
placeholders = ", ".join([f":id_{i}" for i in range(len(notification_ids))])
|
|
conditions.append(f"id IN ({placeholders})")
|
|
for i, notification_id in enumerate(notification_ids):
|
|
params[f"id_{i}"] = notification_id
|
|
|
|
if tenant_id:
|
|
conditions.append("tenant_id = :tenant_id")
|
|
params["tenant_id"] = tenant_id
|
|
|
|
query_text = f"""
|
|
UPDATE notifications
|
|
SET read = true, read_at = :read_at
|
|
WHERE {' AND '.join(conditions)}
|
|
"""
|
|
|
|
params["read_at"] = datetime.utcnow()
|
|
|
|
result = await self.session.execute(text(query_text), params)
|
|
updated_count = result.rowcount
|
|
|
|
logger.info("Multiple notifications marked as read",
|
|
recipient_id=recipient_id,
|
|
updated_count=updated_count)
|
|
|
|
return updated_count
|
|
|
|
except Exception as e:
|
|
logger.error("Failed to mark multiple notifications as read",
|
|
recipient_id=recipient_id,
|
|
error=str(e))
|
|
raise DatabaseError(f"Failed to mark multiple as read: {str(e)}")
|
|
|
|
async def get_failed_notifications_for_retry(self, hours_ago: int = 1) -> List[Notification]:
|
|
"""Get failed notifications that can be retried"""
|
|
try:
|
|
cutoff_time = datetime.utcnow() - timedelta(hours=hours_ago)
|
|
|
|
query_text = """
|
|
SELECT * FROM notifications
|
|
WHERE status = 'failed'
|
|
AND retry_count < max_retries
|
|
AND updated_at >= :cutoff_time
|
|
ORDER BY priority DESC, updated_at ASC
|
|
LIMIT 100
|
|
"""
|
|
|
|
result = await self.session.execute(text(query_text), {
|
|
"cutoff_time": cutoff_time
|
|
})
|
|
|
|
notifications = []
|
|
for row in result.fetchall():
|
|
record_dict = dict(row._mapping)
|
|
# Convert enum strings back to enum objects
|
|
record_dict["status"] = NotificationStatus(record_dict["status"])
|
|
record_dict["type"] = NotificationType(record_dict["type"])
|
|
record_dict["priority"] = NotificationPriority(record_dict["priority"])
|
|
notification = self.model(**record_dict)
|
|
notifications.append(notification)
|
|
|
|
return notifications
|
|
|
|
except Exception as e:
|
|
logger.error("Failed to get failed notifications for retry", error=str(e))
|
|
return []
|
|
|
|
async def get_notification_statistics(
|
|
self,
|
|
tenant_id: str = None,
|
|
days_back: int = 30
|
|
) -> Dict[str, Any]:
|
|
"""Get notification statistics"""
|
|
try:
|
|
cutoff_date = datetime.utcnow() - timedelta(days=days_back)
|
|
|
|
# Build base query conditions
|
|
conditions = ["created_at >= :cutoff_date"]
|
|
params = {"cutoff_date": cutoff_date}
|
|
|
|
if tenant_id:
|
|
conditions.append("tenant_id = :tenant_id")
|
|
params["tenant_id"] = tenant_id
|
|
|
|
where_clause = " AND ".join(conditions)
|
|
|
|
# Get statistics by status
|
|
status_query = text(f"""
|
|
SELECT status, COUNT(*) as count
|
|
FROM notifications
|
|
WHERE {where_clause}
|
|
GROUP BY status
|
|
ORDER BY count DESC
|
|
""")
|
|
|
|
result = await self.session.execute(status_query, params)
|
|
status_stats = {row.status: row.count for row in result.fetchall()}
|
|
|
|
# Get statistics by type
|
|
type_query = text(f"""
|
|
SELECT type, COUNT(*) as count
|
|
FROM notifications
|
|
WHERE {where_clause}
|
|
GROUP BY type
|
|
ORDER BY count DESC
|
|
""")
|
|
|
|
result = await self.session.execute(type_query, params)
|
|
type_stats = {row.type: row.count for row in result.fetchall()}
|
|
|
|
# Get delivery rate
|
|
delivery_query = text(f"""
|
|
SELECT
|
|
COUNT(*) as total_notifications,
|
|
COUNT(CASE WHEN status = 'delivered' THEN 1 END) as delivered_count,
|
|
COUNT(CASE WHEN status = 'failed' THEN 1 END) as failed_count,
|
|
AVG(CASE WHEN sent_at IS NOT NULL AND delivered_at IS NOT NULL
|
|
THEN EXTRACT(EPOCH FROM (delivered_at - sent_at)) END) as avg_delivery_time_seconds
|
|
FROM notifications
|
|
WHERE {where_clause}
|
|
""")
|
|
|
|
result = await self.session.execute(delivery_query, params)
|
|
delivery_row = result.fetchone()
|
|
|
|
total = delivery_row.total_notifications or 0
|
|
delivered = delivery_row.delivered_count or 0
|
|
failed = delivery_row.failed_count or 0
|
|
delivery_rate = (delivered / total * 100) if total > 0 else 0
|
|
failure_rate = (failed / total * 100) if total > 0 else 0
|
|
|
|
# Get unread count (if tenant_id provided)
|
|
unread_count = 0
|
|
if tenant_id:
|
|
unread_query = text(f"""
|
|
SELECT COUNT(*) as count
|
|
FROM notifications
|
|
WHERE tenant_id = :tenant_id AND read = false
|
|
""")
|
|
|
|
result = await self.session.execute(unread_query, {"tenant_id": tenant_id})
|
|
unread_count = result.scalar() or 0
|
|
|
|
return {
|
|
"total_notifications": total,
|
|
"by_status": status_stats,
|
|
"by_type": type_stats,
|
|
"delivery_rate_percent": round(delivery_rate, 2),
|
|
"failure_rate_percent": round(failure_rate, 2),
|
|
"avg_delivery_time_seconds": float(delivery_row.avg_delivery_time_seconds or 0),
|
|
"unread_count": unread_count,
|
|
"days_analyzed": days_back
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error("Failed to get notification statistics",
|
|
tenant_id=tenant_id,
|
|
error=str(e))
|
|
return {
|
|
"total_notifications": 0,
|
|
"by_status": {},
|
|
"by_type": {},
|
|
"delivery_rate_percent": 0.0,
|
|
"failure_rate_percent": 0.0,
|
|
"avg_delivery_time_seconds": 0.0,
|
|
"unread_count": 0,
|
|
"days_analyzed": days_back
|
|
}
|
|
|
|
async def cancel_notification(self, notification_id: str, reason: str = None) -> Optional[Notification]:
|
|
"""Cancel a pending notification"""
|
|
try:
|
|
notification = await self.get_by_id(notification_id)
|
|
if not notification:
|
|
return None
|
|
|
|
if notification.status != NotificationStatus.PENDING:
|
|
raise ValidationError("Can only cancel pending notifications")
|
|
|
|
update_data = {
|
|
"status": NotificationStatus.CANCELLED,
|
|
"updated_at": datetime.utcnow()
|
|
}
|
|
|
|
if reason:
|
|
update_data["error_message"] = f"Cancelled: {reason}"
|
|
|
|
updated_notification = await self.update(notification_id, update_data)
|
|
|
|
logger.info("Notification cancelled",
|
|
notification_id=notification_id,
|
|
reason=reason)
|
|
|
|
return updated_notification
|
|
|
|
except ValidationError:
|
|
raise
|
|
except Exception as e:
|
|
logger.error("Failed to cancel notification",
|
|
notification_id=notification_id,
|
|
error=str(e))
|
|
raise DatabaseError(f"Failed to cancel notification: {str(e)}")
|
|
|
|
async def schedule_notification(
|
|
self,
|
|
notification_id: str,
|
|
scheduled_at: datetime
|
|
) -> Optional[Notification]:
|
|
"""Schedule a notification for future delivery"""
|
|
try:
|
|
if scheduled_at <= datetime.utcnow():
|
|
raise ValidationError("Scheduled time must be in the future")
|
|
|
|
updated_notification = await self.update(notification_id, {
|
|
"scheduled_at": scheduled_at,
|
|
"updated_at": datetime.utcnow()
|
|
})
|
|
|
|
logger.info("Notification scheduled",
|
|
notification_id=notification_id,
|
|
scheduled_at=scheduled_at)
|
|
|
|
return updated_notification
|
|
|
|
except ValidationError:
|
|
raise
|
|
except Exception as e:
|
|
logger.error("Failed to schedule notification",
|
|
notification_id=notification_id,
|
|
error=str(e))
|
|
raise DatabaseError(f"Failed to schedule notification: {str(e)}") |