650 lines
26 KiB
Python
650 lines
26 KiB
Python
"""
|
|
Enhanced Notification API endpoints using repository pattern and dependency injection
|
|
"""
|
|
|
|
import structlog
|
|
from datetime import datetime
|
|
from fastapi import APIRouter, Depends, HTTPException, status, Query, Path, BackgroundTasks
|
|
from typing import List, Optional, Dict, Any
|
|
from uuid import UUID
|
|
|
|
from app.schemas.notifications import (
|
|
NotificationCreate, NotificationResponse, NotificationHistory,
|
|
NotificationStats, NotificationPreferences, PreferencesUpdate,
|
|
BulkNotificationCreate, TemplateCreate, TemplateResponse,
|
|
DeliveryWebhook, ReadReceiptWebhook, NotificationType,
|
|
NotificationStatus, NotificationPriority
|
|
)
|
|
from app.services.notification_service import EnhancedNotificationService
|
|
from app.models.notifications import NotificationType as ModelNotificationType
|
|
from shared.auth.decorators import (
|
|
get_current_user_dep,
|
|
get_current_tenant_id_dep,
|
|
require_role
|
|
)
|
|
from shared.database.base import create_database_manager
|
|
from shared.monitoring.metrics import track_endpoint_metrics
|
|
|
|
logger = structlog.get_logger()
|
|
router = APIRouter()
|
|
|
|
# Dependency injection for enhanced notification service
|
|
def get_enhanced_notification_service():
|
|
database_manager = create_database_manager()
|
|
return EnhancedNotificationService(database_manager)
|
|
|
|
@router.post("/send", response_model=NotificationResponse)
|
|
@track_endpoint_metrics("notification_send")
|
|
async def send_notification_enhanced(
|
|
notification_data: Dict[str, Any],
|
|
tenant_id: str = Depends(get_current_tenant_id_dep),
|
|
current_user: Dict[str, Any] = Depends(get_current_user_dep),
|
|
notification_service: EnhancedNotificationService = Depends(get_enhanced_notification_service)
|
|
):
|
|
"""Send a single notification with enhanced validation and features"""
|
|
|
|
try:
|
|
# Check permissions for broadcast notifications
|
|
if notification_data.get("broadcast", False) and current_user.get("role") not in ["admin", "manager"]:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_403_FORBIDDEN,
|
|
detail="Only admins and managers can send broadcast notifications"
|
|
)
|
|
|
|
# Validate required fields
|
|
if not notification_data.get("message"):
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail="Message is required"
|
|
)
|
|
|
|
if not notification_data.get("type"):
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail="Notification type is required"
|
|
)
|
|
|
|
# Convert string type to enum
|
|
try:
|
|
notification_type = ModelNotificationType(notification_data["type"])
|
|
except ValueError:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail=f"Invalid notification type: {notification_data['type']}"
|
|
)
|
|
|
|
# Convert priority if provided
|
|
priority = NotificationPriority.NORMAL
|
|
if "priority" in notification_data:
|
|
try:
|
|
priority = NotificationPriority(notification_data["priority"])
|
|
except ValueError:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail=f"Invalid priority: {notification_data['priority']}"
|
|
)
|
|
|
|
# Create notification using enhanced service
|
|
notification = await notification_service.create_notification(
|
|
tenant_id=tenant_id,
|
|
sender_id=current_user["user_id"],
|
|
notification_type=notification_type,
|
|
message=notification_data["message"],
|
|
recipient_id=notification_data.get("recipient_id"),
|
|
recipient_email=notification_data.get("recipient_email"),
|
|
recipient_phone=notification_data.get("recipient_phone"),
|
|
subject=notification_data.get("subject"),
|
|
html_content=notification_data.get("html_content"),
|
|
template_key=notification_data.get("template_key"),
|
|
template_data=notification_data.get("template_data"),
|
|
priority=priority,
|
|
scheduled_at=notification_data.get("scheduled_at"),
|
|
broadcast=notification_data.get("broadcast", False)
|
|
)
|
|
|
|
logger.info("Notification sent successfully",
|
|
notification_id=notification.id,
|
|
tenant_id=tenant_id,
|
|
type=notification_type.value,
|
|
priority=priority.value)
|
|
|
|
return NotificationResponse.from_orm(notification)
|
|
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
logger.error("Failed to send notification",
|
|
tenant_id=tenant_id,
|
|
sender_id=current_user["user_id"],
|
|
error=str(e))
|
|
raise HTTPException(
|
|
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
|
detail="Failed to send notification"
|
|
)
|
|
|
|
@router.get("/notifications/{notification_id}", response_model=NotificationResponse)
|
|
@track_endpoint_metrics("notification_get")
|
|
async def get_notification_enhanced(
|
|
notification_id: UUID = Path(..., description="Notification ID"),
|
|
current_user: Dict[str, Any] = Depends(get_current_user_dep),
|
|
notification_service: EnhancedNotificationService = Depends(get_enhanced_notification_service)
|
|
):
|
|
"""Get a specific notification by ID with enhanced access control"""
|
|
|
|
try:
|
|
notification = await notification_service.get_notification_by_id(str(notification_id))
|
|
|
|
if not notification:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail="Notification not found"
|
|
)
|
|
|
|
# Verify user has access to this notification
|
|
if (notification.recipient_id != current_user["user_id"] and
|
|
notification.sender_id != current_user["user_id"] and
|
|
not notification.broadcast and
|
|
current_user.get("role") not in ["admin", "manager"]):
|
|
raise HTTPException(
|
|
status_code=status.HTTP_403_FORBIDDEN,
|
|
detail="Access denied to notification"
|
|
)
|
|
|
|
return NotificationResponse.from_orm(notification)
|
|
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
logger.error("Failed to get notification",
|
|
notification_id=str(notification_id),
|
|
error=str(e))
|
|
raise HTTPException(
|
|
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
|
detail="Failed to get notification"
|
|
)
|
|
|
|
@router.get("/notifications/user/{user_id}", response_model=List[NotificationResponse])
|
|
@track_endpoint_metrics("notification_get_user_notifications")
|
|
async def get_user_notifications_enhanced(
|
|
user_id: str = Path(..., description="User ID"),
|
|
tenant_id: Optional[str] = Query(None, description="Filter by tenant ID"),
|
|
unread_only: bool = Query(False, description="Only return unread notifications"),
|
|
notification_type: Optional[NotificationType] = Query(None, description="Filter by notification type"),
|
|
skip: int = Query(0, ge=0, description="Number of records to skip"),
|
|
limit: int = Query(50, ge=1, le=100, description="Maximum number of records"),
|
|
current_user: Dict[str, Any] = Depends(get_current_user_dep),
|
|
notification_service: EnhancedNotificationService = Depends(get_enhanced_notification_service)
|
|
):
|
|
"""Get notifications for a user with enhanced filtering"""
|
|
|
|
# Users can only get their own notifications unless they're admin
|
|
if user_id != current_user["user_id"] and current_user.get("role") not in ["admin", "manager"]:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_403_FORBIDDEN,
|
|
detail="Can only access your own notifications"
|
|
)
|
|
|
|
try:
|
|
# Convert string type to model enum if provided
|
|
model_notification_type = None
|
|
if notification_type:
|
|
try:
|
|
model_notification_type = ModelNotificationType(notification_type.value)
|
|
except ValueError:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail=f"Invalid notification type: {notification_type.value}"
|
|
)
|
|
|
|
notifications = await notification_service.get_user_notifications(
|
|
user_id=user_id,
|
|
tenant_id=tenant_id,
|
|
unread_only=unread_only,
|
|
notification_type=model_notification_type,
|
|
skip=skip,
|
|
limit=limit
|
|
)
|
|
|
|
return [NotificationResponse.from_orm(notification) for notification in notifications]
|
|
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
logger.error("Failed to get user notifications",
|
|
user_id=user_id,
|
|
error=str(e))
|
|
raise HTTPException(
|
|
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
|
detail="Failed to get user notifications"
|
|
)
|
|
|
|
@router.get("/notifications/tenant/{tenant_id}", response_model=List[NotificationResponse])
|
|
@track_endpoint_metrics("notification_get_tenant_notifications")
|
|
async def get_tenant_notifications_enhanced(
|
|
tenant_id: str = Path(..., description="Tenant ID"),
|
|
status_filter: Optional[NotificationStatus] = Query(None, description="Filter by status"),
|
|
notification_type: Optional[NotificationType] = Query(None, description="Filter by type"),
|
|
skip: int = Query(0, ge=0, description="Number of records to skip"),
|
|
limit: int = Query(50, ge=1, le=100, description="Maximum number of records"),
|
|
current_user: Dict[str, Any] = Depends(get_current_user_dep),
|
|
notification_service: EnhancedNotificationService = Depends(get_enhanced_notification_service)
|
|
):
|
|
"""Get notifications for a tenant with enhanced filtering (admin/manager only)"""
|
|
|
|
if current_user.get("role") not in ["admin", "manager"]:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_403_FORBIDDEN,
|
|
detail="Only admins and managers can view tenant notifications"
|
|
)
|
|
|
|
try:
|
|
# Convert enums if provided
|
|
model_notification_type = None
|
|
if notification_type:
|
|
try:
|
|
model_notification_type = ModelNotificationType(notification_type.value)
|
|
except ValueError:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail=f"Invalid notification type: {notification_type.value}"
|
|
)
|
|
|
|
model_status = None
|
|
if status_filter:
|
|
try:
|
|
from app.models.notifications import NotificationStatus as ModelStatus
|
|
model_status = ModelStatus(status_filter.value)
|
|
except ValueError:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail=f"Invalid status: {status_filter.value}"
|
|
)
|
|
|
|
notifications = await notification_service.get_tenant_notifications(
|
|
tenant_id=tenant_id,
|
|
status=model_status,
|
|
notification_type=model_notification_type,
|
|
skip=skip,
|
|
limit=limit
|
|
)
|
|
|
|
return [NotificationResponse.from_orm(notification) for notification in notifications]
|
|
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
logger.error("Failed to get tenant notifications",
|
|
tenant_id=tenant_id,
|
|
error=str(e))
|
|
raise HTTPException(
|
|
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
|
detail="Failed to get tenant notifications"
|
|
)
|
|
|
|
@router.patch("/notifications/{notification_id}/read")
|
|
@track_endpoint_metrics("notification_mark_read")
|
|
async def mark_notification_read_enhanced(
|
|
notification_id: UUID = Path(..., description="Notification ID"),
|
|
current_user: Dict[str, Any] = Depends(get_current_user_dep),
|
|
notification_service: EnhancedNotificationService = Depends(get_enhanced_notification_service)
|
|
):
|
|
"""Mark a notification as read with enhanced validation"""
|
|
|
|
try:
|
|
success = await notification_service.mark_notification_as_read(
|
|
str(notification_id),
|
|
current_user["user_id"]
|
|
)
|
|
|
|
if not success:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail="Notification not found or access denied"
|
|
)
|
|
|
|
return {"success": True, "message": "Notification marked as read"}
|
|
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
logger.error("Failed to mark notification as read",
|
|
notification_id=str(notification_id),
|
|
user_id=current_user["user_id"],
|
|
error=str(e))
|
|
raise HTTPException(
|
|
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
|
detail="Failed to mark notification as read"
|
|
)
|
|
|
|
@router.patch("/notifications/mark-multiple-read")
|
|
@track_endpoint_metrics("notification_mark_multiple_read")
|
|
async def mark_multiple_notifications_read_enhanced(
|
|
request_data: Dict[str, Any],
|
|
current_user: Dict[str, Any] = Depends(get_current_user_dep),
|
|
notification_service: EnhancedNotificationService = Depends(get_enhanced_notification_service)
|
|
):
|
|
"""Mark multiple notifications as read with enhanced batch processing"""
|
|
|
|
try:
|
|
notification_ids = request_data.get("notification_ids")
|
|
tenant_id = request_data.get("tenant_id")
|
|
|
|
if not notification_ids and not tenant_id:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail="Either notification_ids or tenant_id must be provided"
|
|
)
|
|
|
|
# Convert UUID strings to strings if needed
|
|
if notification_ids:
|
|
notification_ids = [str(nid) for nid in notification_ids]
|
|
|
|
marked_count = await notification_service.mark_multiple_as_read(
|
|
user_id=current_user["user_id"],
|
|
notification_ids=notification_ids,
|
|
tenant_id=tenant_id
|
|
)
|
|
|
|
return {
|
|
"success": True,
|
|
"marked_count": marked_count,
|
|
"message": f"Marked {marked_count} notifications as read"
|
|
}
|
|
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
logger.error("Failed to mark multiple notifications as read",
|
|
user_id=current_user["user_id"],
|
|
error=str(e))
|
|
raise HTTPException(
|
|
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
|
detail="Failed to mark notifications as read"
|
|
)
|
|
|
|
@router.patch("/notifications/{notification_id}/status")
|
|
@track_endpoint_metrics("notification_update_status")
|
|
async def update_notification_status_enhanced(
|
|
notification_id: UUID = Path(..., description="Notification ID"),
|
|
status_data: Dict[str, Any] = ...,
|
|
current_user: Dict[str, Any] = Depends(get_current_user_dep),
|
|
notification_service: EnhancedNotificationService = Depends(get_enhanced_notification_service)
|
|
):
|
|
"""Update notification status with enhanced logging and validation"""
|
|
|
|
# Only system users or admins can update notification status
|
|
if (current_user.get("type") != "service" and
|
|
current_user.get("role") not in ["admin", "system"]):
|
|
raise HTTPException(
|
|
status_code=status.HTTP_403_FORBIDDEN,
|
|
detail="Only system services or admins can update notification status"
|
|
)
|
|
|
|
try:
|
|
new_status = status_data.get("status")
|
|
if not new_status:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail="Status is required"
|
|
)
|
|
|
|
# Convert string status to enum
|
|
try:
|
|
from app.models.notifications import NotificationStatus as ModelStatus
|
|
model_status = ModelStatus(new_status)
|
|
except ValueError:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail=f"Invalid status: {new_status}"
|
|
)
|
|
|
|
updated_notification = await notification_service.update_notification_status(
|
|
notification_id=str(notification_id),
|
|
new_status=model_status,
|
|
error_message=status_data.get("error_message"),
|
|
provider_message_id=status_data.get("provider_message_id"),
|
|
metadata=status_data.get("metadata"),
|
|
response_time_ms=status_data.get("response_time_ms"),
|
|
provider=status_data.get("provider")
|
|
)
|
|
|
|
if not updated_notification:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail="Notification not found"
|
|
)
|
|
|
|
return NotificationResponse.from_orm(updated_notification)
|
|
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
logger.error("Failed to update notification status",
|
|
notification_id=str(notification_id),
|
|
status=status_data.get("status"),
|
|
error=str(e))
|
|
raise HTTPException(
|
|
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
|
detail="Failed to update notification status"
|
|
)
|
|
|
|
@router.get("/notifications/pending", response_model=List[NotificationResponse])
|
|
@track_endpoint_metrics("notification_get_pending")
|
|
async def get_pending_notifications_enhanced(
|
|
limit: int = Query(100, ge=1, le=1000, description="Maximum number of notifications"),
|
|
notification_type: Optional[NotificationType] = Query(None, description="Filter by type"),
|
|
current_user: Dict[str, Any] = Depends(get_current_user_dep),
|
|
notification_service: EnhancedNotificationService = Depends(get_enhanced_notification_service)
|
|
):
|
|
"""Get pending notifications for processing (system/admin only)"""
|
|
|
|
if (current_user.get("type") != "service" and
|
|
current_user.get("role") not in ["admin", "system"]):
|
|
raise HTTPException(
|
|
status_code=status.HTTP_403_FORBIDDEN,
|
|
detail="Only system services or admins can access pending notifications"
|
|
)
|
|
|
|
try:
|
|
model_notification_type = None
|
|
if notification_type:
|
|
try:
|
|
model_notification_type = ModelNotificationType(notification_type.value)
|
|
except ValueError:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail=f"Invalid notification type: {notification_type.value}"
|
|
)
|
|
|
|
notifications = await notification_service.get_pending_notifications(
|
|
limit=limit,
|
|
notification_type=model_notification_type
|
|
)
|
|
|
|
return [NotificationResponse.from_orm(notification) for notification in notifications]
|
|
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
logger.error("Failed to get pending notifications",
|
|
error=str(e))
|
|
raise HTTPException(
|
|
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
|
detail="Failed to get pending notifications"
|
|
)
|
|
|
|
@router.post("/notifications/{notification_id}/schedule")
|
|
@track_endpoint_metrics("notification_schedule")
|
|
async def schedule_notification_enhanced(
|
|
notification_id: UUID = Path(..., description="Notification ID"),
|
|
schedule_data: Dict[str, Any] = ...,
|
|
current_user: Dict[str, Any] = Depends(get_current_user_dep),
|
|
notification_service: EnhancedNotificationService = Depends(get_enhanced_notification_service)
|
|
):
|
|
"""Schedule a notification for future delivery with enhanced validation"""
|
|
|
|
try:
|
|
scheduled_at = schedule_data.get("scheduled_at")
|
|
if not scheduled_at:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail="scheduled_at is required"
|
|
)
|
|
|
|
# Parse datetime if it's a string
|
|
if isinstance(scheduled_at, str):
|
|
try:
|
|
scheduled_at = datetime.fromisoformat(scheduled_at.replace('Z', '+00:00'))
|
|
except ValueError:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail="Invalid datetime format. Use ISO format."
|
|
)
|
|
|
|
# Check that the scheduled time is in the future
|
|
if scheduled_at <= datetime.utcnow():
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail="Scheduled time must be in the future"
|
|
)
|
|
|
|
success = await notification_service.schedule_notification(
|
|
str(notification_id),
|
|
scheduled_at
|
|
)
|
|
|
|
if not success:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail="Notification not found or cannot be scheduled"
|
|
)
|
|
|
|
return {
|
|
"success": True,
|
|
"message": "Notification scheduled successfully",
|
|
"scheduled_at": scheduled_at.isoformat()
|
|
}
|
|
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
logger.error("Failed to schedule notification",
|
|
notification_id=str(notification_id),
|
|
error=str(e))
|
|
raise HTTPException(
|
|
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
|
detail="Failed to schedule notification"
|
|
)
|
|
|
|
@router.post("/notifications/{notification_id}/cancel")
|
|
@track_endpoint_metrics("notification_cancel")
|
|
async def cancel_notification_enhanced(
|
|
notification_id: UUID = Path(..., description="Notification ID"),
|
|
cancel_data: Optional[Dict[str, Any]] = None,
|
|
current_user: Dict[str, Any] = Depends(get_current_user_dep),
|
|
notification_service: EnhancedNotificationService = Depends(get_enhanced_notification_service)
|
|
):
|
|
"""Cancel a pending notification with enhanced validation"""
|
|
|
|
try:
|
|
reason = None
|
|
if cancel_data:
|
|
reason = cancel_data.get("reason", "Cancelled by user")
|
|
else:
|
|
reason = "Cancelled by user"
|
|
|
|
success = await notification_service.cancel_notification(
|
|
str(notification_id),
|
|
reason
|
|
)
|
|
|
|
if not success:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail="Notification not found or cannot be cancelled"
|
|
)
|
|
|
|
return {
|
|
"success": True,
|
|
"message": "Notification cancelled successfully",
|
|
"reason": reason
|
|
}
|
|
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
logger.error("Failed to cancel notification",
|
|
notification_id=str(notification_id),
|
|
error=str(e))
|
|
raise HTTPException(
|
|
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
|
detail="Failed to cancel notification"
|
|
)
|
|
|
|
@router.post("/notifications/{notification_id}/retry")
|
|
@track_endpoint_metrics("notification_retry")
|
|
async def retry_failed_notification_enhanced(
|
|
notification_id: UUID = Path(..., description="Notification ID"),
|
|
current_user: Dict[str, Any] = Depends(get_current_user_dep),
|
|
notification_service: EnhancedNotificationService = Depends(get_enhanced_notification_service)
|
|
):
|
|
"""Retry a failed notification with enhanced validation"""
|
|
|
|
# Only admins can retry notifications
|
|
if current_user.get("role") not in ["admin", "system"]:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_403_FORBIDDEN,
|
|
detail="Only admins can retry failed notifications"
|
|
)
|
|
|
|
try:
|
|
success = await notification_service.retry_failed_notification(str(notification_id))
|
|
|
|
if not success:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail="Notification not found, not failed, or max retries exceeded"
|
|
)
|
|
|
|
return {
|
|
"success": True,
|
|
"message": "Notification queued for retry"
|
|
}
|
|
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
logger.error("Failed to retry notification",
|
|
notification_id=str(notification_id),
|
|
error=str(e))
|
|
raise HTTPException(
|
|
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
|
detail="Failed to retry notification"
|
|
)
|
|
|
|
@router.get("/statistics", dependencies=[Depends(require_role(["admin", "manager"]))])
|
|
@track_endpoint_metrics("notification_get_statistics")
|
|
async def get_notification_statistics_enhanced(
|
|
tenant_id: Optional[str] = Query(None, description="Filter by tenant ID"),
|
|
days_back: int = Query(30, ge=1, le=365, description="Number of days to look back"),
|
|
current_user: Dict[str, Any] = Depends(get_current_user_dep),
|
|
notification_service: EnhancedNotificationService = Depends(get_enhanced_notification_service)
|
|
):
|
|
"""Get comprehensive notification statistics with enhanced analytics"""
|
|
|
|
try:
|
|
stats = await notification_service.get_notification_statistics(
|
|
tenant_id=tenant_id,
|
|
days_back=days_back
|
|
)
|
|
|
|
return stats
|
|
|
|
except Exception as e:
|
|
logger.error("Failed to get notification statistics",
|
|
tenant_id=tenant_id,
|
|
error=str(e))
|
|
raise HTTPException(
|
|
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
|
detail="Failed to get notification statistics"
|
|
) |