# ================================================================ # services/notification/app/services/email_service.py # ================================================================ """ Email service for sending notifications Handles SMTP configuration and email delivery """ import structlog import smtplib from email.mime.text import MIMEText from email.mime.multipart import MIMEMultipart from email.mime.base import MIMEBase from email import encoders from email.utils import formataddr from typing import Optional, List, Dict, Any import aiosmtplib from jinja2 import Template import asyncio from app.core.config import settings from shared.monitoring.metrics import MetricsCollector logger = structlog.get_logger() metrics = MetricsCollector("notification-service") class EmailService: """ Email service for sending notifications via SMTP Supports both plain text and HTML emails """ def __init__(self): self.smtp_host = settings.SMTP_HOST self.smtp_port = settings.SMTP_PORT self.smtp_user = settings.SMTP_USER self.smtp_password = settings.SMTP_PASSWORD self.smtp_tls = settings.SMTP_TLS self.smtp_ssl = settings.SMTP_SSL self.default_from_email = settings.DEFAULT_FROM_EMAIL self.default_from_name = settings.DEFAULT_FROM_NAME async def send_email( self, to_email: str, subject: str, text_content: str, html_content: Optional[str] = None, from_email: Optional[str] = None, from_name: Optional[str] = None, reply_to: Optional[str] = None, attachments: Optional[List[Dict[str, Any]]] = None ) -> bool: """ Send an email notification Args: to_email: Recipient email address subject: Email subject text_content: Plain text content html_content: HTML content (optional) from_email: Sender email (optional, uses default) from_name: Sender name (optional, uses default) reply_to: Reply-to address (optional) attachments: List of attachments (optional) Returns: bool: True if email was sent successfully """ try: if not settings.ENABLE_EMAIL_NOTIFICATIONS: logger.info("Email notifications disabled") return True # Return success to avoid blocking workflow if not self.smtp_user or not self.smtp_password: logger.error("SMTP credentials not configured") return False # Validate email address if not to_email or "@" not in to_email: logger.error("Invalid recipient email", email=to_email) return False # Create message message = MIMEMultipart('alternative') message['Subject'] = subject message['To'] = to_email # Set From header sender_email = from_email or self.default_from_email sender_name = from_name or self.default_from_name message['From'] = formataddr((sender_name, sender_email)) # Set Reply-To if provided if reply_to: message['Reply-To'] = reply_to # Add text content text_part = MIMEText(text_content, 'plain', 'utf-8') message.attach(text_part) # Add HTML content if provided if html_content: html_part = MIMEText(html_content, 'html', 'utf-8') message.attach(html_part) # Add attachments if provided if attachments: for attachment in attachments: await self._add_attachment(message, attachment) # Send email await self._send_smtp_email(message, sender_email, to_email) logger.info("Email sent successfully", to=to_email, subject=subject, from_email=sender_email) # Record success metrics metrics.increment_counter("emails_sent_total", labels={"status": "success"}) return True except Exception as e: logger.error("Failed to send email", to=to_email, subject=subject, error=str(e)) # Record failure metrics metrics.increment_counter("emails_sent_total", labels={"status": "failed"}) return False async def send_bulk_emails( self, recipients: List[str], subject: str, text_content: str, html_content: Optional[str] = None, batch_size: int = 50 ) -> Dict[str, Any]: """ Send bulk emails with rate limiting Args: recipients: List of recipient email addresses subject: Email subject text_content: Plain text content html_content: HTML content (optional) batch_size: Number of emails to send per batch Returns: Dict containing success/failure counts """ results = { "total": len(recipients), "sent": 0, "failed": 0, "errors": [] } try: # Process in batches to respect rate limits for i in range(0, len(recipients), batch_size): batch = recipients[i:i + batch_size] # Send emails concurrently within batch tasks = [ self.send_email( to_email=email, subject=subject, text_content=text_content, html_content=html_content ) for email in batch ] batch_results = await asyncio.gather(*tasks, return_exceptions=True) for email, result in zip(batch, batch_results): if isinstance(result, Exception): results["failed"] += 1 results["errors"].append({"email": email, "error": str(result)}) elif result: results["sent"] += 1 else: results["failed"] += 1 results["errors"].append({"email": email, "error": "Unknown error"}) # Rate limiting delay between batches if i + batch_size < len(recipients): await asyncio.sleep(1.0) # 1 second delay between batches logger.info("Bulk email completed", total=results["total"], sent=results["sent"], failed=results["failed"]) return results except Exception as e: logger.error("Bulk email failed", error=str(e)) results["errors"].append({"error": str(e)}) return results async def send_template_email( self, to_email: str, template_name: str, template_data: Dict[str, Any], subject_template: Optional[str] = None ) -> bool: """ Send email using a template Args: to_email: Recipient email address template_name: Name of the email template template_data: Data for template rendering subject_template: Subject template string (optional) Returns: bool: True if email was sent successfully """ try: # Load template (simplified - in production, load from database) template_content = await self._load_email_template(template_name) if not template_content: logger.error("Template not found", template=template_name) return False # Render subject subject = template_name.replace("_", " ").title() if subject_template: subject_tmpl = Template(subject_template) subject = subject_tmpl.render(**template_data) # Render content text_template = Template(template_content.get("text", "")) text_content = text_template.render(**template_data) html_content = None if template_content.get("html"): html_template = Template(template_content["html"]) html_content = html_template.render(**template_data) return await self.send_email( to_email=to_email, subject=subject, text_content=text_content, html_content=html_content ) except Exception as e: logger.error("Failed to send template email", template=template_name, error=str(e)) return False async def health_check(self) -> bool: """ Check if email service is healthy Returns: bool: True if service is healthy """ try: if not settings.ENABLE_EMAIL_NOTIFICATIONS: return True # Service is "healthy" if disabled if not self.smtp_user or not self.smtp_password: logger.warning("SMTP credentials not configured") return False # Test SMTP connection if self.smtp_ssl: # Use implicit TLS/SSL connection (port 465 typically) server = aiosmtplib.SMTP(hostname=self.smtp_host, port=self.smtp_port, use_tls=True) await server.connect() # No need for starttls() when using implicit TLS else: # Use plain connection, optionally upgrade with STARTTLS server = aiosmtplib.SMTP(hostname=self.smtp_host, port=self.smtp_port) await server.connect() if self.smtp_tls: # Try STARTTLS, but handle case where connection is already secure try: await server.starttls() except Exception as starttls_error: # If STARTTLS fails because connection is already using TLS, that's okay if "already using TLS" in str(starttls_error) or "already secure" in str(starttls_error): logger.debug("SMTP connection already secure, skipping STARTTLS") else: # Re-raise other STARTTLS errors raise starttls_error await server.login(self.smtp_user, self.smtp_password) await server.quit() logger.info("Email service health check passed") return True except Exception as e: logger.error("Email service health check failed", error=str(e)) return False # ================================================================ # PRIVATE HELPER METHODS # ================================================================ async def _send_smtp_email(self, message: MIMEMultipart, from_email: str, to_email: str): """Send email via SMTP""" try: # Create SMTP connection if self.smtp_ssl: server = aiosmtplib.SMTP( hostname=self.smtp_host, port=self.smtp_port, use_tls=True, timeout=30 ) else: server = aiosmtplib.SMTP( hostname=self.smtp_host, port=self.smtp_port, timeout=30 ) await server.connect() # Start TLS if required if self.smtp_tls and not self.smtp_ssl: await server.starttls() # Login await server.login(self.smtp_user, self.smtp_password) # Send email await server.send_message(message, from_addr=from_email, to_addrs=[to_email]) # Close connection await server.quit() except Exception as e: logger.error("SMTP send failed", error=str(e)) raise async def _add_attachment(self, message: MIMEMultipart, attachment: Dict[str, Any]): """Add attachment to email message""" try: filename = attachment.get("filename", "attachment") content = attachment.get("content", b"") content_type = attachment.get("content_type", "application/octet-stream") # Create attachment part part = MIMEBase(*content_type.split("/")) part.set_payload(content) encoders.encode_base64(part) part.add_header( 'Content-Disposition', f'attachment; filename= {filename}' ) message.attach(part) except Exception as e: logger.error("Failed to add attachment", filename=attachment.get("filename"), error=str(e)) async def _load_email_template(self, template_name: str) -> Optional[Dict[str, str]]: """Load email template from storage""" # Simplified template loading - in production, load from database templates = { "welcome": { "text": """ ¡Bienvenido a Bakery Forecast, {{user_name}}! Gracias por registrarte en nuestra plataforma de pronóstico para panaderías. Tu cuenta ha sido creada exitosamente y ya puedes comenzar a: - Subir datos de ventas - Generar pronósticos de demanda - Optimizar tu producción Para comenzar, visita: {{dashboard_url}} Si tienes alguna pregunta, no dudes en contactarnos. Saludos, El equipo de Bakery Forecast """, "html": """

¡Bienvenido a Bakery Forecast!

Hola {{user_name}},

Gracias por registrarte en nuestra plataforma de pronóstico para panaderías.

Tu cuenta ha sido creada exitosamente y ya puedes comenzar a:

Ir al Dashboard

Si tienes alguna pregunta, no dudes en contactarnos.

Saludos,
El equipo de Bakery Forecast

© 2025 Bakery Forecast. Todos los derechos reservados.
""" }, "forecast_alert": { "text": """ Alerta de Pronóstico - {{bakery_name}} Se ha detectado una variación significativa en la demanda prevista: Producto: {{product_name}} Fecha: {{forecast_date}} Demanda prevista: {{predicted_demand}} unidades Variación: {{variation_percentage}}% {{alert_message}} Revisa los pronósticos en: {{dashboard_url}} Saludos, El equipo de Bakery Forecast """, "html": """

🚨 Alerta de Pronóstico

{{bakery_name}}

Se ha detectado una variación significativa en la demanda prevista:

Producto: {{product_name}}

Fecha: {{forecast_date}}

Demanda prevista: {{predicted_demand}} unidades

Variación: {{variation_percentage}}%

{{alert_message}}

Ver Pronósticos
""" }, "weekly_report": { "text": """ Reporte Semanal - {{bakery_name}} Resumen de la semana del {{week_start}} al {{week_end}}: Ventas Totales: {{total_sales}} unidades Precisión del Pronóstico: {{forecast_accuracy}}% Productos más vendidos: {{#top_products}} - {{name}}: {{quantity}} unidades {{/top_products}} Recomendaciones: {{recommendations}} Ver reporte completo: {{report_url}} Saludos, El equipo de Bakery Forecast """, "html": """

📊 Reporte Semanal

{{bakery_name}}

Semana del {{week_start}} al {{week_end}}

{{total_sales}}

Ventas Totales

{{forecast_accuracy}}%

Precisión

Productos más vendidos:

Recomendaciones:

{{recommendations}}

Ver Reporte Completo
""" } } return templates.get(template_name)