# ================================================================ # services/notification/app/services/notification_service.py # ================================================================ """ Main notification service business logic Orchestrates notification delivery across multiple channels """ import structlog from typing import Dict, List, Any, Optional, Tuple from datetime import datetime, timedelta import asyncio import uuid from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy import select, and_, desc, func, update from jinja2 import Template from app.models.notifications import ( Notification, NotificationTemplate, NotificationPreference, NotificationLog, NotificationType, NotificationStatus, NotificationPriority ) from app.schemas.notifications import ( NotificationCreate, NotificationResponse, NotificationHistory, NotificationStats, BulkNotificationCreate ) from app.services.email_service import EmailService from app.services.whatsapp_service import WhatsAppService from app.services.messaging import publish_notification_sent, publish_notification_failed from app.core.config import settings from app.core.database import get_db from shared.monitoring.metrics import MetricsCollector logger = structlog.get_logger() metrics = MetricsCollector("notification-service") class NotificationService: """ Main service class for managing notification operations. Handles email, WhatsApp, and other notification channels. """ def __init__(self): self.email_service = EmailService() self.whatsapp_service = WhatsAppService() async def send_notification(self, notification: NotificationCreate) -> NotificationResponse: """Send a single notification""" try: start_time = datetime.utcnow() # Create notification record async for db in get_db(): # Check user preferences if recipient specified if notification.recipient_id: preferences = await self._get_user_preferences( db, notification.recipient_id, notification.tenant_id ) # Check if user allows this type of notification if not self._is_notification_allowed(notification.type, preferences): logger.info("Notification blocked by user preferences", recipient=notification.recipient_id, type=notification.type.value) # Still create record but mark as cancelled db_notification = await self._create_notification_record( db, notification, NotificationStatus.CANCELLED ) await db.commit() return NotificationResponse.from_orm(db_notification) # Create pending notification db_notification = await self._create_notification_record( db, notification, NotificationStatus.PENDING ) await db.commit() # Process template if specified if notification.template_id: notification = await self._process_template( db, notification, notification.template_id ) # Send based on type success = False error_message = None try: if notification.type == NotificationType.EMAIL: success = await self._send_email(notification) elif notification.type == NotificationType.WHATSAPP: success = await self._send_whatsapp(notification) elif notification.type == NotificationType.PUSH: success = await self._send_push(notification) else: error_message = f"Unsupported notification type: {notification.type}" except Exception as e: logger.error("Failed to send notification", error=str(e)) error_message = str(e) # Update notification status new_status = NotificationStatus.SENT if success else NotificationStatus.FAILED await self._update_notification_status( db, db_notification.id, new_status, error_message ) # Log attempt await self._log_delivery_attempt( db, db_notification.id, 1, new_status, error_message ) await db.commit() # Publish event if success: await publish_notification_sent({ "notification_id": str(db_notification.id), "type": notification.type.value, "tenant_id": notification.tenant_id, "recipient_id": notification.recipient_id }) else: await publish_notification_failed({ "notification_id": str(db_notification.id), "type": notification.type.value, "error": error_message, "tenant_id": notification.tenant_id }) # Record metrics processing_time = (datetime.utcnow() - start_time).total_seconds() metrics.observe_histogram( "notification_processing_duration_seconds", processing_time ) metrics.increment_counter( "notifications_sent_total", labels={ "type": notification.type.value, "status": "success" if success else "failed" } ) # Refresh the object to get updated data await db.refresh(db_notification) return NotificationResponse.from_orm(db_notification) except Exception as e: logger.error("Failed to process notification", error=str(e)) metrics.increment_counter("notification_errors_total") raise async def send_bulk_notifications(self, bulk_request: BulkNotificationCreate) -> Dict[str, Any]: """Send notifications to multiple recipients""" try: results = { "total": len(bulk_request.recipients), "sent": 0, "failed": 0, "notification_ids": [] } # Process in batches to avoid overwhelming the system batch_size = settings.BATCH_SIZE for i in range(0, len(bulk_request.recipients), batch_size): batch = bulk_request.recipients[i:i + batch_size] # Create individual notifications for each recipient tasks = [] for recipient in batch: individual_notification = NotificationCreate( type=bulk_request.type, recipient_id=recipient if not "@" in recipient else None, recipient_email=recipient if "@" in recipient else None, subject=bulk_request.subject, message=bulk_request.message, html_content=bulk_request.html_content, template_id=bulk_request.template_id, template_data=bulk_request.template_data, priority=bulk_request.priority, scheduled_at=bulk_request.scheduled_at, broadcast=True ) tasks.append(self.send_notification(individual_notification)) # Process batch concurrently batch_results = await asyncio.gather(*tasks, return_exceptions=True) for result in batch_results: if isinstance(result, Exception): results["failed"] += 1 logger.error("Bulk notification failed", error=str(result)) else: results["sent"] += 1 results["notification_ids"].append(result.id) # Small delay between batches to prevent rate limiting if i + batch_size < len(bulk_request.recipients): await asyncio.sleep(0.1) logger.info("Bulk notification completed", total=results["total"], sent=results["sent"], failed=results["failed"]) return results except Exception as e: logger.error("Failed to send bulk notifications", error=str(e)) raise async def get_notification_history( self, user_id: str, tenant_id: str, page: int = 1, per_page: int = 50, type_filter: Optional[NotificationType] = None, status_filter: Optional[NotificationStatus] = None ) -> NotificationHistory: """Get notification history for a user""" try: async for db in get_db(): # Build query query = select(Notification).where( and_( Notification.tenant_id == tenant_id, Notification.recipient_id == user_id ) ) if type_filter: query = query.where(Notification.type == type_filter) if status_filter: query = query.where(Notification.status == status_filter) # Get total count count_query = select(func.count()).select_from(query.subquery()) total = await db.scalar(count_query) # Get paginated results offset = (page - 1) * per_page query = query.order_by(desc(Notification.created_at)).offset(offset).limit(per_page) result = await db.execute(query) notifications = result.scalars().all() # Convert to response objects notification_responses = [ NotificationResponse.from_orm(notification) for notification in notifications ] return NotificationHistory( notifications=notification_responses, total=total, page=page, per_page=per_page, has_next=offset + per_page < total, has_prev=page > 1 ) except Exception as e: logger.error("Failed to get notification history", error=str(e)) raise async def get_notification_stats(self, tenant_id: str, days: int = 30) -> NotificationStats: """Get notification statistics for a tenant""" try: async for db in get_db(): # Date range start_date = datetime.utcnow() - timedelta(days=days) # Basic counts base_query = select(Notification).where( and_( Notification.tenant_id == tenant_id, Notification.created_at >= start_date ) ) # Total sent sent_query = base_query.where(Notification.status == NotificationStatus.SENT) total_sent = await db.scalar(select(func.count()).select_from(sent_query.subquery())) # Total delivered delivered_query = base_query.where(Notification.status == NotificationStatus.DELIVERED) total_delivered = await db.scalar(select(func.count()).select_from(delivered_query.subquery())) # Total failed failed_query = base_query.where(Notification.status == NotificationStatus.FAILED) total_failed = await db.scalar(select(func.count()).select_from(failed_query.subquery())) # Delivery rate delivery_rate = (total_delivered / max(total_sent, 1)) * 100 # Average delivery time avg_delivery_time = None if total_delivered > 0: delivery_time_query = select( func.avg( func.extract('epoch', Notification.delivered_at - Notification.sent_at) / 60 ) ).where( and_( Notification.tenant_id == tenant_id, Notification.status == NotificationStatus.DELIVERED, Notification.sent_at.isnot(None), Notification.delivered_at.isnot(None), Notification.created_at >= start_date ) ) avg_delivery_time = await db.scalar(delivery_time_query) # By type type_query = select( Notification.type, func.count(Notification.id) ).where( and_( Notification.tenant_id == tenant_id, Notification.created_at >= start_date ) ).group_by(Notification.type) type_results = await db.execute(type_query) by_type = {str(row[0].value): row[1] for row in type_results} # By status status_query = select( Notification.status, func.count(Notification.id) ).where( and_( Notification.tenant_id == tenant_id, Notification.created_at >= start_date ) ).group_by(Notification.status) status_results = await db.execute(status_query) by_status = {str(row[0].value): row[1] for row in status_results} # Recent activity (last 10 notifications) recent_query = base_query.order_by(desc(Notification.created_at)).limit(10) recent_result = await db.execute(recent_query) recent_notifications = recent_result.scalars().all() recent_activity = [ { "id": str(notification.id), "type": notification.type.value, "status": notification.status.value, "created_at": notification.created_at.isoformat(), "recipient_email": notification.recipient_email } for notification in recent_notifications ] return NotificationStats( total_sent=total_sent or 0, total_delivered=total_delivered or 0, total_failed=total_failed or 0, delivery_rate=round(delivery_rate, 2), avg_delivery_time_minutes=round(avg_delivery_time, 2) if avg_delivery_time else None, by_type=by_type, by_status=by_status, recent_activity=recent_activity ) except Exception as e: logger.error("Failed to get notification stats", error=str(e)) raise async def get_user_preferences( self, user_id: str, tenant_id: str ) -> Dict[str, Any]: """Get user notification preferences""" try: async for db in get_db(): result = await db.execute( select(NotificationPreference).where( and_( NotificationPreference.user_id == user_id, NotificationPreference.tenant_id == tenant_id ) ) ) preferences = result.scalar_one_or_none() if not preferences: # Create default preferences preferences = NotificationPreference( user_id=user_id, tenant_id=tenant_id ) db.add(preferences) await db.commit() await db.refresh(preferences) return { "user_id": str(preferences.user_id), "tenant_id": str(preferences.tenant_id), "email_enabled": preferences.email_enabled, "email_alerts": preferences.email_alerts, "email_marketing": preferences.email_marketing, "email_reports": preferences.email_reports, "whatsapp_enabled": preferences.whatsapp_enabled, "whatsapp_alerts": preferences.whatsapp_alerts, "whatsapp_reports": preferences.whatsapp_reports, "push_enabled": preferences.push_enabled, "push_alerts": preferences.push_alerts, "push_reports": preferences.push_reports, "quiet_hours_start": preferences.quiet_hours_start, "quiet_hours_end": preferences.quiet_hours_end, "timezone": preferences.timezone, "digest_frequency": preferences.digest_frequency, "max_emails_per_day": preferences.max_emails_per_day, "language": preferences.language, "created_at": preferences.created_at, "updated_at": preferences.updated_at } except Exception as e: logger.error("Failed to get user preferences", error=str(e)) raise async def update_user_preferences( self, user_id: str, tenant_id: str, updates: Dict[str, Any] ) -> Dict[str, Any]: """Update user notification preferences""" try: async for db in get_db(): # Get existing preferences or create new result = await db.execute( select(NotificationPreference).where( and_( NotificationPreference.user_id == user_id, NotificationPreference.tenant_id == tenant_id ) ) ) preferences = result.scalar_one_or_none() if not preferences: preferences = NotificationPreference( user_id=user_id, tenant_id=tenant_id ) db.add(preferences) # Update fields for field, value in updates.items(): if hasattr(preferences, field) and value is not None: setattr(preferences, field, value) preferences.updated_at = datetime.utcnow() await db.commit() await db.refresh(preferences) logger.info("Updated user preferences", user_id=user_id, tenant_id=tenant_id, updates=list(updates.keys())) return await self.get_user_preferences(user_id, tenant_id) except Exception as e: logger.error("Failed to update user preferences", error=str(e)) raise # ================================================================ # PRIVATE HELPER METHODS # ================================================================ async def _create_notification_record( self, db: AsyncSession, notification: NotificationCreate, status: NotificationStatus ) -> Notification: """Create a notification record in the database""" db_notification = Notification( tenant_id=notification.tenant_id, sender_id=notification.sender_id, recipient_id=notification.recipient_id, type=notification.type, status=status, priority=notification.priority, subject=notification.subject, message=notification.message, html_content=notification.html_content, template_id=notification.template_id, template_data=notification.template_data, recipient_email=notification.recipient_email, recipient_phone=notification.recipient_phone, scheduled_at=notification.scheduled_at, broadcast=notification.broadcast ) db.add(db_notification) await db.flush() # Get the ID without committing return db_notification async def _update_notification_status( self, db: AsyncSession, notification_id: uuid.UUID, status: NotificationStatus, error_message: Optional[str] = None ): """Update notification status""" update_data = { "status": status, "updated_at": datetime.utcnow() } if status == NotificationStatus.SENT: update_data["sent_at"] = datetime.utcnow() elif status == NotificationStatus.DELIVERED: update_data["delivered_at"] = datetime.utcnow() elif status == NotificationStatus.FAILED and error_message: update_data["error_message"] = error_message await db.execute( update(Notification) .where(Notification.id == notification_id) .values(**update_data) ) async def _log_delivery_attempt( self, db: AsyncSession, notification_id: uuid.UUID, attempt_number: int, status: NotificationStatus, error_message: Optional[str] = None ): """Log a delivery attempt""" log_entry = NotificationLog( notification_id=notification_id, attempt_number=attempt_number, status=status, attempted_at=datetime.utcnow(), error_message=error_message ) db.add(log_entry) async def _get_user_preferences( self, db: AsyncSession, user_id: str, tenant_id: str ) -> Optional[NotificationPreference]: """Get user preferences from database""" result = await db.execute( select(NotificationPreference).where( and_( NotificationPreference.user_id == user_id, NotificationPreference.tenant_id == tenant_id ) ) ) return result.scalar_one_or_none() def _is_notification_allowed( self, notification_type: NotificationType, preferences: Optional[NotificationPreference] ) -> bool: """Check if notification is allowed based on user preferences""" if not preferences: return True # Default to allow if no preferences set if notification_type == NotificationType.EMAIL: return preferences.email_enabled elif notification_type == NotificationType.WHATSAPP: return preferences.whatsapp_enabled elif notification_type == NotificationType.PUSH: return preferences.push_enabled return True # Default to allow for unknown types async def _process_template( self, db: AsyncSession, notification: NotificationCreate, template_id: str ) -> NotificationCreate: """Process notification template""" try: # Get template result = await db.execute( select(NotificationTemplate).where( and_( NotificationTemplate.template_key == template_id, NotificationTemplate.is_active == True, NotificationTemplate.type == notification.type ) ) ) template = result.scalar_one_or_none() if not template: logger.warning("Template not found", template_id=template_id) return notification # Process template variables template_data = notification.template_data or {} # Render subject if template.subject_template: subject_template = Template(template.subject_template) notification.subject = subject_template.render(**template_data) # Render body body_template = Template(template.body_template) notification.message = body_template.render(**template_data) # Render HTML if available if template.html_template: html_template = Template(template.html_template) notification.html_content = html_template.render(**template_data) logger.info("Template processed successfully", template_id=template_id) return notification except Exception as e: logger.error("Failed to process template", template_id=template_id, error=str(e)) return notification # Return original if template processing fails async def _send_email(self, notification: NotificationCreate) -> bool: """Send email notification""" try: return await self.email_service.send_email( to_email=notification.recipient_email, subject=notification.subject or "Notification", text_content=notification.message, html_content=notification.html_content ) except Exception as e: logger.error("Failed to send email", error=str(e)) return False async def _send_whatsapp(self, notification: NotificationCreate) -> bool: """Send WhatsApp notification""" try: return await self.whatsapp_service.send_message( to_phone=notification.recipient_phone, message=notification.message ) except Exception as e: logger.error("Failed to send WhatsApp", error=str(e)) return False async def _send_push(self, notification: NotificationCreate) -> bool: """Send push notification (placeholder)""" logger.info("Push notifications not yet implemented") return False