"""
RabbitMQ messaging client for microservices - FIXED VERSION
"""

import asyncio
import json
from typing import Dict, Any, Callable, Optional
from datetime import datetime, date, timezone
import uuid
import structlog
from contextlib import suppress

try:
    import aio_pika
    from aio_pika import connect_robust, Message, DeliveryMode, ExchangeType
    AIO_PIKA_AVAILABLE = True
except ImportError:
    # Messaging is optional: RabbitMQClient.connect() checks this flag and
    # returns False instead of touching the missing aio_pika names.
    AIO_PIKA_AVAILABLE = False

logger = structlog.get_logger()


class HeartbeatMonitor:
    """Background task that watches the owning client's connection.

    The loop performs no AMQP I/O itself. It sleeps for 0.1 s per iteration
    (yielding to the event loop) and exits as soon as the client's
    connection is missing or closed.
    """

    def __init__(self, client):
        """Bind the monitor to *client*, whose ``connection`` attribute is polled."""
        self.client = client
        self._monitor_task = None
        self._should_monitor = False

    async def start_monitoring(self):
        """Start the monitor task; do nothing if one is already running."""
        if self._monitor_task and not self._monitor_task.done():
            return

        self._should_monitor = True
        self._monitor_task = asyncio.create_task(self._monitor_loop())

    async def stop_monitoring(self):
        """Cancel the monitor task (if running) and wait for it to finish."""
        self._should_monitor = False
        if self._monitor_task and not self._monitor_task.done():
            self._monitor_task.cancel()
            with suppress(asyncio.CancelledError):
                await self._monitor_task

    async def _monitor_loop(self):
        """Sleep in 0.1 s steps until stopped or the connection is gone."""
        while self._should_monitor:
            # Yield control to allow heartbeat processing
            await asyncio.sleep(0.1)

            connection = self.client.connection
            if not connection or connection.is_closed:
                logger.warning("Connection is closed, stopping monitor")
                break


def json_serializer(obj):
    """``default=`` hook for ``json.dumps`` covering common non-JSON types.

    Returns an ISO-8601 string for ``datetime``/``date``, a string for
    ``uuid.UUID``, and a ``float`` for any object whose class is named
    ``Decimal``.

    Raises:
        TypeError: for every other type, as ``json.dumps`` expects.
    """
    if isinstance(obj, (datetime, date)):
        return obj.isoformat()
    if isinstance(obj, uuid.UUID):
        return str(obj)
    if obj.__class__.__name__ == 'Decimal':
        # Handle Decimal objects from SQLAlchemy without importing decimal
        return float(obj)
    raise TypeError(f"Object of type {type(obj)} is not JSON serializable")


class RabbitMQClient:
    """
    Universal RabbitMQ client for all microservices
    Handles all messaging patterns with proper fallbacks

    Public coroutines report failure by returning ``False`` (and logging)
    rather than raising, so callers can treat messaging as best-effort.
    """

    def __init__(self, connection_url: str, service_name: str = "unknown"):
        """Store settings only; no network activity happens here.

        Args:
            connection_url: URL handed to ``connect_robust``.
            service_name: Included in log records and in the
                ``source_service`` header of published messages.
        """
        self.connection_url = connection_url
        self.service_name = service_name
        self.connection = None
        self.channel = None
        self.connected = False
        # Consecutive failed connect() calls; reset to 0 on success.
        self._reconnect_attempts = 0
        self._max_reconnect_attempts = 5
        self.heartbeat_monitor = HeartbeatMonitor(self)

    async def connect(self):
        """Connect to RabbitMQ, reusing a still-open connection/channel.

        A new connection is opened only when there is none or the current
        one is closed. Previously every call opened another robust
        connection and dropped the reference to the old one, leaking it.
        A new channel is opened when the connection is new or the current
        channel is missing/closed.

        Returns:
            True when a usable connection and channel are in place,
            False when aio-pika is not installed or setup failed.
        """
        if not AIO_PIKA_AVAILABLE:
            logger.warning("aio-pika not available, messaging disabled", service=self.service_name)
            return False

        fresh_connection = None
        try:
            if self.connection is None or self.connection.is_closed:
                fresh_connection = await connect_robust(
                    self.connection_url,
                    heartbeat=600  # Increase heartbeat to 600 seconds (10 minutes) to prevent timeouts
                )
                self.connection = fresh_connection

            needs_channel = (
                fresh_connection is not None
                or self.channel is None
                or self.channel.is_closed
            )
            if needs_channel:
                self.channel = await self.connection.channel()
                await self.channel.set_qos(prefetch_count=100)  # Performance optimization

            self.connected = True
            self._reconnect_attempts = 0

            # Start heartbeat monitoring (no-op if the task is already running)
            await self.heartbeat_monitor.start_monitoring()

            logger.info("Connected to RabbitMQ", service=self.service_name)
            return True

        except Exception as e:
            self.connected = False
            self._reconnect_attempts += 1
            if fresh_connection is not None:
                # Channel/QoS setup failed after the socket was opened:
                # close it so the half-initialised connection is not leaked.
                with suppress(Exception):
                    await fresh_connection.close()
            logger.warning(
                "Failed to connect to RabbitMQ",
                service=self.service_name,
                error=str(e),
                attempt=self._reconnect_attempts
            )
            return False

    async def disconnect(self):
        """Disconnect from RabbitMQ with proper channel cleanup.

        Each teardown step runs independently, so a failure while stopping
        the monitor or closing the channel no longer prevents the
        connection from being closed. Errors are logged, never raised.
        ``connected`` is always cleared.
        """
        try:
            # Stop heartbeat monitoring first
            try:
                await self.heartbeat_monitor.stop_monitoring()
            except Exception as e:
                logger.warning("Error during RabbitMQ disconnect", service=self.service_name, error=str(e))

            # Close channel before connection to avoid "unexpected close" warnings
            try:
                if self.channel and not self.channel.is_closed:
                    await self.channel.close()
                    logger.debug("RabbitMQ channel closed", service=self.service_name)
            except Exception as e:
                logger.warning("Error during RabbitMQ disconnect", service=self.service_name, error=str(e))

            # Then close connection
            try:
                if self.connection and not self.connection.is_closed:
                    await self.connection.close()
                    logger.info("Disconnected from RabbitMQ", service=self.service_name)
            except Exception as e:
                logger.warning("Error during RabbitMQ disconnect", service=self.service_name, error=str(e))
        finally:
            self.connected = False

    async def ensure_connected(self) -> bool:
        """Ensure connection is active, reconnect if needed.

        Returns True immediately when the client is flagged connected and
        the connection is open. Otherwise delegates to ``connect()``,
        unless the number of consecutive failed attempts has reached the
        limit. In that case it returns False without trying again, and the
        counter is only reset by a successful ``connect()``.
        """
        if self.connected and self.connection and not self.connection.is_closed:
            return True

        if self._reconnect_attempts >= self._max_reconnect_attempts:
            logger.error("Max reconnection attempts reached", service=self.service_name)
            return False

        return await self.connect()

    async def publish_event(self, exchange_name: str, routing_key: str,
                            event_data: Dict[str, Any], persistent: bool = True) -> bool:
        """
        Universal event publisher with automatic fallback
        Returns True if published successfully, False otherwise

        Args:
            exchange_name: Durable topic exchange, declared before publishing.
            routing_key: Routing key for the message.
            event_data: JSON-encoded with ``json_serializer`` as fallback.
                Its ``event_id`` (or a generated UUID4 string) is copied
                into the message headers.
            persistent: Selects persistent vs. non-persistent delivery mode.
        """
        # Build the payload before touching the broker. A payload that cannot
        # be encoded is a caller error, so it must not mark the connection as
        # broken and trigger a needless reconnect.
        try:
            message_body = json.dumps(event_data, default=json_serializer)
            event_id = event_data.get("event_id", str(uuid.uuid4()))
            if isinstance(event_id, uuid.UUID):
                # The body accepts UUIDs via json_serializer; headers get the
                # same string form. NOTE(review): assumes raw UUID objects are
                # not encodable as AMQP header values - confirm.
                event_id = str(event_id)
        except Exception as e:
            logger.error("Failed to publish event",
                         service=self.service_name,
                         exchange=exchange_name,
                         routing_key=routing_key,
                         error=str(e))
            return False

        try:
            # Ensure we're connected
            if not await self.ensure_connected():
                logger.debug("Event not published - RabbitMQ unavailable",
                             service=self.service_name, routing_key=routing_key)
                return False

            # Declare exchange
            exchange = await self.channel.declare_exchange(
                exchange_name,
                ExchangeType.TOPIC,
                durable=True
            )

            message = Message(
                message_body.encode(),
                delivery_mode=DeliveryMode.PERSISTENT if persistent else DeliveryMode.NOT_PERSISTENT,
                content_type="application/json",
                # Timezone-aware UTC: a naive local datetime would be
                # interpreted as UTC when encoded, skewing the timestamp.
                timestamp=datetime.now(timezone.utc),
                headers={
                    "source_service": self.service_name,
                    "event_id": event_id
                }
            )

            # Publish message
            await exchange.publish(message, routing_key=routing_key)

            logger.debug("Event published successfully",
                         service=self.service_name,
                         exchange=exchange_name,
                         routing_key=routing_key,
                         size=len(message_body))
            return True

        except Exception as e:
            logger.error("Failed to publish event",
                         service=self.service_name,
                         exchange=exchange_name,
                         routing_key=routing_key,
                         error=str(e))
            # Make the next ensure_connected() re-validate the connection and channel
            self.connected = False
            return False

    async def consume_events(self, exchange_name: str, queue_name: str,
                             routing_key: str, callback: Callable) -> bool:
        """Universal event consumer.

        Declares a durable topic exchange and a durable queue, binds the
        queue with *routing_key*, and registers *callback* as the consumer.

        Returns:
            True once the consumer is registered, False if the broker is
            unavailable or any step raised (the error is logged).
        """
        try:
            if not await self.ensure_connected():
                return False

            # Declare exchange
            exchange = await self.channel.declare_exchange(
                exchange_name,
                ExchangeType.TOPIC,
                durable=True
            )

            # Declare queue
            queue = await self.channel.declare_queue(
                queue_name,
                durable=True
            )

            # Bind queue to exchange
            await queue.bind(exchange, routing_key)

            # Set up consumer
            await queue.consume(callback)

            logger.info("Started consuming events",
                        service=self.service_name,
                        queue=queue_name,
                        routing_key=routing_key)
            return True

        except Exception as e:
            logger.error("Failed to start consuming events",
                         service=self.service_name,
                         error=str(e))
            return False

    # High-level convenience methods for common patterns
    async def publish_user_event(self, event_type: str, user_data: Dict[str, Any]) -> bool:
        """Publish *user_data* to ``user.events`` with key ``user.<event_type>``."""
        return await self.publish_event("user.events", f"user.{event_type}", user_data)

    async def publish_training_event(self, event_type: str, training_data: Dict[str, Any]) -> bool:
        """Publish *training_data* to ``training.events`` with key ``training.<event_type>``."""
        return await self.publish_event("training.events", f"training.{event_type}", training_data)

    async def publish_data_event(self, event_type: str, data: Dict[str, Any]) -> bool:
        """Publish *data* to ``data.events`` with key ``data.<event_type>``."""
        return await self.publish_event("data.events", f"data.{event_type}", data)

    async def publish_forecast_event(self, event_type: str, forecast_data: Dict[str, Any]) -> bool:
        """Publish *forecast_data* to ``forecast.events`` with key ``forecast.<event_type>``."""
        return await self.publish_event("forecast.events", f"forecast.{event_type}", forecast_data)