Files
bakery-ia/shared/clients/notification_client.py

187 lines
7.0 KiB
Python
Raw Permalink Normal View History

2025-10-24 13:05:04 +02:00
# shared/clients/notification_client.py
"""
Notification Service Client for Inter-Service Communication
Provides access to notification and email sending from other services
"""
import structlog
from typing import Dict, Any, Optional
from uuid import UUID
from shared.clients.base_service_client import BaseServiceClient
from shared.config.base import BaseServiceSettings
logger = structlog.get_logger()
class NotificationServiceClient(BaseServiceClient):
    """Client for communicating with the Notification Service.

    All request methods are best-effort: exceptions raised while sending are
    logged (with traceback) and converted into a ``None`` / ``False`` return
    value instead of propagating to the caller.
    """

    def __init__(self, config: BaseServiceSettings, calling_service_name: str = "unknown"):
        """Initialise the client.

        Args:
            config: Service settings forwarded to ``BaseServiceClient``.
            calling_service_name: Name of the service using this client,
                forwarded to ``BaseServiceClient``.
        """
        super().__init__(calling_service_name, config)

    def get_service_base_path(self) -> str:
        """Return the API base path used for this service (``/api/v1``)."""
        return "/api/v1"

    # ================================================================
    # NOTIFICATION ENDPOINTS
    # ================================================================

    async def send_notification(
        self,
        tenant_id: str,
        notification_type: str,
        message: str,
        recipient_email: Optional[str] = None,
        subject: Optional[str] = None,
        html_content: Optional[str] = None,
        priority: str = "normal",
        metadata: Optional[Dict[str, Any]] = None
    ) -> Optional[Dict[str, Any]]:
        """Send a notification through the ``notifications/send`` endpoint.

        Args:
            tenant_id: Tenant ID (UUID as string).
            notification_type: Type of notification (email, sms, push, in_app).
            message: Notification message.
            recipient_email: Recipient email address (for email notifications).
            subject: Email subject (for email notifications).
            html_content: HTML content for email (optional).
            priority: Priority level (low, normal, high, urgent).
            metadata: Additional metadata; an empty dict is sent when omitted.

        Returns:
            The value returned by ``self.post`` (a falsy result is returned
            as-is, without a success log entry), or ``None`` if an exception
            was raised while sending.
        """
        try:
            # Optional fields are always included in the payload, even when None.
            notification_data = {
                "type": notification_type,
                "message": message,
                "priority": priority,
                "recipient_email": recipient_email,
                "subject": subject,
                "html_content": html_content,
                "metadata": metadata or {},
            }
            result = await self.post(
                "notifications/send", data=notification_data, tenant_id=tenant_id
            )
            if result:
                logger.info(
                    "Notification sent successfully",
                    tenant_id=tenant_id,
                    notification_type=notification_type,
                )
            return result
        except Exception as e:
            # Best-effort: never let a notification failure break the caller,
            # but keep the traceback so the failure can be diagnosed.
            logger.error(
                "Error sending notification",
                error=str(e),
                tenant_id=tenant_id,
                notification_type=notification_type,
                exc_info=True,
            )
            return None

    async def send_email(
        self,
        tenant_id: str,
        to_email: str,
        subject: str,
        message: str,
        html_content: Optional[str] = None,
        priority: str = "normal"
    ) -> Optional[Dict[str, Any]]:
        """Send an email notification (convenience wrapper).

        Delegates to ``send_notification`` with ``notification_type="email"``.

        Args:
            tenant_id: Tenant ID (UUID as string).
            to_email: Recipient email address.
            subject: Email subject.
            message: Email message (plain text).
            html_content: HTML version of email (optional).
            priority: Priority level (low, normal, high, urgent).

        Returns:
            Whatever ``send_notification`` returns.
        """
        return await self.send_notification(
            tenant_id=tenant_id,
            notification_type="email",
            message=message,
            recipient_email=to_email,
            subject=subject,
            html_content=html_content,
            priority=priority,
        )

    async def send_workflow_summary(
        self,
        tenant_id: str,
        notification_data: Dict[str, Any]
    ) -> Optional[Dict[str, Any]]:
        """Send a daily workflow summary as an email-type notification.

        Keys read from ``notification_data`` (all optional):
        ``orchestration_run_id``, ``forecasts_created``, ``batches_created``,
        ``requirements_created``, ``pos_created``, ``forecast_id``,
        ``production_schedule_id`` and ``procurement_plan_id``.

        Args:
            tenant_id: Tenant ID.
            notification_data: Summary data to include in the notification.

        Returns:
            The value returned by ``self.post``, or ``None`` if an exception
            was raised while building or sending the summary.
        """
        try:
            # The run id is shown as 'N/A' in human-readable text when the key
            # is absent, but passed through as None in metadata and logs.
            run_id = notification_data.get('orchestration_run_id')
            run_label = notification_data.get('orchestration_run_id', 'N/A')

            subject = f"Daily Workflow Summary - {run_label}"
            message_parts = [
                f"Daily workflow completed for tenant {tenant_id}",
                f"Orchestration Run ID: {run_label}",
                f"Forecasts created: {notification_data.get('forecasts_created', 0)}",
                f"Production batches created: {notification_data.get('batches_created', 0)}",
                f"Procurement requirements created: {notification_data.get('requirements_created', 0)}",
                f"Purchase orders created: {notification_data.get('pos_created', 0)}"
            ]
            message = "\n".join(message_parts)

            # NOTE(review): no recipient is set in this payload; presumably the
            # notification service resolves recipients for the tenant - confirm.
            notification_payload = {
                "type": "email",
                "message": message,
                "priority": "normal",
                "subject": subject,
                "metadata": {
                    "orchestration_run_id": run_id,
                    "forecast_id": notification_data.get('forecast_id'),
                    "production_schedule_id": notification_data.get('production_schedule_id'),
                    "procurement_plan_id": notification_data.get('procurement_plan_id'),
                    "summary_type": "workflow_completion",
                },
            }
            result = await self.post(
                "notifications/send", data=notification_payload, tenant_id=tenant_id
            )
            if result:
                logger.info(
                    "Workflow summary notification sent successfully",
                    tenant_id=tenant_id,
                    orchestration_run_id=run_id,
                )
            return result
        except Exception as e:
            # Best-effort: log with traceback and report failure as None.
            logger.error(
                "Error sending workflow summary notification",
                error=str(e),
                tenant_id=tenant_id,
                exc_info=True,
            )
            return None

    # ================================================================
    # UTILITY METHODS
    # ================================================================

    async def health_check(self) -> bool:
        """Check if the notification service is healthy.

        Returns:
            ``True`` when the health request yields a non-``None`` result,
            ``False`` when it yields ``None`` or raises.
        """
        try:
            result = await self.get("../health")  # Health endpoint is not tenant-scoped
            return result is not None
        except Exception as e:
            logger.error(
                "Notification service health check failed",
                error=str(e),
                exc_info=True,
            )
            return False
# Factory function for dependency injection
def create_notification_client(
    config: BaseServiceSettings,
    calling_service_name: str = "unknown",
) -> NotificationServiceClient:
    """Create a notification service client instance.

    Args:
        config: Service settings passed to the client.
        calling_service_name: Name of the service that will use the client.
            Defaults to ``"unknown"``, matching the client's own default, so
            existing single-argument callers behave exactly as before.

    Returns:
        A new ``NotificationServiceClient``.
    """
    return NotificationServiceClient(config, calling_service_name=calling_service_name)