Files
bakery-ia/services/notification/app/services/notification_service.py
2025-08-08 09:08:41 +02:00

697 lines
26 KiB
Python

"""
Enhanced Notification Service
Business logic layer using repository pattern for notification operations
"""
import structlog
from datetime import datetime, timedelta
from typing import Optional, List, Dict, Any, Union
from sqlalchemy.ext.asyncio import AsyncSession
import json
from app.repositories import (
NotificationRepository,
TemplateRepository,
PreferenceRepository,
LogRepository
)
from app.models.notifications import (
Notification, NotificationTemplate, NotificationPreference, NotificationLog,
NotificationStatus, NotificationType, NotificationPriority
)
from shared.database.exceptions import DatabaseError, ValidationError, DuplicateRecordError
from shared.database.transactions import transactional
from shared.database.base import create_database_manager
from shared.database.unit_of_work import UnitOfWork
logger = structlog.get_logger()
class EnhancedNotificationService:
"""Enhanced notification management business logic using repository pattern with dependency injection"""
def __init__(self, database_manager=None):
self.database_manager = database_manager or create_database_manager()
async def _init_repositories(self, session):
"""Initialize repositories with session"""
self.notification_repo = NotificationRepository(session)
self.template_repo = TemplateRepository(session)
self.preference_repo = PreferenceRepository(session)
self.log_repo = LogRepository(session)
return {
'notification': self.notification_repo,
'template': self.template_repo,
'preference': self.preference_repo,
'log': self.log_repo
}
@transactional
async def create_notification(
self,
tenant_id: str,
sender_id: str,
notification_type: NotificationType,
message: str,
recipient_id: str = None,
recipient_email: str = None,
recipient_phone: str = None,
subject: str = None,
html_content: str = None,
template_key: str = None,
template_data: Dict[str, Any] = None,
priority: NotificationPriority = NotificationPriority.NORMAL,
scheduled_at: datetime = None,
broadcast: bool = False,
session: AsyncSession = None
) -> Notification:
"""Create a new notification with enhanced validation and template support"""
try:
async with self.database_manager.get_session() as db_session:
async with UnitOfWork(db_session) as uow:
# Register repositories
notification_repo = uow.register_repository("notifications", NotificationRepository)
template_repo = uow.register_repository("templates", TemplateRepository)
preference_repo = uow.register_repository("preferences", PreferenceRepository)
log_repo = uow.register_repository("logs", LogRepository)
notification_data = {
"tenant_id": tenant_id,
"sender_id": sender_id,
"type": notification_type,
"message": message,
"priority": priority,
"broadcast": broadcast
}
# Add recipient information
if recipient_id:
notification_data["recipient_id"] = recipient_id
if recipient_email:
notification_data["recipient_email"] = recipient_email
if recipient_phone:
notification_data["recipient_phone"] = recipient_phone
# Add optional fields
if subject:
notification_data["subject"] = subject
if html_content:
notification_data["html_content"] = html_content
if scheduled_at:
notification_data["scheduled_at"] = scheduled_at
# Handle template processing
if template_key:
template = await template_repo.get_by_template_key(template_key)
if not template:
raise ValidationError(f"Template with key '{template_key}' not found")
# Process template with provided data
processed_content = await self._process_template(template, template_data or {})
# Update notification data with processed template content
notification_data.update(processed_content)
notification_data["template_id"] = template_key
if template_data:
notification_data["template_data"] = json.dumps(template_data)
# Check recipient preferences if not a broadcast
if not broadcast and recipient_id:
can_send = await self._check_recipient_preferences(
recipient_id, tenant_id, notification_type, priority, preference_repo
)
if not can_send["allowed"]:
logger.info("Notification blocked by recipient preferences",
recipient_id=recipient_id,
reason=can_send["reason"])
raise ValidationError(f"Notification blocked: {can_send['reason']}")
# Create the notification
notification = await notification_repo.create_notification(notification_data)
logger.info("Notification created successfully",
notification_id=notification.id,
tenant_id=tenant_id,
type=notification_type.value,
priority=priority.value,
broadcast=broadcast,
scheduled=scheduled_at is not None)
return notification
except (ValidationError, DatabaseError):
raise
except Exception as e:
logger.error("Failed to create notification",
tenant_id=tenant_id,
type=notification_type.value,
error=str(e))
raise DatabaseError(f"Failed to create notification: {str(e)}")
async def get_notification_by_id(self, notification_id: str) -> Optional[Notification]:
"""Get notification by ID"""
try:
async with self.database_manager.get_session() as db_session:
await self._init_repositories(db_session)
return await self.notification_repo.get_by_id(notification_id)
except Exception as e:
logger.error("Failed to get notification",
notification_id=notification_id,
error=str(e))
return None
async def get_user_notifications(
self,
user_id: str,
tenant_id: str = None,
unread_only: bool = False,
notification_type: NotificationType = None,
skip: int = 0,
limit: int = 50
) -> List[Notification]:
"""Get notifications for a user with filters"""
try:
async with self.database_manager.get_session() as db_session:
await self._init_repositories(db_session)
return await self.notification_repo.get_notifications_by_recipient(
recipient_id=user_id,
tenant_id=tenant_id,
status=None,
notification_type=notification_type,
unread_only=unread_only,
skip=skip,
limit=limit
)
except Exception as e:
logger.error("Failed to get user notifications",
user_id=user_id,
error=str(e))
return []
async def get_tenant_notifications(
self,
tenant_id: str,
status: NotificationStatus = None,
notification_type: NotificationType = None,
skip: int = 0,
limit: int = 50
) -> List[Notification]:
"""Get notifications for a tenant"""
try:
filters = {"tenant_id": tenant_id}
if status:
filters["status"] = status
if notification_type:
filters["type"] = notification_type
return await self.notification_repo.get_multi(
filters=filters,
skip=skip,
limit=limit,
order_by="created_at",
order_desc=True
)
except Exception as e:
logger.error("Failed to get tenant notifications",
tenant_id=tenant_id,
error=str(e))
return []
async def mark_notification_as_read(self, notification_id: str, user_id: str) -> bool:
"""Mark a notification as read by a user"""
try:
# Verify the notification belongs to the user
notification = await self.notification_repo.get_by_id(notification_id)
if not notification:
return False
# Allow if it's the recipient or a broadcast notification
if notification.recipient_id != user_id and not notification.broadcast:
logger.warning("User attempted to mark notification as read without permission",
notification_id=notification_id,
user_id=user_id,
actual_recipient=notification.recipient_id)
return False
updated_notification = await self.notification_repo.mark_as_read(notification_id)
return updated_notification is not None
except Exception as e:
logger.error("Failed to mark notification as read",
notification_id=notification_id,
user_id=user_id,
error=str(e))
return False
async def mark_multiple_as_read(
self,
user_id: str,
notification_ids: List[str] = None,
tenant_id: str = None
) -> int:
"""Mark multiple notifications as read for a user"""
try:
return await self.notification_repo.mark_multiple_as_read(
recipient_id=user_id,
notification_ids=notification_ids,
tenant_id=tenant_id
)
except Exception as e:
logger.error("Failed to mark multiple notifications as read",
user_id=user_id,
error=str(e))
return 0
@transactional
async def update_notification_status(
self,
notification_id: str,
new_status: NotificationStatus,
error_message: str = None,
provider_message_id: str = None,
metadata: Dict[str, Any] = None,
response_time_ms: int = None,
provider: str = None,
session: AsyncSession = None
) -> Optional[Notification]:
"""Update notification status and create log entry"""
try:
# Update the notification status
updated_notification = await self.notification_repo.update_notification_status(
notification_id, new_status, error_message, provider_message_id, metadata
)
if not updated_notification:
return None
# Create a log entry
log_data = {
"notification_id": notification_id,
"attempt_number": updated_notification.retry_count + 1,
"status": new_status,
"provider": provider,
"provider_message_id": provider_message_id,
"response_time_ms": response_time_ms,
"error_message": error_message,
"log_metadata": metadata
}
await self.log_repo.create_log_entry(log_data)
logger.info("Notification status updated with log entry",
notification_id=notification_id,
new_status=new_status.value,
provider=provider)
return updated_notification
except Exception as e:
logger.error("Failed to update notification status",
notification_id=notification_id,
new_status=new_status.value,
error=str(e))
raise DatabaseError(f"Failed to update status: {str(e)}")
async def get_pending_notifications(
self,
limit: int = 100,
notification_type: NotificationType = None
) -> List[Notification]:
"""Get pending notifications for processing"""
try:
pending = await self.notification_repo.get_pending_notifications(limit)
if notification_type:
# Filter by type if specified
pending = [n for n in pending if n.type == notification_type]
return pending
except Exception as e:
logger.error("Failed to get pending notifications",
type=notification_type.value if notification_type else None,
error=str(e))
return []
async def schedule_notification(
self,
notification_id: str,
scheduled_at: datetime
) -> bool:
"""Schedule a notification for future delivery"""
try:
updated_notification = await self.notification_repo.schedule_notification(
notification_id, scheduled_at
)
return updated_notification is not None
except ValidationError as e:
logger.warning("Failed to schedule notification",
notification_id=notification_id,
scheduled_at=scheduled_at,
error=str(e))
return False
except Exception as e:
logger.error("Failed to schedule notification",
notification_id=notification_id,
error=str(e))
return False
async def cancel_notification(
self,
notification_id: str,
reason: str = None
) -> bool:
"""Cancel a pending notification"""
try:
cancelled = await self.notification_repo.cancel_notification(
notification_id, reason
)
return cancelled is not None
except ValidationError as e:
logger.warning("Failed to cancel notification",
notification_id=notification_id,
error=str(e))
return False
except Exception as e:
logger.error("Failed to cancel notification",
notification_id=notification_id,
error=str(e))
return False
async def retry_failed_notification(self, notification_id: str) -> bool:
"""Retry a failed notification"""
try:
notification = await self.notification_repo.get_by_id(notification_id)
if not notification:
return False
if notification.status != NotificationStatus.FAILED:
logger.warning("Cannot retry notification that is not failed",
notification_id=notification_id,
current_status=notification.status.value)
return False
if notification.retry_count >= notification.max_retries:
logger.warning("Cannot retry notification - max retries exceeded",
notification_id=notification_id,
retry_count=notification.retry_count,
max_retries=notification.max_retries)
return False
# Reset status to pending for retry
updated = await self.notification_repo.update_notification_status(
notification_id, NotificationStatus.PENDING
)
if updated:
logger.info("Notification queued for retry",
notification_id=notification_id,
retry_count=notification.retry_count)
return updated is not None
except Exception as e:
logger.error("Failed to retry notification",
notification_id=notification_id,
error=str(e))
return False
async def get_notification_statistics(
self,
tenant_id: str = None,
days_back: int = 30
) -> Dict[str, Any]:
"""Get comprehensive notification statistics"""
try:
# Get notification statistics
notification_stats = await self.notification_repo.get_notification_statistics(
tenant_id, days_back
)
# Get delivery performance statistics
delivery_stats = await self.log_repo.get_delivery_performance_stats(
hours_back=days_back * 24
)
return {
"notifications": notification_stats,
"delivery_performance": delivery_stats
}
except Exception as e:
logger.error("Failed to get notification statistics",
tenant_id=tenant_id,
error=str(e))
return {
"notifications": {},
"delivery_performance": {}
}
# Template Management Methods
@transactional
async def create_template(
self,
template_data: Dict[str, Any],
session: AsyncSession = None
) -> NotificationTemplate:
"""Create a new notification template"""
try:
return await self.template_repo.create_template(template_data)
except (ValidationError, DuplicateRecordError):
raise
except Exception as e:
logger.error("Failed to create template",
template_key=template_data.get("template_key"),
error=str(e))
raise DatabaseError(f"Failed to create template: {str(e)}")
async def get_template(self, template_key: str) -> Optional[NotificationTemplate]:
"""Get template by key"""
try:
return await self.template_repo.get_by_template_key(template_key)
except Exception as e:
logger.error("Failed to get template",
template_key=template_key,
error=str(e))
return None
async def get_templates_by_category(
self,
category: str,
tenant_id: str = None,
include_system: bool = True
) -> List[NotificationTemplate]:
"""Get templates by category"""
try:
return await self.template_repo.get_templates_by_category(
category, tenant_id, include_system
)
except Exception as e:
logger.error("Failed to get templates by category",
category=category,
tenant_id=tenant_id,
error=str(e))
return []
async def search_templates(
self,
search_term: str,
tenant_id: str = None,
category: str = None,
notification_type: NotificationType = None,
include_system: bool = True
) -> List[NotificationTemplate]:
"""Search templates"""
try:
return await self.template_repo.search_templates(
search_term, tenant_id, category, notification_type, include_system
)
except Exception as e:
logger.error("Failed to search templates",
search_term=search_term,
error=str(e))
return []
# Preference Management Methods
@transactional
async def create_user_preferences(
self,
user_id: str,
tenant_id: str,
preferences: Dict[str, Any] = None,
session: AsyncSession = None
) -> NotificationPreference:
"""Create user notification preferences"""
try:
preference_data = {
"user_id": user_id,
"tenant_id": tenant_id
}
if preferences:
preference_data.update(preferences)
return await self.preference_repo.create_preferences(preference_data)
except (ValidationError, DuplicateRecordError):
raise
except Exception as e:
logger.error("Failed to create user preferences",
user_id=user_id,
tenant_id=tenant_id,
error=str(e))
raise DatabaseError(f"Failed to create preferences: {str(e)}")
async def get_user_preferences(
self,
user_id: str,
tenant_id: str
) -> Optional[NotificationPreference]:
"""Get user notification preferences"""
try:
return await self.preference_repo.get_user_preferences(user_id, tenant_id)
except Exception as e:
logger.error("Failed to get user preferences",
user_id=user_id,
tenant_id=tenant_id,
error=str(e))
return None
@transactional
async def update_user_preferences(
self,
user_id: str,
tenant_id: str,
updates: Dict[str, Any],
session: AsyncSession = None
) -> Optional[NotificationPreference]:
"""Update user notification preferences"""
try:
return await self.preference_repo.update_user_preferences(
user_id, tenant_id, updates
)
except ValidationError:
raise
except Exception as e:
logger.error("Failed to update user preferences",
user_id=user_id,
tenant_id=tenant_id,
error=str(e))
raise DatabaseError(f"Failed to update preferences: {str(e)}")
# Helper Methods
async def _process_template(
self,
template: NotificationTemplate,
data: Dict[str, Any]
) -> Dict[str, Any]:
"""Process template with provided data"""
try:
result = {}
# Process subject if available
if template.subject_template:
result["subject"] = self._replace_template_variables(
template.subject_template, data
)
# Process body template
result["message"] = self._replace_template_variables(
template.body_template, data
)
# Process HTML template if available
if template.html_template:
result["html_content"] = self._replace_template_variables(
template.html_template, data
)
return result
except Exception as e:
logger.error("Failed to process template",
template_key=template.template_key,
error=str(e))
raise ValidationError(f"Template processing failed: {str(e)}")
def _replace_template_variables(self, template_text: str, data: Dict[str, Any]) -> str:
"""Replace template variables with actual values"""
try:
# Simple variable replacement using format()
# In a real implementation, you might use Jinja2 or similar
result = template_text
for key, value in data.items():
placeholder = f"{{{key}}}"
if placeholder in result:
result = result.replace(placeholder, str(value))
return result
except Exception as e:
logger.error("Failed to replace template variables", error=str(e))
return template_text
async def _check_recipient_preferences(
self,
recipient_id: str,
tenant_id: str,
notification_type: NotificationType,
priority: NotificationPriority,
preference_repo: PreferenceRepository = None
) -> Dict[str, Any]:
"""Check if notification can be sent based on recipient preferences"""
try:
# Get notification category based on type
category = "alerts" # Default
if notification_type == NotificationType.EMAIL:
category = "alerts" # You might have more sophisticated logic here
# Check if email can be sent based on preferences
if notification_type == NotificationType.EMAIL:
repo = preference_repo or self.preference_repo
return await repo.can_send_email(
recipient_id, tenant_id, category
)
# For other types, implement similar checks
# For now, allow all other types
return {"allowed": True, "reason": "No restrictions"}
except Exception as e:
logger.error("Failed to check recipient preferences",
recipient_id=recipient_id,
tenant_id=tenant_id,
error=str(e))
# Default to allowing on error
return {"allowed": True, "reason": "Error checking preferences"}
# Legacy compatibility alias
NotificationService = EnhancedNotificationService