Files
bakery-ia/shared/messaging/rabbitmq.py
2025-09-18 23:32:53 +02:00

201 lines
7.6 KiB
Python

"""
RabbitMQ messaging client for microservices - FIXED VERSION
"""
import asyncio
import json
from typing import Dict, Any, Callable, Optional
from datetime import datetime, date
import uuid
import structlog
try:
import aio_pika
from aio_pika import connect_robust, Message, DeliveryMode, ExchangeType
AIO_PIKA_AVAILABLE = True
except ImportError:
AIO_PIKA_AVAILABLE = False
logger = structlog.get_logger()
def json_serializer(obj):
"""JSON serializer for objects not serializable by default json code"""
if isinstance(obj, (datetime, date)):
return obj.isoformat()
elif isinstance(obj, uuid.UUID):
return str(obj)
elif hasattr(obj, '__class__') and obj.__class__.__name__ == 'Decimal':
# Handle Decimal objects from SQLAlchemy without importing decimal
return float(obj)
raise TypeError(f"Object of type {type(obj)} is not JSON serializable")
class RabbitMQClient:
"""
Universal RabbitMQ client for all microservices
Handles all messaging patterns with proper fallbacks
"""
def __init__(self, connection_url: str, service_name: str = "unknown"):
self.connection_url = connection_url
self.service_name = service_name
self.connection = None
self.channel = None
self.connected = False
self._reconnect_attempts = 0
self._max_reconnect_attempts = 5
async def connect(self):
"""Connect to RabbitMQ with retry logic"""
if not AIO_PIKA_AVAILABLE:
logger.warning("aio-pika not available, messaging disabled", service=self.service_name)
return False
try:
self.connection = await connect_robust(
self.connection_url,
heartbeat=30,
connection_attempts=3
)
self.channel = await self.connection.channel()
await self.channel.set_qos(prefetch_count=100) # Performance optimization
self.connected = True
self._reconnect_attempts = 0
logger.info("Connected to RabbitMQ", service=self.service_name)
return True
except Exception as e:
self.connected = False
self._reconnect_attempts += 1
logger.warning(
"Failed to connect to RabbitMQ",
service=self.service_name,
error=str(e),
attempt=self._reconnect_attempts
)
return False
async def disconnect(self):
"""Disconnect from RabbitMQ"""
if self.connection and not self.connection.is_closed:
await self.connection.close()
self.connected = False
logger.info("Disconnected from RabbitMQ", service=self.service_name)
async def ensure_connected(self) -> bool:
"""Ensure connection is active, reconnect if needed"""
if self.connected and self.connection and not self.connection.is_closed:
return True
if self._reconnect_attempts >= self._max_reconnect_attempts:
logger.error("Max reconnection attempts reached", service=self.service_name)
return False
return await self.connect()
async def publish_event(self, exchange_name: str, routing_key: str, event_data: Dict[str, Any],
persistent: bool = True) -> bool:
"""
Universal event publisher with automatic fallback
Returns True if published successfully, False otherwise
"""
try:
# Ensure we're connected
if not await self.ensure_connected():
logger.debug("Event not published - RabbitMQ unavailable",
service=self.service_name, routing_key=routing_key)
return False
# Declare exchange
exchange = await self.channel.declare_exchange(
exchange_name,
ExchangeType.TOPIC,
durable=True
)
# Prepare message with proper JSON serialization
message_body = json.dumps(event_data, default=json_serializer)
message = Message(
message_body.encode(),
delivery_mode=DeliveryMode.PERSISTENT if persistent else DeliveryMode.NOT_PERSISTENT,
content_type="application/json",
timestamp=datetime.now(),
headers={
"source_service": self.service_name,
"event_id": event_data.get("event_id", str(uuid.uuid4()))
}
)
# Publish message
await exchange.publish(message, routing_key=routing_key)
logger.debug("Event published successfully",
service=self.service_name,
exchange=exchange_name,
routing_key=routing_key,
size=len(message_body))
return True
except Exception as e:
logger.error("Failed to publish event",
service=self.service_name,
exchange=exchange_name,
routing_key=routing_key,
error=str(e))
self.connected = False # Force reconnection on next attempt
return False
async def consume_events(self, exchange_name: str, queue_name: str,
routing_key: str, callback: Callable) -> bool:
"""Universal event consumer"""
try:
if not await self.ensure_connected():
return False
# Declare exchange
exchange = await self.channel.declare_exchange(
exchange_name,
ExchangeType.TOPIC,
durable=True
)
# Declare queue
queue = await self.channel.declare_queue(
queue_name,
durable=True
)
# Bind queue to exchange
await queue.bind(exchange, routing_key)
# Set up consumer
await queue.consume(callback)
logger.info("Started consuming events",
service=self.service_name,
queue=queue_name,
routing_key=routing_key)
return True
except Exception as e:
logger.error("Failed to start consuming events",
service=self.service_name,
error=str(e))
return False
# High-level convenience methods for common patterns
async def publish_user_event(self, event_type: str, user_data: Dict[str, Any]) -> bool:
"""Publish user-related events"""
return await self.publish_event("user.events", f"user.{event_type}", user_data)
async def publish_training_event(self, event_type: str, training_data: Dict[str, Any]) -> bool:
"""Publish training-related events"""
return await self.publish_event("training.events", f"training.{event_type}", training_data)
async def publish_data_event(self, event_type: str, data: Dict[str, Any]) -> bool:
"""Publish data-related events"""
return await self.publish_event("data.events", f"data.{event_type}", data)
async def publish_forecast_event(self, event_type: str, forecast_data: Dict[str, Any]) -> bool:
"""Publish forecast-related events"""
return await self.publish_event("forecast.events", f"forecast.{event_type}", forecast_data)