96 lines
3.0 KiB
Python
96 lines
3.0 KiB
Python
"""
|
|
RabbitMQ messaging client for microservices
|
|
"""
|
|
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
from typing import Dict, Any, Callable
|
|
import aio_pika
|
|
from aio_pika import connect_robust, Message, DeliveryMode
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
class RabbitMQClient:
|
|
"""RabbitMQ client for microservices communication"""
|
|
|
|
def __init__(self, connection_url: str):
|
|
self.connection_url = connection_url
|
|
self.connection = None
|
|
self.channel = None
|
|
|
|
async def connect(self):
|
|
"""Connect to RabbitMQ"""
|
|
try:
|
|
self.connection = await connect_robust(self.connection_url)
|
|
self.channel = await self.connection.channel()
|
|
logger.info("Connected to RabbitMQ")
|
|
except Exception as e:
|
|
logger.error(f"Failed to connect to RabbitMQ: {e}")
|
|
raise
|
|
|
|
async def disconnect(self):
|
|
"""Disconnect from RabbitMQ"""
|
|
if self.connection:
|
|
await self.connection.close()
|
|
logger.info("Disconnected from RabbitMQ")
|
|
|
|
async def publish_event(self, exchange_name: str, routing_key: str, event_data: Dict[str, Any]):
|
|
"""Publish event to RabbitMQ"""
|
|
try:
|
|
if not self.channel:
|
|
await self.connect()
|
|
|
|
# Declare exchange
|
|
exchange = await self.channel.declare_exchange(
|
|
exchange_name,
|
|
aio_pika.ExchangeType.TOPIC,
|
|
durable=True
|
|
)
|
|
|
|
# Create message
|
|
message = Message(
|
|
json.dumps(event_data).encode(),
|
|
delivery_mode=DeliveryMode.PERSISTENT,
|
|
content_type="application/json"
|
|
)
|
|
|
|
# Publish message
|
|
await exchange.publish(message, routing_key=routing_key)
|
|
|
|
logger.info(f"Published event to {exchange_name} with routing key {routing_key}")
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to publish event: {e}")
|
|
raise
|
|
|
|
async def consume_events(self, exchange_name: str, queue_name: str, routing_key: str, callback: Callable):
|
|
"""Consume events from RabbitMQ"""
|
|
try:
|
|
if not self.channel:
|
|
await self.connect()
|
|
|
|
# Declare exchange
|
|
exchange = await self.channel.declare_exchange(
|
|
exchange_name,
|
|
aio_pika.ExchangeType.TOPIC,
|
|
durable=True
|
|
)
|
|
|
|
# Declare queue
|
|
queue = await self.channel.declare_queue(
|
|
queue_name,
|
|
durable=True
|
|
)
|
|
|
|
# Bind queue to exchange
|
|
await queue.bind(exchange, routing_key)
|
|
|
|
# Set up consumer
|
|
await queue.consume(callback)
|
|
|
|
logger.info(f"Started consuming events from {queue_name}")
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to consume events: {e}")
|
|
raise |