2025-07-17 13:09:24 +02:00
|
|
|
"""
|
2025-07-18 14:18:52 +02:00
|
|
|
RabbitMQ messaging client shared by the microservices, built on aio_pika.
|
2025-07-17 13:09:24 +02:00
|
|
|
"""
|
|
|
|
|
import asyncio
|
|
|
|
|
import json
|
2025-07-18 14:18:52 +02:00
|
|
|
from typing import Dict, Any, Callable, Optional
|
|
|
|
|
from datetime import datetime, date
|
|
|
|
|
import uuid
|
2025-07-18 14:41:39 +02:00
|
|
|
import structlog
|
2025-10-09 14:11:02 +02:00
|
|
|
from contextlib import suppress
|
2025-07-18 14:18:52 +02:00
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
import aio_pika
|
|
|
|
|
from aio_pika import connect_robust, Message, DeliveryMode, ExchangeType
|
|
|
|
|
AIO_PIKA_AVAILABLE = True
|
|
|
|
|
except ImportError:
|
|
|
|
|
AIO_PIKA_AVAILABLE = False
|
2025-07-18 14:41:39 +02:00
|
|
|
|
|
|
|
|
# Module-level structlog logger used by every class and function in this file;
# call sites pass structured key/value context (e.g. service=..., error=...).
logger = structlog.get_logger()
|
2025-07-18 14:18:52 +02:00
|
|
|
|
2025-10-09 14:11:02 +02:00
|
|
|
class HeartbeatMonitor:
    """Background task that watches a client's connection.

    Every ``interval`` seconds the task wakes up and checks
    ``client.connection``. Once the connection is missing or closed, the
    monitor clears ``client.connected`` and exits.

    NOTE(review): AMQP heartbeats themselves are handled inside aio_pika. A
    periodically sleeping task cannot make them run if other code blocks the
    event loop with synchronous work; such work must be offloaded (e.g. to a
    thread) for heartbeats to keep flowing.
    """

    def __init__(self, client, interval: float = 0.1):
        """
        Args:
            client: Object exposing ``connection`` (``None`` or an object with
                an ``is_closed`` attribute) and a writable ``connected`` flag.
            interval: Seconds between connection checks (default 0.1).
        """
        self.client = client
        self.interval = interval
        self._monitor_task = None
        self._should_monitor = False

    async def start_monitoring(self):
        """Start the monitoring task unless one is already running."""
        if self._monitor_task and not self._monitor_task.done():
            return

        self._should_monitor = True
        self._monitor_task = asyncio.create_task(self._monitor_loop())

    async def stop_monitoring(self):
        """Cancel the monitoring task (if running) and wait for it to finish."""
        self._should_monitor = False
        if self._monitor_task and not self._monitor_task.done():
            self._monitor_task.cancel()
            with suppress(asyncio.CancelledError):
                await self._monitor_task

    async def _monitor_loop(self):
        """Sleep ``interval`` seconds at a time until stopped or the connection closes."""
        while self._should_monitor:
            await asyncio.sleep(self.interval)

            connection = self.client.connection
            if connection is None or connection.is_closed:
                # Record the loss so the client's next ensure_connected() call
                # sees the flag cleared, not just the closed connection.
                self.client.connected = False
                logger.warning("Connection is closed, stopping monitor")
                break
|
|
|
|
|
|
2025-07-18 14:18:52 +02:00
|
|
|
def json_serializer(obj):
    """Fallback encoder for ``json.dumps(..., default=json_serializer)``.

    Converts values the standard JSON encoder rejects:

    * ``datetime`` / ``date``  -> ISO-8601 string
    * ``uuid.UUID``            -> canonical string form
    * ``decimal.Decimal`` (including subclasses; e.g. numeric values loaded
      through SQLAlchemy) -> ``float`` (may lose precision)

    Raises:
        TypeError: for any other type; ``json.dumps`` propagates it.
    """
    # Local stdlib import: keeps this helper self-contained; after the first
    # call it is only a sys.modules lookup.
    from decimal import Decimal

    if isinstance(obj, (datetime, date)):
        return obj.isoformat()
    if isinstance(obj, uuid.UUID):
        return str(obj)
    # A real isinstance check: the previous class-name comparison also matched
    # unrelated classes that merely happen to be called "Decimal", and missed
    # Decimal subclasses.
    if isinstance(obj, Decimal):
        return float(obj)
    raise TypeError(f"Object of type {type(obj)} is not JSON serializable")
|
2025-07-17 13:09:24 +02:00
|
|
|
|
|
|
|
|
class RabbitMQClient:
    """
    Universal RabbitMQ client for all microservices.

    Holds one aio_pika robust connection and one channel, and exposes
    topic-exchange publish/consume helpers. Public coroutines report failure
    by logging and returning ``False`` instead of raising, so callers can treat
    messaging as best-effort.
    """

    def __init__(self, connection_url: str, service_name: str = "unknown"):
        """
        Args:
            connection_url: AMQP URL handed to ``connect_robust``.
            service_name: Included in every log record and sent as the
                ``source_service`` header of published messages.
        """
        self.connection_url = connection_url
        self.service_name = service_name
        self.connection = None
        self.channel = None
        self.connected = False
        # Consecutive failed connect() calls; reset to 0 on success.
        self._reconnect_attempts = 0
        # After this many consecutive failures ensure_connected() stops
        # retrying and keeps returning False.
        self._max_reconnect_attempts = 5
        # (exchange_name, queue_name, routing_key, callback) of every successful
        # consume_events() call, replayed by connect() on a new channel.
        self._consumers = []
        self.heartbeat_monitor = HeartbeatMonitor(self)

    async def _close_quietly(self) -> None:
        """Close the current channel and connection, ignoring any errors.

        Used by connect() so that reconnecting never overwrites (and thereby
        leaks) a connection that is still open, and so that a connection which
        opened before channel setup failed is released.
        """
        for resource in (self.channel, self.connection):
            if resource is not None and not resource.is_closed:
                # Deliberate best-effort: these objects are being discarded.
                with suppress(Exception):
                    await resource.close()
        self.channel = None
        self.connection = None

    async def _setup_consumer(self, exchange_name: str, queue_name: str,
                              routing_key: str, callback: Callable) -> None:
        """Declare the topic exchange and durable queue, bind them, and consume on the current channel."""
        exchange = await self.channel.declare_exchange(
            exchange_name,
            ExchangeType.TOPIC,
            durable=True
        )
        queue = await self.channel.declare_queue(queue_name, durable=True)
        await queue.bind(exchange, routing_key)
        await queue.consume(callback)

    async def connect(self):
        """Open a fresh connection and channel to RabbitMQ.

        Any previous connection is closed first and previously registered
        consumers are re-created on the new channel.

        Returns:
            True on success; False when aio-pika is missing or the attempt
            fails (each failure increments the consecutive-attempt counter
            checked by ensure_connected()).
        """
        if not AIO_PIKA_AVAILABLE:
            logger.warning("aio-pika not available, messaging disabled", service=self.service_name)
            return False

        # ensure_connected() calls us whenever `connected` is False, which can
        # happen while the old connection is still open (e.g. after a publish
        # error). Release it before replacing self.connection.
        await self.heartbeat_monitor.stop_monitoring()
        await self._close_quietly()

        try:
            self.connection = await connect_robust(
                self.connection_url,
                heartbeat=600  # Increase heartbeat to 600 seconds (10 minutes) to prevent timeouts
            )
            self.channel = await self.connection.channel()
            await self.channel.set_qos(prefetch_count=100)  # Performance optimization

            # Consumers lived on the channel that was just closed; re-register them.
            for exchange_name, queue_name, routing_key, callback in self._consumers:
                await self._setup_consumer(exchange_name, queue_name, routing_key, callback)

            self.connected = True
            self._reconnect_attempts = 0

            # Start heartbeat monitoring
            await self.heartbeat_monitor.start_monitoring()

            logger.info("Connected to RabbitMQ", service=self.service_name)
            return True

        except Exception as e:
            self.connected = False
            self._reconnect_attempts += 1
            logger.warning(
                "Failed to connect to RabbitMQ",
                service=self.service_name,
                error=str(e),
                attempt=self._reconnect_attempts
            )
            # A connection may have opened before a later setup step failed.
            await self._close_quietly()
            return False

    async def disconnect(self):
        """Disconnect from RabbitMQ with proper channel cleanup.

        Stops heartbeat monitoring, then closes the channel before the
        connection to avoid "unexpected close" warnings. Each step runs even if
        an earlier one fails; errors are logged, never raised. Registered
        consumers are forgotten, so a later connect() starts clean.
        """
        self._consumers.clear()

        try:
            # Stop heartbeat monitoring first
            await self.heartbeat_monitor.stop_monitoring()
        except Exception as e:
            logger.warning("Error during RabbitMQ disconnect",
                           service=self.service_name,
                           error=str(e))

        try:
            if self.channel and not self.channel.is_closed:
                await self.channel.close()
                logger.debug("RabbitMQ channel closed", service=self.service_name)
        except Exception as e:
            logger.warning("Error during RabbitMQ disconnect",
                           service=self.service_name,
                           error=str(e))

        try:
            # Closed even when the channel close above failed.
            if self.connection and not self.connection.is_closed:
                await self.connection.close()
                logger.info("Disconnected from RabbitMQ", service=self.service_name)
        except Exception as e:
            logger.warning("Error during RabbitMQ disconnect",
                           service=self.service_name,
                           error=str(e))

        self.connected = False

    async def ensure_connected(self) -> bool:
        """Return True when connection and channel are usable, reconnecting if needed.

        Returns False without trying once ``_max_reconnect_attempts``
        consecutive connect() calls have failed.
        """
        if (
            self.connected
            and self.connection is not None
            and not self.connection.is_closed
            # A channel can be closed by the broker (e.g. on a declaration
            # error) while the connection stays open.
            and self.channel is not None
            and not self.channel.is_closed
        ):
            return True

        if self._reconnect_attempts >= self._max_reconnect_attempts:
            logger.error("Max reconnection attempts reached", service=self.service_name)
            return False

        return await self.connect()

    async def publish_event(self, exchange_name: str, routing_key: str, event_data: Dict[str, Any],
                            persistent: bool = True) -> bool:
        """
        Universal event publisher with automatic fallback.

        JSON-encodes ``event_data`` (via json_serializer), declares
        ``exchange_name`` as a durable topic exchange and publishes with
        ``routing_key``. Headers carry ``source_service`` and ``event_id``
        (``event_data["event_id"]`` as a string, or a fresh UUID4).

        Returns True if published successfully, False otherwise. Broker errors
        clear ``connected`` so the next call reconnects; payloads that cannot
        be serialized are rejected without touching the connection.
        """
        try:
            # Ensure we're connected
            if not await self.ensure_connected():
                logger.debug("Event not published - RabbitMQ unavailable",
                             service=self.service_name, routing_key=routing_key)
                return False

            # Serialize before any broker call: an unserializable payload is a
            # caller error, not a connection problem, so it must not reach the
            # forced-reconnect handler below.
            try:
                body = json.dumps(event_data, default=json_serializer).encode()
            except (TypeError, ValueError) as e:
                logger.error("Failed to serialize event",
                             service=self.service_name,
                             exchange=exchange_name,
                             routing_key=routing_key,
                             error=str(e))
                return False

            exchange = await self.channel.declare_exchange(
                exchange_name,
                ExchangeType.TOPIC,
                durable=True
            )

            event_id = event_data.get("event_id")
            if event_id is None:
                event_id = uuid.uuid4()

            message = Message(
                body,
                delivery_mode=DeliveryMode.PERSISTENT if persistent else DeliveryMode.NOT_PERSISTENT,
                content_type="application/json",
                # Timezone-aware: a naive datetime.now() is local time, but the
                # AMQP timestamp is encoded as if a naive value were UTC.
                timestamp=datetime.now().astimezone(),
                headers={
                    "source_service": self.service_name,
                    # str(): callers may supply a uuid.UUID, which is not an
                    # AMQP header field type.
                    "event_id": str(event_id),
                },
            )

            await exchange.publish(message, routing_key=routing_key)

            logger.debug("Event published successfully",
                         service=self.service_name,
                         exchange=exchange_name,
                         routing_key=routing_key,
                         size=len(body))
            return True

        except Exception as e:
            logger.error("Failed to publish event",
                         service=self.service_name,
                         exchange=exchange_name,
                         routing_key=routing_key,
                         error=str(e))
            self.connected = False  # Force reconnection on next attempt
            return False

    async def consume_events(self, exchange_name: str, queue_name: str,
                             routing_key: str, callback: Callable) -> bool:
        """Universal event consumer.

        Binds durable queue ``queue_name`` to topic exchange ``exchange_name``
        with ``routing_key`` and passes each delivered message to ``callback``.
        The subscription is remembered so connect() can restore it after a
        reconnect. Returns True on success, False otherwise.
        """
        try:
            if not await self.ensure_connected():
                return False

            await self._setup_consumer(exchange_name, queue_name, routing_key, callback)
            self._consumers.append((exchange_name, queue_name, routing_key, callback))

            logger.info("Started consuming events",
                        service=self.service_name,
                        queue=queue_name,
                        routing_key=routing_key)
            return True

        except Exception as e:
            logger.error("Failed to start consuming events",
                         service=self.service_name,
                         queue=queue_name,
                         routing_key=routing_key,
                         error=str(e))
            return False

    # High-level convenience methods for common patterns
    async def publish_user_event(self, event_type: str, user_data: Dict[str, Any]) -> bool:
        """Publish user-related events"""
        return await self.publish_event("user.events", f"user.{event_type}", user_data)

    async def publish_training_event(self, event_type: str, training_data: Dict[str, Any]) -> bool:
        """Publish training-related events"""
        return await self.publish_event("training.events", f"training.{event_type}", training_data)

    async def publish_data_event(self, event_type: str, data: Dict[str, Any]) -> bool:
        """Publish data-related events"""
        return await self.publish_event("data.events", f"data.{event_type}", data)

    async def publish_forecast_event(self, event_type: str, forecast_data: Dict[str, Any]) -> bool:
        """Publish forecast-related events"""
        return await self.publish_event("forecast.events", f"forecast.{event_type}", forecast_data)
|