# File: bakery-ia/shared/messaging/messaging_client.py
# Snapshot: 2025-12-13 23:57:54 +01:00 (642 lines, 24 KiB, Python, executable file)
"""
Unified RabbitMQ Client and Publisher for Bakery-IA Services
This module provides a standardized approach for all services to connect to RabbitMQ,
publish messages, and handle messaging lifecycle. It combines all messaging
functionality into a single, unified interface.
"""
import asyncio
import json
import uuid
from contextlib import suppress
from datetime import date, datetime, time as dt_time, timezone
from decimal import Decimal
from enum import Enum
from typing import Any, Callable, Dict, Optional, Tuple, Union

import structlog
# aio-pika is optional: when it cannot be imported, RabbitMQClient.connect()
# logs a warning and returns False instead of raising.
try:
    import aio_pika
    from aio_pika import connect_robust, Message, DeliveryMode, ExchangeType
except ImportError:
    AIO_PIKA_AVAILABLE = False
else:
    AIO_PIKA_AVAILABLE = True

logger = structlog.get_logger()
class EventType(Enum):
    """Closed set of event classifications used across the messaging layer."""

    # Domain events such as inventory changes or user actions
    BUSINESS = "business"
    # User-facing alerts that require someone to act
    ALERT = "alert"
    # User-facing informational notices
    NOTIFICATION = "notification"
    # User-facing suggestions
    RECOMMENDATION = "recommendation"
    # System-level events
    SYSTEM = "system"
class EVENT_TYPES:
    """Namespaced string constants for routing-style event names (``<domain>.<entity>.<action>``)."""

    class INVENTORY:
        """Ingredient and stock lifecycle events."""
        INGREDIENT_CREATED = "inventory.ingredient.created"
        STOCK_ADDED = "inventory.stock.added"
        STOCK_CONSUMED = "inventory.stock.consumed"
        LOW_STOCK_ALERT = "inventory.alert.low_stock"
        EXPIRATION_ALERT = "inventory.alert.expiration"
        STOCK_UPDATED = "inventory.stock.updated"
        STOCK_TRANSFERRED = "inventory.stock.transferred"
        STOCK_WASTED = "inventory.stock.wasted"

    class PRODUCTION:
        """Production batch and equipment events."""
        BATCH_CREATED = "production.batch.created"
        BATCH_STARTED = "production.batch.started"
        BATCH_COMPLETED = "production.batch.completed"
        EQUIPMENT_STATUS_CHANGED = "production.equipment.status_changed"

    class PROCUREMENT:
        """Purchase-order and delivery events."""
        PO_CREATED = "procurement.po.created"
        PO_APPROVED = "procurement.po.approved"
        PO_REJECTED = "procurement.po.rejected"
        DELIVERY_SCHEDULED = "procurement.delivery.scheduled"
        DELIVERY_RECEIVED = "procurement.delivery.received"
        DELIVERY_OVERDUE = "procurement.delivery.overdue"

    class FORECASTING:
        """Forecast generation and demand-signal events."""
        FORECAST_GENERATED = "forecasting.forecast.generated"
        FORECAST_UPDATED = "forecasting.forecast.updated"
        DEMAND_SPIKE_DETECTED = "forecasting.demand.spike_detected"
        WEATHER_IMPACT_FORECAST = "forecasting.weather.impact_forecast"

    class NOTIFICATION:
        """Notification delivery lifecycle events."""
        NOTIFICATION_SENT = "notification.sent"
        NOTIFICATION_FAILED = "notification.failed"
        NOTIFICATION_DELIVERED = "notification.delivered"
        NOTIFICATION_OPENED = "notification.opened"

    class TENANT:
        """Tenant and membership lifecycle events."""
        TENANT_CREATED = "tenant.created"
        TENANT_UPDATED = "tenant.updated"
        TENANT_DELETED = "tenant.deleted"
        TENANT_MEMBER_ADDED = "tenant.member.added"
        TENANT_MEMBER_REMOVED = "tenant.member.removed"
def json_serializer(obj):
    """
    ``default=`` hook for ``json.dumps`` covering types common in event payloads.

    Conversions:
        * ``datetime`` / ``date`` / ``time`` -> ISO-8601 string
        * ``uuid.UUID`` -> canonical string form
        * ``decimal.Decimal`` (e.g. SQLAlchemy Numeric columns) -> float
        * ``Enum`` members (e.g. ``EventType``) -> their ``value``
        * ``set`` / ``frozenset`` -> list (element order is unspecified)

    Raises:
        TypeError: for any other type, which is what ``json.dumps`` expects
        from a ``default`` hook.
    """
    if isinstance(obj, (datetime, date, dt_time)):
        return obj.isoformat()
    if isinstance(obj, uuid.UUID):
        return str(obj)
    if isinstance(obj, Decimal):
        # float keeps the existing wire format; very precise values lose digits.
        return float(obj)
    if isinstance(obj, Enum):
        # json.dumps re-invokes this hook if the value itself needs conversion.
        return obj.value
    if isinstance(obj, (set, frozenset)):
        return list(obj)
    raise TypeError(f"Object of type {type(obj)} is not JSON serializable")
class RabbitMQHeartbeatMonitor:
    """
    Background task that periodically yields to the event loop and watches
    the client's connection, stopping itself once the connection is gone.

    NOTE(review): per the original author's note, aio_pika handles AMQP
    heartbeats internally; this task only sleeps and checks ``is_closed``.
    It presumably cannot help if another coroutine blocks the loop
    synchronously - confirm whether it is still needed.
    """

    def __init__(self, client, check_interval: float = 0.1):
        """
        Args:
            client: Object exposing ``connection`` (with ``is_closed``), e.g. RabbitMQClient.
            check_interval: Seconds to sleep between connection checks.
        """
        self.client = client
        self.check_interval = check_interval
        self._monitor_task = None
        self._should_monitor = False

    async def start_monitoring(self):
        """Start the monitor task unless one is already running."""
        if self._monitor_task and not self._monitor_task.done():
            return
        self._should_monitor = True
        self._monitor_task = asyncio.create_task(self._monitor_loop())

    async def stop_monitoring(self):
        """Stop the monitor task and wait for its cancellation to complete."""
        self._should_monitor = False
        if self._monitor_task and not self._monitor_task.done():
            self._monitor_task.cancel()
            with suppress(asyncio.CancelledError):
                await self._monitor_task

    async def _monitor_loop(self):
        """Sleep/check loop; exits when stopped or when the connection is missing or closed."""
        while self._should_monitor:
            # Yield control to the event loop between checks
            await asyncio.sleep(self.check_interval)
            connection = self.client.connection
            if not connection or connection.is_closed:
                logger.warning("Connection is closed, stopping monitor")
                break
class RabbitMQClient:
    """
    Universal RabbitMQ client for all bakery-ia microservices.

    Handles all messaging patterns with graceful fallbacks: when aio-pika is
    not installed or the broker cannot be reached, operations log and return
    ``False`` instead of raising.
    """

    def __init__(self, connection_url: str, service_name: str = "unknown"):
        self.connection_url = connection_url
        self.service_name = service_name
        self.connection = None
        self.channel = None
        self.connected = False
        # Consecutive failed connect() calls; reset to 0 on a successful connect().
        self._reconnect_attempts = 0
        self._max_reconnect_attempts = 5
        self.heartbeat_monitor = RabbitMQHeartbeatMonitor(self)

    async def connect(self):
        """
        Open a robust connection and a channel to RabbitMQ.

        Returns:
            True when connected. False when aio-pika is unavailable or the
            connection/channel could not be set up; such failures are counted
            towards the limit enforced by ensure_connected().
        """
        if not AIO_PIKA_AVAILABLE:
            logger.warning("aio-pika not available, messaging disabled", service=self.service_name)
            return False

        connection = None
        try:
            connection = await connect_robust(
                self.connection_url,
                heartbeat=600  # Increase heartbeat to 600 seconds (10 minutes) to prevent timeouts
            )
            channel = await connection.channel()
            await channel.set_qos(prefetch_count=100)  # Performance optimization
        except Exception as e:
            # Channel setup can fail after the connection is already open;
            # close it so the half-initialised connection is not leaked.
            if connection is not None:
                with suppress(Exception):
                    await connection.close()
            self.connected = False
            self._reconnect_attempts += 1
            logger.warning(
                "Failed to connect to RabbitMQ",
                service=self.service_name,
                error=str(e),
                attempt=self._reconnect_attempts
            )
            return False

        self.connection = connection
        self.channel = channel
        self.connected = True
        self._reconnect_attempts = 0
        # Start heartbeat monitoring
        await self.heartbeat_monitor.start_monitoring()
        logger.info("Connected to RabbitMQ", service=self.service_name)
        return True

    async def disconnect(self):
        """Disconnect from RabbitMQ, closing the channel before the connection."""
        try:
            # Stop heartbeat monitoring first
            await self.heartbeat_monitor.stop_monitoring()

            # Close channel before connection to avoid "unexpected close" warnings
            if self.channel and not self.channel.is_closed:
                await self.channel.close()
                logger.debug("RabbitMQ channel closed", service=self.service_name)

            # Then close connection
            if self.connection and not self.connection.is_closed:
                await self.connection.close()
                logger.info("Disconnected from RabbitMQ", service=self.service_name)

            self.connected = False
        except Exception as e:
            logger.warning("Error during RabbitMQ disconnect",
                           service=self.service_name,
                           error=str(e))
            self.connected = False

    async def ensure_connected(self) -> bool:
        """
        Return True if the connection is usable, reconnecting if needed.

        NOTE(review): once ``_max_reconnect_attempts`` consecutive connect()
        failures accumulate, this returns False forever; only a successful
        connect() resets the counter, and it is never attempted again.
        Confirm this permanent lockout is intended.
        """
        if self.connected and self.connection and not self.connection.is_closed:
            return True

        if self._reconnect_attempts >= self._max_reconnect_attempts:
            logger.error("Max reconnection attempts reached", service=self.service_name)
            return False

        return await self.connect()

    async def publish_event(self, exchange_name: str, routing_key: str, event_data: Dict[str, Any],
                            persistent: bool = True) -> bool:
        """
        Publish ``event_data`` as JSON to a durable topic exchange.

        The payload is serialized before the connection is touched. An
        unserializable payload is a caller error: it is logged and reported
        as False without forcing a reconnect of a healthy connection.

        Returns:
            True if published successfully, False otherwise.
        """
        try:
            message_body = json.dumps(event_data, default=json_serializer)
        except (TypeError, ValueError) as e:
            logger.error("Failed to serialize event",
                         service=self.service_name,
                         exchange=exchange_name,
                         routing_key=routing_key,
                         error=str(e))
            return False

        try:
            # Ensure we're connected
            if not await self.ensure_connected():
                logger.debug("Event not published - RabbitMQ unavailable",
                             service=self.service_name, routing_key=routing_key)
                return False

            # Declare exchange
            exchange = await self.channel.declare_exchange(
                exchange_name,
                ExchangeType.TOPIC,
                durable=True
            )

            # AMQP header values must be encodable; coerce e.g. uuid.UUID to str.
            raw_event_id = event_data.get("event_id")
            event_id = str(raw_event_id) if raw_event_id is not None else str(uuid.uuid4())

            message = Message(
                message_body.encode(),
                delivery_mode=DeliveryMode.PERSISTENT if persistent else DeliveryMode.NOT_PERSISTENT,
                content_type="application/json",
                # Aware UTC: a naive local time would be encoded as if it were UTC.
                timestamp=datetime.now(timezone.utc),
                headers={
                    "source_service": self.service_name,
                    "event_id": event_id
                }
            )

            # Publish message
            await exchange.publish(message, routing_key=routing_key)

            logger.debug("Event published successfully",
                         service=self.service_name,
                         exchange=exchange_name,
                         routing_key=routing_key,
                         size=len(message_body))
            return True

        except Exception as e:
            logger.error("Failed to publish event",
                         service=self.service_name,
                         exchange=exchange_name,
                         routing_key=routing_key,
                         error=str(e))
            self.connected = False  # Force reconnection on next attempt
            return False

    async def consume_events(self, exchange_name: str, queue_name: str,
                             routing_key: str, callback: Callable) -> bool:
        """
        Bind a durable queue to a durable topic exchange and start consuming.

        Returns:
            True once the consumer is registered, False otherwise.
        """
        try:
            if not await self.ensure_connected():
                return False

            # Declare exchange
            exchange = await self.channel.declare_exchange(
                exchange_name,
                ExchangeType.TOPIC,
                durable=True
            )

            # Declare queue
            queue = await self.channel.declare_queue(
                queue_name,
                durable=True
            )

            # Bind queue to exchange
            await queue.bind(exchange, routing_key)

            # Set up consumer
            await queue.consume(callback)

            logger.info("Started consuming events",
                        service=self.service_name,
                        queue=queue_name,
                        routing_key=routing_key)
            return True

        except Exception as e:
            logger.error("Failed to start consuming events",
                         service=self.service_name,
                         error=str(e))
            return False

    # High-level convenience methods for common patterns
    async def publish_user_event(self, event_type: str, user_data: Dict[str, Any]) -> bool:
        """Publish user-related events"""
        return await self.publish_event("user.events", f"user.{event_type}", user_data)

    async def publish_training_event(self, event_type: str, training_data: Dict[str, Any]) -> bool:
        """Publish training-related events"""
        return await self.publish_event("training.events", f"training.{event_type}", training_data)

    async def publish_data_event(self, event_type: str, data: Dict[str, Any]) -> bool:
        """Publish data-related events"""
        return await self.publish_event("data.events", f"data.{event_type}", data)

    async def publish_forecast_event(self, event_type: str, forecast_data: Dict[str, Any]) -> bool:
        """Publish forecast-related events"""
        return await self.publish_event("forecast.events", f"forecast.{event_type}", forecast_data)
class EventMessage:
    """
    Standardized event envelope.

    Missing correlation/trace ids are replaced with fresh UUID4 strings, the
    source falls back to the emitting service, and the timestamp is taken in
    UTC (ISO-8601) when the object is constructed.
    """

    def __init__(
        self,
        event_type: str,
        tenant_id: Union[str, uuid.UUID],
        service_name: str,
        data: Dict[str, Any],
        event_class: str = "business",  # business, alert, notification, recommendation
        correlation_id: Optional[str] = None,
        trace_id: Optional[str] = None,
        severity: Optional[str] = None,  # For alerts: urgent, high, medium, low
        source: Optional[str] = None
    ):
        # UUID tenant ids are stored as strings so the envelope stays JSON-friendly
        if isinstance(tenant_id, uuid.UUID):
            tenant_id = str(tenant_id)

        self.event_type = event_type
        self.tenant_id = tenant_id
        self.service_name = service_name
        self.data = data
        self.event_class = event_class
        self.correlation_id = correlation_id if correlation_id else str(uuid.uuid4())
        self.trace_id = trace_id if trace_id else str(uuid.uuid4())
        self.severity = severity
        self.source = source if source else service_name
        self.timestamp = datetime.now(timezone.utc).isoformat()

    def to_dict(self) -> Dict[str, Any]:
        """Return the envelope as a plain dict; severity/source are included only when truthy."""
        envelope: Dict[str, Any] = dict(
            event_type=self.event_type,
            tenant_id=self.tenant_id,
            service_name=self.service_name,
            data=self.data,
            event_class=self.event_class,
            correlation_id=self.correlation_id,
            trace_id=self.trace_id,
            timestamp=self.timestamp,
        )
        optional_fields = (("severity", self.severity), ("source", self.source))
        envelope.update((key, value) for key, value in optional_fields if value)
        return envelope
class UnifiedEventPublisher:
    """
    Unified publisher for all event types - business events, alerts,
    notifications and recommendations - on a single topic exchange.
    """

    # Event classes whose payload uses the alert processor's MinimalEvent shape.
    _USER_FACING_CLASSES = ("alert", "notification", "recommendation")

    def __init__(self, rabbitmq_client: "RabbitMQClient", service_name: str):
        self.rabbitmq = rabbitmq_client
        self.service_name = service_name
        self.exchange = "events.exchange"

    @staticmethod
    def _normalize_event_class(event_class: Union[str, Enum]) -> str:
        """Return the plain string class, unwrapping Enum members such as EventType.ALERT."""
        if isinstance(event_class, Enum):
            return event_class.value
        return event_class

    def _build_message(
        self,
        event_type: str,
        tenant_id: Union[str, uuid.UUID],
        data: Dict[str, Any],
        event_class: str,
        severity: Optional[str]
    ) -> Tuple[str, Dict[str, Any]]:
        """
        Build ``(routing_key, payload)`` for an event.

        User-facing classes with a dotted ``event_type`` such as
        ``"inventory.critical_stock_shortage"`` are split on the first dot
        into domain and event name. User-facing classes without a dot use the
        service name as the domain.
        """
        user_facing = event_class in self._USER_FACING_CLASSES
        if '.' in event_type and user_facing:
            event_domain, actual_event_type = event_type.split('.', 1)
        else:
            event_domain = "general" if event_class == "business" else self.service_name
            actual_event_type = event_type

        timestamp = datetime.now(timezone.utc).isoformat()
        if user_facing:
            # Format for alert processor (uses MinimalEvent schema)
            payload = {
                "tenant_id": str(tenant_id),
                "event_class": event_class,
                "event_domain": event_domain,
                "event_type": actual_event_type,  # Just the specific event name, not domain.event_name
                "service": self.service_name,
                "metadata": data,
                "timestamp": timestamp
            }
        else:
            # Format for business events (standard format)
            payload = {
                "event_type": event_type,
                "tenant_id": str(tenant_id),
                "service_name": self.service_name,
                "data": data,
                "event_class": event_class,
                "timestamp": timestamp
            }
        if severity:
            payload["severity"] = severity

        if event_class == "alert":
            routing_key = f"alert.{event_domain}.{severity or 'medium'}"
        elif event_class == "notification":
            routing_key = f"notification.{event_domain}.info"
        elif event_class == "recommendation":
            routing_key = f"recommendation.{event_domain}.medium"
        else:  # business events (and any other class)
            routing_key = f"business.{event_type.replace('.', '_')}"
        return routing_key, payload

    async def publish_event(
        self,
        event_type: str,
        tenant_id: Union[str, uuid.UUID],
        data: Dict[str, Any],
        event_class: Union[str, Enum] = "business",
        severity: Optional[str] = None
    ) -> bool:
        """
        Publish a standardized event using the unified messaging pattern.

        Args:
            event_type: Type of event (e.g., 'inventory.ingredient.created')
            tenant_id: Tenant identifier
            data: Event payload data
            event_class: One of 'business', 'alert', 'notification',
                'recommendation', or the matching EventType member
            severity: Alert severity (for alert events only)

        Returns:
            True if the underlying client published the message, else False.
        """
        event_class = self._normalize_event_class(event_class)
        routing_key, event_payload = self._build_message(
            event_type, tenant_id, data, event_class, severity
        )

        try:
            success = await self.rabbitmq.publish_event(
                exchange_name=self.exchange,
                routing_key=routing_key,
                event_data=event_payload
            )

            if success:
                logger.info(
                    "event_published",
                    tenant_id=str(tenant_id),
                    event_type=event_type,
                    event_class=event_class,
                    severity=severity
                )
            else:
                logger.error(
                    "event_publish_failed",
                    tenant_id=str(tenant_id),
                    event_type=event_type
                )

            return success

        except Exception as e:
            logger.error(
                "event_publish_error",
                tenant_id=str(tenant_id),
                event_type=event_type,
                error=str(e)
            )
            return False

    # Business event methods
    async def publish_business_event(
        self,
        event_type: str,
        tenant_id: Union[str, uuid.UUID],
        data: Dict[str, Any]
    ) -> bool:
        """Publish a business event (inventory changes, user actions, etc.)"""
        return await self.publish_event(
            event_type=event_type,
            tenant_id=tenant_id,
            data=data,
            event_class="business"
        )

    # Alert methods
    async def publish_alert(
        self,
        event_type: str,
        tenant_id: Union[str, uuid.UUID],
        severity: str,  # urgent, high, medium, low
        data: Dict[str, Any]
    ) -> bool:
        """Publish an alert (actionable by user)"""
        return await self.publish_event(
            event_type=event_type,
            tenant_id=tenant_id,
            data=data,
            event_class="alert",
            severity=severity
        )

    # Notification methods
    async def publish_notification(
        self,
        event_type: str,
        tenant_id: Union[str, uuid.UUID],
        data: Dict[str, Any]
    ) -> bool:
        """Publish a notification (informational to user)"""
        return await self.publish_event(
            event_type=event_type,
            tenant_id=tenant_id,
            data=data,
            event_class="notification"
        )

    # Recommendation methods
    async def publish_recommendation(
        self,
        event_type: str,
        tenant_id: Union[str, uuid.UUID],
        data: Dict[str, Any]
    ) -> bool:
        """Publish a recommendation (suggestion to user)"""
        return await self.publish_event(
            event_type=event_type,
            tenant_id=tenant_id,
            data=data,
            event_class="recommendation"
        )
class ServiceMessagingManager:
    """Own a service's RabbitMQ client and event publisher across its lifecycle."""

    def __init__(self, service_name: str, rabbitmq_url: str):
        self.service_name = service_name
        self.rabbitmq_url = rabbitmq_url
        self.rabbitmq_client = None
        self.publisher = None

    async def setup(self):
        """
        Connect to RabbitMQ and build the publisher.

        Calling setup() again first disconnects the client created by the
        previous call, so its connection is not orphaned, and drops the
        publisher bound to it.

        Returns:
            True if the client connected and the publisher was created, False otherwise.
        """
        try:
            if self.rabbitmq_client is not None:
                await self.rabbitmq_client.disconnect()
            self.publisher = None

            self.rabbitmq_client = RabbitMQClient(self.rabbitmq_url, self.service_name)
            success = await self.rabbitmq_client.connect()
            if success:
                self.publisher = UnifiedEventPublisher(self.rabbitmq_client, self.service_name)
                logger.info(f"{self.service_name} messaging manager setup completed")
                return True
            else:
                logger.error(f"{self.service_name} messaging manager setup failed")
                return False
        except Exception as e:
            logger.error(f"Error during {self.service_name} messaging manager setup", error=str(e))
            return False

    async def cleanup(self):
        """
        Disconnect the client if one exists.

        Returns:
            True when there was nothing to clean up or the disconnect completed,
            False if an exception was raised during cleanup.
        """
        try:
            if self.rabbitmq_client:
                await self.rabbitmq_client.disconnect()
                logger.info(f"{self.service_name} messaging manager cleanup completed")
                return True
            return True  # If no client to clean up, consider it successful
        except Exception as e:
            logger.error(f"Error during {self.service_name} messaging manager cleanup", error=str(e))
            return False

    @property
    def is_ready(self):
        """True when a publisher exists and the client reports itself connected."""
        return (self.publisher is not None and
                self.rabbitmq_client is not None and
                self.rabbitmq_client.connected)
# Utility functions for easy service integration
async def initialize_service_publisher(
    service_name: str,
    rabbitmq_url: str
) -> "Tuple[Optional[RabbitMQClient], Optional[UnifiedEventPublisher]]":
    """
    Initialize a service-specific publisher using the unified messaging system.

    Args:
        service_name: Name of the service (e.g., 'notification-service', 'forecasting-service')
        rabbitmq_url: RabbitMQ connection URL

    Returns:
        A ``(rabbitmq_client, publisher)`` tuple on success, or ``(None, None)``
        if connecting failed or an error was raised. Keep the client so it can
        later be passed to cleanup_service_publisher().
    """
    try:
        rabbitmq_client = RabbitMQClient(rabbitmq_url, service_name)
        success = await rabbitmq_client.connect()
        if success:
            publisher = UnifiedEventPublisher(rabbitmq_client, service_name)
            logger.info(f"{service_name} unified messaging publisher initialized")
            return rabbitmq_client, publisher
        else:
            logger.warning(f"{service_name} unified messaging publisher failed to connect")
            return None, None
    except Exception as e:
        logger.error(f"Failed to initialize {service_name} unified messaging publisher", error=str(e))
        return None, None
async def cleanup_service_publisher(rabbitmq_client):
    """
    Disconnect a service's RabbitMQ client, tolerating a missing client.

    Args:
        rabbitmq_client: The client to disconnect; a falsy value (e.g. None) means nothing to do.

    Returns:
        True when there was nothing to clean up or the disconnect completed;
        False if an exception was raised during cleanup.
    """
    try:
        if not rabbitmq_client:
            # No client to clean up counts as success
            return True
        await rabbitmq_client.disconnect()
        logger.info("Service messaging cleanup completed")
    except Exception as e:
        logger.error("Error during service messaging cleanup", error=str(e))
        return False
    return True