Fix bugs issues
This commit is contained in:
@@ -16,8 +16,7 @@ from app.core.config import settings
|
|||||||
from app.models.users import User, UserSession
|
from app.models.users import User, UserSession
|
||||||
from app.schemas.auth import UserRegistration, UserLogin, TokenResponse, UserResponse
|
from app.schemas.auth import UserRegistration, UserLogin, TokenResponse, UserResponse
|
||||||
from app.core.security import security_manager
|
from app.core.security import security_manager
|
||||||
from app.services.messaging import message_publisher
|
from app.services.messaging import publish_user_registered, publish_user_login
|
||||||
from shared.messaging.events import UserRegisteredEvent, UserLoginEvent
|
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
@@ -58,20 +57,18 @@ class AuthService:
|
|||||||
await db.commit()
|
await db.commit()
|
||||||
await db.refresh(user)
|
await db.refresh(user)
|
||||||
|
|
||||||
# Publish user registered event
|
# Publish user registered event - SIMPLIFIED
|
||||||
await message_publisher.publish_event(
|
event_data = {
|
||||||
"user_events",
|
|
||||||
"user.registered",
|
|
||||||
UserRegisteredEvent(
|
|
||||||
service_name="auth-service",
|
|
||||||
data={
|
|
||||||
"user_id": str(user.id),
|
"user_id": str(user.id),
|
||||||
"email": user.email,
|
"email": user.email,
|
||||||
"full_name": user.full_name,
|
"full_name": user.full_name,
|
||||||
"language": user.language
|
"language": user.language,
|
||||||
|
"timestamp": datetime.now(timezone.utc).isoformat()
|
||||||
}
|
}
|
||||||
).__dict__
|
|
||||||
)
|
success = await publish_user_registered(event_data)
|
||||||
|
if not success:
|
||||||
|
logger.warning("Failed to publish user registered event", user_id=str(user.id))
|
||||||
|
|
||||||
logger.info(f"User registered: {user.email}")
|
logger.info(f"User registered: {user.email}")
|
||||||
return UserResponse(**user.to_dict())
|
return UserResponse(**user.to_dict())
|
||||||
@@ -144,22 +141,20 @@ class AuthService:
|
|||||||
db.add(session)
|
db.add(session)
|
||||||
await db.commit()
|
await db.commit()
|
||||||
|
|
||||||
# Publish login event
|
# Publish login event - SIMPLIFIED
|
||||||
await message_publisher.publish_event(
|
event_data = {
|
||||||
"user_events",
|
|
||||||
"user.login",
|
|
||||||
UserLoginEvent(
|
|
||||||
service_name="auth-service",
|
|
||||||
data={
|
|
||||||
"user_id": str(user.id),
|
"user_id": str(user.id),
|
||||||
"email": user.email,
|
"email": user.email,
|
||||||
"ip_address": ip_address,
|
"ip_address": ip_address,
|
||||||
"user_agent": user_agent
|
"user_agent": user_agent,
|
||||||
|
"timestamp": datetime.now(timezone.utc).isoformat()
|
||||||
}
|
}
|
||||||
).__dict__
|
|
||||||
)
|
|
||||||
|
|
||||||
logger.info(f"User login successful: {user.email}")
|
success = await publish_user_login(event_data)
|
||||||
|
if not success:
|
||||||
|
logger.warning("Failed to publish login event", user_id=str(user.id))
|
||||||
|
|
||||||
|
logger.info("User login successful", user_id=str(user.id), email=user.email)
|
||||||
|
|
||||||
return TokenResponse(
|
return TokenResponse(
|
||||||
access_token=access_token,
|
access_token=access_token,
|
||||||
|
|||||||
@@ -2,24 +2,37 @@
|
|||||||
"""
|
"""
|
||||||
Messaging service for auth service
|
Messaging service for auth service
|
||||||
"""
|
"""
|
||||||
|
|
||||||
from shared.messaging.rabbitmq import RabbitMQClient
|
from shared.messaging.rabbitmq import RabbitMQClient
|
||||||
from app.core.config import settings
|
from app.core.config import settings
|
||||||
import logging
|
import logging
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
# Global message publisher
|
# Single global instance
|
||||||
message_publisher = RabbitMQClient(settings.RABBITMQ_URL)
|
auth_publisher = RabbitMQClient(settings.RABBITMQ_URL, "auth-service")
|
||||||
|
|
||||||
async def setup_messaging():
|
async def setup_messaging():
|
||||||
"""Establishes connection to RabbitMQ for the message publisher."""
|
"""Initialize messaging for auth service"""
|
||||||
logger.info("Attempting to connect to RabbitMQ...")
|
success = await auth_publisher.connect()
|
||||||
await message_publisher.connect()
|
if success:
|
||||||
logger.info("RabbitMQ connection established.")
|
logger.info("Auth service messaging initialized")
|
||||||
|
else:
|
||||||
|
logger.warning("Auth service messaging failed to initialize")
|
||||||
|
|
||||||
async def cleanup_messaging():
|
async def cleanup_messaging():
|
||||||
"""Closes the connection to RabbitMQ for the message publisher."""
|
"""Cleanup messaging for auth service"""
|
||||||
logger.info("Attempting to disconnect from RabbitMQ...")
|
await auth_publisher.disconnect()
|
||||||
await message_publisher.disconnect()
|
logger.info("Auth service messaging cleaned up")
|
||||||
logger.info("RabbitMQ connection closed.")
|
|
||||||
|
# Convenience functions for auth-specific events
|
||||||
|
async def publish_user_registered(user_data: dict) -> bool:
|
||||||
|
"""Publish user registered event"""
|
||||||
|
return await auth_publisher.publish_user_event("registered", user_data)
|
||||||
|
|
||||||
|
async def publish_user_login(user_data: dict) -> bool:
|
||||||
|
"""Publish user login event"""
|
||||||
|
return await auth_publisher.publish_user_event("login", user_data)
|
||||||
|
|
||||||
|
async def publish_user_logout(user_data: dict) -> bool:
|
||||||
|
"""Publish user logout event"""
|
||||||
|
return await auth_publisher.publish_user_event("logout", user_data)
|
||||||
|
|||||||
@@ -1,168 +1,42 @@
|
|||||||
# ================================================================
|
# ================================================================
|
||||||
# services/data/app/services/messaging.py
|
# services/data/app/services/messaging.py
|
||||||
# ================================================================
|
# ================================================================
|
||||||
"""Message queue service for data events"""
|
|
||||||
|
|
||||||
import json
|
|
||||||
import asyncio
|
|
||||||
from typing import Dict, Any, Optional
|
|
||||||
import structlog
|
|
||||||
|
|
||||||
try:
|
|
||||||
from aio_pika import connect_robust, Message, ExchangeType
|
|
||||||
AIO_PIKA_AVAILABLE = True
|
|
||||||
except ImportError:
|
|
||||||
AIO_PIKA_AVAILABLE = False
|
|
||||||
|
|
||||||
|
from shared.messaging.rabbitmq import RabbitMQClient
|
||||||
from app.core.config import settings
|
from app.core.config import settings
|
||||||
|
import logging
|
||||||
|
|
||||||
logger = structlog.get_logger()
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
class DataEventPublisher:
|
# Single global instance
|
||||||
"""
|
data_publisher = RabbitMQClient(settings.RABBITMQ_URL, "data-service")
|
||||||
Event publisher for data service events.
|
|
||||||
Falls back gracefully if RabbitMQ is not available.
|
|
||||||
"""
|
|
||||||
|
|
||||||
def __init__(self):
|
async def setup_messaging():
|
||||||
self.connection = None
|
"""Initialize messaging for data service"""
|
||||||
self.channel = None
|
success = await data_publisher.connect()
|
||||||
self.exchange = None
|
if success:
|
||||||
self.connected = False
|
logger.info("Data service messaging initialized")
|
||||||
|
else:
|
||||||
|
logger.warning("Data service messaging failed to initialize")
|
||||||
|
|
||||||
async def connect(self):
|
|
||||||
"""Connect to RabbitMQ"""
|
|
||||||
if not AIO_PIKA_AVAILABLE:
|
|
||||||
logger.warning("aio-pika not available, messaging disabled")
|
|
||||||
return
|
|
||||||
|
|
||||||
try:
|
|
||||||
self.connection = await connect_robust(settings.RABBITMQ_URL)
|
|
||||||
self.channel = await self.connection.channel()
|
|
||||||
|
|
||||||
# Declare exchange for data events
|
|
||||||
self.exchange = await self.channel.declare_exchange(
|
|
||||||
"data.events",
|
|
||||||
ExchangeType.TOPIC,
|
|
||||||
durable=True
|
|
||||||
)
|
|
||||||
|
|
||||||
self.connected = True
|
|
||||||
logger.info("Connected to RabbitMQ for data events")
|
|
||||||
|
|
||||||
except Exception as e:
|
|
||||||
logger.warning("Failed to connect to RabbitMQ", error=str(e))
|
|
||||||
self.connected = False
|
|
||||||
|
|
||||||
async def disconnect(self):
|
|
||||||
"""Disconnect from RabbitMQ"""
|
|
||||||
if self.connection and not self.connection.is_closed:
|
|
||||||
await self.connection.close()
|
|
||||||
self.connected = False
|
|
||||||
logger.info("Disconnected from RabbitMQ")
|
|
||||||
|
|
||||||
async def publish_data_imported(self, event_data: Dict[str, Any]):
|
|
||||||
"""Publish data imported event"""
|
|
||||||
await self._publish_event("data.imported", event_data)
|
|
||||||
|
|
||||||
async def publish_weather_updated(self, event_data: Dict[str, Any]):
|
|
||||||
"""Publish weather data updated event"""
|
|
||||||
await self._publish_event("weather.updated", event_data)
|
|
||||||
|
|
||||||
async def publish_traffic_updated(self, event_data: Dict[str, Any]):
|
|
||||||
"""Publish traffic data updated event"""
|
|
||||||
await self._publish_event("traffic.updated", event_data)
|
|
||||||
|
|
||||||
async def publish_sales_created(self, event_data: Dict[str, Any]):
|
|
||||||
"""Publish sales record created event"""
|
|
||||||
await self._publish_event("sales.created", event_data)
|
|
||||||
|
|
||||||
async def publish_import_completed(self, event_data: Dict[str, Any]):
|
|
||||||
"""Publish import process completed event"""
|
|
||||||
await self._publish_event("import.completed", event_data)
|
|
||||||
|
|
||||||
async def _publish_event(self, routing_key: str, data: Dict[str, Any]):
|
|
||||||
"""Publish event to exchange"""
|
|
||||||
try:
|
|
||||||
# If not connected, try to connect
|
|
||||||
if not self.connected:
|
|
||||||
await self.connect()
|
|
||||||
|
|
||||||
# If still not connected, log and return
|
|
||||||
if not self.connected:
|
|
||||||
logger.debug("Message not sent - RabbitMQ unavailable", routing_key=routing_key)
|
|
||||||
return
|
|
||||||
|
|
||||||
# Prepare message
|
|
||||||
message_body = json.dumps(data, default=str)
|
|
||||||
message = Message(
|
|
||||||
message_body.encode(),
|
|
||||||
content_type="application/json",
|
|
||||||
delivery_mode=2 # Persistent
|
|
||||||
)
|
|
||||||
|
|
||||||
# Publish to exchange
|
|
||||||
await self.exchange.publish(
|
|
||||||
message,
|
|
||||||
routing_key=routing_key
|
|
||||||
)
|
|
||||||
|
|
||||||
logger.debug("Event published", routing_key=routing_key, data_size=len(message_body))
|
|
||||||
|
|
||||||
except Exception as e:
|
|
||||||
logger.error("Failed to publish event",
|
|
||||||
routing_key=routing_key,
|
|
||||||
error=str(e))
|
|
||||||
# Reset connection on error
|
|
||||||
self.connected = False
|
|
||||||
|
|
||||||
class MockDataEventPublisher:
|
|
||||||
"""
|
|
||||||
Mock publisher for development/testing when RabbitMQ is not available
|
|
||||||
"""
|
|
||||||
|
|
||||||
async def connect(self):
|
|
||||||
logger.info("Mock publisher - connect called")
|
|
||||||
|
|
||||||
async def disconnect(self):
|
|
||||||
logger.info("Mock publisher - disconnect called")
|
|
||||||
|
|
||||||
async def publish_data_imported(self, event_data: Dict[str, Any]):
|
|
||||||
logger.debug("Mock publish - data imported", event_data=event_data)
|
|
||||||
|
|
||||||
async def publish_weather_updated(self, event_data: Dict[str, Any]):
|
|
||||||
logger.debug("Mock publish - weather updated", event_data=event_data)
|
|
||||||
|
|
||||||
async def publish_traffic_updated(self, event_data: Dict[str, Any]):
|
|
||||||
logger.debug("Mock publish - traffic updated", event_data=event_data)
|
|
||||||
|
|
||||||
async def publish_sales_created(self, event_data: Dict[str, Any]):
|
|
||||||
logger.debug("Mock publish - sales created", event_data=event_data)
|
|
||||||
|
|
||||||
async def publish_import_completed(self, event_data: Dict[str, Any]):
|
|
||||||
logger.debug("Mock publish - import completed", event_data=event_data)
|
|
||||||
|
|
||||||
# Global publisher instance
|
|
||||||
# Use mock if RabbitMQ is not available or in development mode
|
|
||||||
if AIO_PIKA_AVAILABLE and hasattr(settings, 'RABBITMQ_URL') and settings.RABBITMQ_URL:
|
|
||||||
data_publisher = DataEventPublisher()
|
|
||||||
else:
|
|
||||||
logger.info("Using mock data publisher")
|
|
||||||
data_publisher = MockDataEventPublisher()
|
|
||||||
|
|
||||||
# Ensure connection is established
|
|
||||||
async def init_messaging():
|
|
||||||
"""Initialize messaging connection"""
|
|
||||||
try:
|
|
||||||
await data_publisher.connect()
|
|
||||||
except Exception as e:
|
|
||||||
logger.warning("Failed to initialize messaging", error=str(e))
|
|
||||||
|
|
||||||
# Cleanup function
|
|
||||||
async def cleanup_messaging():
|
async def cleanup_messaging():
|
||||||
"""Cleanup messaging connection"""
|
"""Cleanup messaging for data service"""
|
||||||
try:
|
|
||||||
if hasattr(data_publisher, 'disconnect'):
|
|
||||||
await data_publisher.disconnect()
|
await data_publisher.disconnect()
|
||||||
except Exception as e:
|
logger.info("Data service messaging cleaned up")
|
||||||
logger.warning("Failed to cleanup messaging", error=str(e))
|
|
||||||
|
# Convenience functions for data-specific events
|
||||||
|
async def publish_data_imported(data: dict) -> bool:
|
||||||
|
"""Publish data imported event"""
|
||||||
|
return await data_publisher.publish_data_event("imported", data)
|
||||||
|
|
||||||
|
async def publish_weather_updated(data: dict) -> bool:
|
||||||
|
"""Publish weather updated event"""
|
||||||
|
return await data_publisher.publish_data_event("weather.updated", data)
|
||||||
|
|
||||||
|
async def publish_traffic_updated(data: dict) -> bool:
|
||||||
|
"""Publish traffic updated event"""
|
||||||
|
return await data_publisher.publish_data_event("traffic.updated", data)
|
||||||
|
|
||||||
|
async def publish_sales_created(data: dict) -> bool:
|
||||||
|
"""Publish sales created event"""
|
||||||
|
return await data_publisher.publish_data_event("sales.created", data)
|
||||||
|
|||||||
@@ -1,9 +1,38 @@
|
|||||||
"""
|
"""
|
||||||
Messaging service for training service
|
Training service messaging - Uses shared RabbitMQ client only
|
||||||
"""
|
"""
|
||||||
|
|
||||||
from shared.messaging.rabbitmq import RabbitMQClient
|
from shared.messaging.rabbitmq import RabbitMQClient
|
||||||
from app.core.config import settings
|
from app.core.config import settings
|
||||||
|
import logging
|
||||||
|
|
||||||
# Global message publisher
|
logger = logging.getLogger(__name__)
|
||||||
message_publisher = RabbitMQClient(settings.RABBITMQ_URL)
|
|
||||||
|
# Single global instance
|
||||||
|
training_publisher = RabbitMQClient(settings.RABBITMQ_URL, "training-service")
|
||||||
|
|
||||||
|
async def setup_messaging():
|
||||||
|
"""Initialize messaging for training service"""
|
||||||
|
success = await training_publisher.connect()
|
||||||
|
if success:
|
||||||
|
logger.info("Training service messaging initialized")
|
||||||
|
else:
|
||||||
|
logger.warning("Training service messaging failed to initialize")
|
||||||
|
|
||||||
|
async def cleanup_messaging():
|
||||||
|
"""Cleanup messaging for training service"""
|
||||||
|
await training_publisher.disconnect()
|
||||||
|
logger.info("Training service messaging cleaned up")
|
||||||
|
|
||||||
|
# Convenience functions for training-specific events
|
||||||
|
async def publish_training_started(data: dict) -> bool:
|
||||||
|
"""Publish training started event"""
|
||||||
|
return await training_publisher.publish_training_event("started", data)
|
||||||
|
|
||||||
|
async def publish_training_completed(data: dict) -> bool:
|
||||||
|
"""Publish training completed event"""
|
||||||
|
return await training_publisher.publish_training_event("completed", data)
|
||||||
|
|
||||||
|
async def publish_training_failed(data: dict) -> bool:
|
||||||
|
"""Publish training failed event"""
|
||||||
|
return await training_publisher.publish_training_event("failed", data)
|
||||||
|
|||||||
@@ -1,14 +1,13 @@
|
|||||||
"""
|
"""
|
||||||
|
shared/messaging/events.py
|
||||||
Event definitions for microservices communication
|
Event definitions for microservices communication
|
||||||
- Simple class-based approach to avoid dataclass issues
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
from datetime import datetime, timezone
|
from datetime import datetime, timezone
|
||||||
from typing import Dict, Any, Optional
|
from typing import Dict, Any, Optional
|
||||||
import uuid
|
import uuid
|
||||||
|
|
||||||
class BaseEvent:
|
class BaseEvent:
|
||||||
"""Base event class"""
|
"""Base event class - FIXED"""
|
||||||
def __init__(self, service_name: str, data: Dict[str, Any], event_type: str = "", correlation_id: Optional[str] = None):
|
def __init__(self, service_name: str, data: Dict[str, Any], event_type: str = "", correlation_id: Optional[str] = None):
|
||||||
self.service_name = service_name
|
self.service_name = service_name
|
||||||
self.data = data
|
self.data = data
|
||||||
@@ -17,17 +16,45 @@ class BaseEvent:
|
|||||||
self.timestamp = datetime.now(timezone.utc)
|
self.timestamp = datetime.now(timezone.utc)
|
||||||
self.correlation_id = correlation_id
|
self.correlation_id = correlation_id
|
||||||
|
|
||||||
def to_dict(self) -> Dict[str, Any]: # Add this method
|
def to_dict(self) -> Dict[str, Any]:
|
||||||
"""Converts the event object to a dictionary for JSON serialization."""
|
"""Converts the event object to a dictionary for JSON serialization - FIXED"""
|
||||||
return {
|
return {
|
||||||
"service_name": self.service_name,
|
"service_name": self.service_name,
|
||||||
"data": self.data,
|
"data": self.data,
|
||||||
"event_type": self.event_type,
|
"event_type": self.event_type,
|
||||||
"event_id": self.event_id,
|
"event_id": self.event_id,
|
||||||
"timestamp": self.timestamp.isoformat(), # Convert datetime to ISO 8601 string
|
"timestamp": self.timestamp.isoformat(), # Convert datetime to ISO string
|
||||||
"correlation_id": self.correlation_id
|
"correlation_id": self.correlation_id
|
||||||
}
|
}
|
||||||
|
|
||||||
|
# Auth Events - FIXED
|
||||||
|
class UserRegisteredEvent(BaseEvent):
|
||||||
|
def __init__(self, service_name: str, data: Dict[str, Any], correlation_id: Optional[str] = None):
|
||||||
|
super().__init__(
|
||||||
|
service_name=service_name,
|
||||||
|
data=data,
|
||||||
|
event_type="user.registered",
|
||||||
|
correlation_id=correlation_id
|
||||||
|
)
|
||||||
|
|
||||||
|
class UserLoginEvent(BaseEvent):
|
||||||
|
def __init__(self, service_name: str, data: Dict[str, Any], correlation_id: Optional[str] = None):
|
||||||
|
super().__init__(
|
||||||
|
service_name=service_name,
|
||||||
|
data=data,
|
||||||
|
event_type="user.login",
|
||||||
|
correlation_id=correlation_id
|
||||||
|
)
|
||||||
|
|
||||||
|
class UserLogoutEvent(BaseEvent):
|
||||||
|
def __init__(self, service_name: str, data: Dict[str, Any], correlation_id: Optional[str] = None):
|
||||||
|
super().__init__(
|
||||||
|
service_name=service_name,
|
||||||
|
data=data,
|
||||||
|
event_type="user.logout",
|
||||||
|
correlation_id=correlation_id
|
||||||
|
)
|
||||||
|
|
||||||
# Training Events
|
# Training Events
|
||||||
class TrainingStartedEvent(BaseEvent):
|
class TrainingStartedEvent(BaseEvent):
|
||||||
def __init__(self, service_name: str, data: Dict[str, Any], correlation_id: Optional[str] = None):
|
def __init__(self, service_name: str, data: Dict[str, Any], correlation_id: Optional[str] = None):
|
||||||
@@ -66,68 +93,12 @@ class ForecastGeneratedEvent(BaseEvent):
|
|||||||
correlation_id=correlation_id
|
correlation_id=correlation_id
|
||||||
)
|
)
|
||||||
|
|
||||||
class ForecastRequestedEvent(BaseEvent):
|
# Data Events
|
||||||
|
class DataImportedEvent(BaseEvent):
|
||||||
def __init__(self, service_name: str, data: Dict[str, Any], correlation_id: Optional[str] = None):
|
def __init__(self, service_name: str, data: Dict[str, Any], correlation_id: Optional[str] = None):
|
||||||
super().__init__(
|
super().__init__(
|
||||||
service_name=service_name,
|
service_name=service_name,
|
||||||
data=data,
|
data=data,
|
||||||
event_type="forecast.requested",
|
event_type="data.imported",
|
||||||
correlation_id=correlation_id
|
|
||||||
)
|
|
||||||
|
|
||||||
# User Events
|
|
||||||
class UserRegisteredEvent(BaseEvent):
|
|
||||||
def __init__(self, service_name: str, data: Dict[str, Any], correlation_id: Optional[str] = None):
|
|
||||||
super().__init__(
|
|
||||||
service_name=service_name,
|
|
||||||
data=data,
|
|
||||||
event_type="user.registered",
|
|
||||||
correlation_id=correlation_id
|
|
||||||
)
|
|
||||||
|
|
||||||
class UserLoginEvent(BaseEvent):
|
|
||||||
def __init__(self, service_name: str, data: Dict[str, Any], correlation_id: Optional[str] = None):
|
|
||||||
super().__init__(
|
|
||||||
service_name=service_name,
|
|
||||||
data=data,
|
|
||||||
event_type="user.login",
|
|
||||||
correlation_id=correlation_id
|
|
||||||
)
|
|
||||||
|
|
||||||
# Tenant Events
|
|
||||||
class TenantCreatedEvent(BaseEvent):
|
|
||||||
def __init__(self, service_name: str, data: Dict[str, Any], correlation_id: Optional[str] = None):
|
|
||||||
super().__init__(
|
|
||||||
service_name=service_name,
|
|
||||||
data=data,
|
|
||||||
event_type="tenant.created",
|
|
||||||
correlation_id=correlation_id
|
|
||||||
)
|
|
||||||
|
|
||||||
class TenantUpdatedEvent(BaseEvent):
|
|
||||||
def __init__(self, service_name: str, data: Dict[str, Any], correlation_id: Optional[str] = None):
|
|
||||||
super().__init__(
|
|
||||||
service_name=service_name,
|
|
||||||
data=data,
|
|
||||||
event_type="tenant.updated",
|
|
||||||
correlation_id=correlation_id
|
|
||||||
)
|
|
||||||
|
|
||||||
# Notification Events
|
|
||||||
class NotificationSentEvent(BaseEvent):
|
|
||||||
def __init__(self, service_name: str, data: Dict[str, Any], correlation_id: Optional[str] = None):
|
|
||||||
super().__init__(
|
|
||||||
service_name=service_name,
|
|
||||||
data=data,
|
|
||||||
event_type="notification.sent",
|
|
||||||
correlation_id=correlation_id
|
|
||||||
)
|
|
||||||
|
|
||||||
class NotificationFailedEvent(BaseEvent):
|
|
||||||
def __init__(self, service_name: str, data: Dict[str, Any], correlation_id: Optional[str] = None):
|
|
||||||
super().__init__(
|
|
||||||
service_name=service_name,
|
|
||||||
data=data,
|
|
||||||
event_type="notification.failed",
|
|
||||||
correlation_id=correlation_id
|
correlation_id=correlation_id
|
||||||
)
|
)
|
||||||
@@ -1,79 +1,157 @@
|
|||||||
"""
|
"""
|
||||||
RabbitMQ messaging client for microservices
|
RabbitMQ messaging client for microservices - FIXED VERSION
|
||||||
"""
|
"""
|
||||||
|
|
||||||
import asyncio
|
import asyncio
|
||||||
import json
|
import json
|
||||||
import logging
|
from typing import Dict, Any, Callable, Optional
|
||||||
from typing import Dict, Any, Callable
|
from datetime import datetime, date
|
||||||
import aio_pika
|
import uuid
|
||||||
from aio_pika import connect_robust, Message, DeliveryMode
|
import logstash
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
try:
|
||||||
|
import aio_pika
|
||||||
|
from aio_pika import connect_robust, Message, DeliveryMode, ExchangeType
|
||||||
|
AIO_PIKA_AVAILABLE = True
|
||||||
|
except ImportError:
|
||||||
|
AIO_PIKA_AVAILABLE = False
|
||||||
|
|
||||||
|
logger = logstash.get_logger()
|
||||||
|
|
||||||
|
def json_serializer(obj):
|
||||||
|
"""JSON serializer for objects not serializable by default json code"""
|
||||||
|
if isinstance(obj, (datetime, date)):
|
||||||
|
return obj.isoformat()
|
||||||
|
elif isinstance(obj, uuid.UUID):
|
||||||
|
return str(obj)
|
||||||
|
raise TypeError(f"Object of type {type(obj)} is not JSON serializable")
|
||||||
|
|
||||||
class RabbitMQClient:
|
class RabbitMQClient:
|
||||||
"""RabbitMQ client for microservices communication"""
|
"""
|
||||||
|
Universal RabbitMQ client for all microservices
|
||||||
|
Handles all messaging patterns with proper fallbacks
|
||||||
|
"""
|
||||||
|
|
||||||
def __init__(self, connection_url: str):
|
def __init__(self, connection_url: str, service_name: str = "unknown"):
|
||||||
self.connection_url = connection_url
|
self.connection_url = connection_url
|
||||||
|
self.service_name = service_name
|
||||||
self.connection = None
|
self.connection = None
|
||||||
self.channel = None
|
self.channel = None
|
||||||
|
self.connected = False
|
||||||
|
self._reconnect_attempts = 0
|
||||||
|
self._max_reconnect_attempts = 5
|
||||||
|
|
||||||
async def connect(self):
|
async def connect(self):
|
||||||
"""Connect to RabbitMQ"""
|
"""Connect to RabbitMQ with retry logic"""
|
||||||
|
if not AIO_PIKA_AVAILABLE:
|
||||||
|
logger.warning("aio-pika not available, messaging disabled", service=self.service_name)
|
||||||
|
return False
|
||||||
|
|
||||||
try:
|
try:
|
||||||
self.connection = await connect_robust(self.connection_url)
|
self.connection = await connect_robust(
|
||||||
|
self.connection_url,
|
||||||
|
heartbeat=30,
|
||||||
|
connection_attempts=3
|
||||||
|
)
|
||||||
self.channel = await self.connection.channel()
|
self.channel = await self.connection.channel()
|
||||||
logger.info("Connected to RabbitMQ")
|
await self.channel.set_qos(prefetch_count=100) # Performance optimization
|
||||||
|
|
||||||
|
self.connected = True
|
||||||
|
self._reconnect_attempts = 0
|
||||||
|
logger.info("Connected to RabbitMQ", service=self.service_name)
|
||||||
|
return True
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Failed to connect to RabbitMQ: {e}")
|
self.connected = False
|
||||||
raise
|
self._reconnect_attempts += 1
|
||||||
|
logger.warning(
|
||||||
|
"Failed to connect to RabbitMQ",
|
||||||
|
service=self.service_name,
|
||||||
|
error=str(e),
|
||||||
|
attempt=self._reconnect_attempts
|
||||||
|
)
|
||||||
|
return False
|
||||||
|
|
||||||
async def disconnect(self):
|
async def disconnect(self):
|
||||||
"""Disconnect from RabbitMQ"""
|
"""Disconnect from RabbitMQ"""
|
||||||
if self.connection:
|
if self.connection and not self.connection.is_closed:
|
||||||
await self.connection.close()
|
await self.connection.close()
|
||||||
logger.info("Disconnected from RabbitMQ")
|
self.connected = False
|
||||||
|
logger.info("Disconnected from RabbitMQ", service=self.service_name)
|
||||||
|
|
||||||
async def publish_event(self, exchange_name: str, routing_key: str, event_data: Dict[str, Any]):
|
async def ensure_connected(self) -> bool:
|
||||||
"""Publish event to RabbitMQ"""
|
"""Ensure connection is active, reconnect if needed"""
|
||||||
|
if self.connected and self.connection and not self.connection.is_closed:
|
||||||
|
return True
|
||||||
|
|
||||||
|
if self._reconnect_attempts >= self._max_reconnect_attempts:
|
||||||
|
logger.error("Max reconnection attempts reached", service=self.service_name)
|
||||||
|
return False
|
||||||
|
|
||||||
|
return await self.connect()
|
||||||
|
|
||||||
|
async def publish_event(self, exchange_name: str, routing_key: str, event_data: Dict[str, Any],
|
||||||
|
persistent: bool = True) -> bool:
|
||||||
|
"""
|
||||||
|
Universal event publisher with automatic fallback
|
||||||
|
Returns True if published successfully, False otherwise
|
||||||
|
"""
|
||||||
try:
|
try:
|
||||||
if not self.channel:
|
# Ensure we're connected
|
||||||
await self.connect()
|
if not await self.ensure_connected():
|
||||||
|
logger.debug("Event not published - RabbitMQ unavailable",
|
||||||
|
service=self.service_name, routing_key=routing_key)
|
||||||
|
return False
|
||||||
|
|
||||||
# Declare exchange
|
# Declare exchange
|
||||||
exchange = await self.channel.declare_exchange(
|
exchange = await self.channel.declare_exchange(
|
||||||
exchange_name,
|
exchange_name,
|
||||||
aio_pika.ExchangeType.TOPIC,
|
ExchangeType.TOPIC,
|
||||||
durable=True
|
durable=True
|
||||||
)
|
)
|
||||||
|
|
||||||
# Create message
|
# Prepare message with proper JSON serialization
|
||||||
|
message_body = json.dumps(event_data, default=json_serializer)
|
||||||
message = Message(
|
message = Message(
|
||||||
json.dumps(event_data).encode(),
|
message_body.encode(),
|
||||||
delivery_mode=DeliveryMode.PERSISTENT,
|
delivery_mode=DeliveryMode.PERSISTENT if persistent else DeliveryMode.NOT_PERSISTENT,
|
||||||
content_type="application/json"
|
content_type="application/json",
|
||||||
|
timestamp=datetime.now(),
|
||||||
|
headers={
|
||||||
|
"source_service": self.service_name,
|
||||||
|
"event_id": event_data.get("event_id", str(uuid.uuid4()))
|
||||||
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
# Publish message
|
# Publish message
|
||||||
await exchange.publish(message, routing_key=routing_key)
|
await exchange.publish(message, routing_key=routing_key)
|
||||||
|
|
||||||
logger.info(f"Published event to {exchange_name} with routing key {routing_key}")
|
logger.debug("Event published successfully",
|
||||||
|
service=self.service_name,
|
||||||
|
exchange=exchange_name,
|
||||||
|
routing_key=routing_key,
|
||||||
|
size=len(message_body))
|
||||||
|
return True
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Failed to publish event: {e}")
|
logger.error("Failed to publish event",
|
||||||
raise
|
service=self.service_name,
|
||||||
|
exchange=exchange_name,
|
||||||
|
routing_key=routing_key,
|
||||||
|
error=str(e))
|
||||||
|
self.connected = False # Force reconnection on next attempt
|
||||||
|
return False
|
||||||
|
|
||||||
async def consume_events(self, exchange_name: str, queue_name: str, routing_key: str, callback: Callable):
|
async def consume_events(self, exchange_name: str, queue_name: str,
|
||||||
"""Consume events from RabbitMQ"""
|
routing_key: str, callback: Callable) -> bool:
|
||||||
|
"""Universal event consumer"""
|
||||||
try:
|
try:
|
||||||
if not self.channel:
|
if not await self.ensure_connected():
|
||||||
await self.connect()
|
return False
|
||||||
|
|
||||||
# Declare exchange
|
# Declare exchange
|
||||||
exchange = await self.channel.declare_exchange(
|
exchange = await self.channel.declare_exchange(
|
||||||
exchange_name,
|
exchange_name,
|
||||||
aio_pika.ExchangeType.TOPIC,
|
ExchangeType.TOPIC,
|
||||||
durable=True
|
durable=True
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -89,8 +167,31 @@ class RabbitMQClient:
|
|||||||
# Set up consumer
|
# Set up consumer
|
||||||
await queue.consume(callback)
|
await queue.consume(callback)
|
||||||
|
|
||||||
logger.info(f"Started consuming events from {queue_name}")
|
logger.info("Started consuming events",
|
||||||
|
service=self.service_name,
|
||||||
|
queue=queue_name,
|
||||||
|
routing_key=routing_key)
|
||||||
|
return True
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Failed to consume events: {e}")
|
logger.error("Failed to start consuming events",
|
||||||
raise
|
service=self.service_name,
|
||||||
|
error=str(e))
|
||||||
|
return False
|
||||||
|
|
||||||
|
# High-level convenience methods for common patterns
|
||||||
|
async def publish_user_event(self, event_type: str, user_data: Dict[str, Any]) -> bool:
|
||||||
|
"""Publish user-related events"""
|
||||||
|
return await self.publish_event("user.events", f"user.{event_type}", user_data)
|
||||||
|
|
||||||
|
async def publish_training_event(self, event_type: str, training_data: Dict[str, Any]) -> bool:
|
||||||
|
"""Publish training-related events"""
|
||||||
|
return await self.publish_event("training.events", f"training.{event_type}", training_data)
|
||||||
|
|
||||||
|
async def publish_data_event(self, event_type: str, data: Dict[str, Any]) -> bool:
|
||||||
|
"""Publish data-related events"""
|
||||||
|
return await self.publish_event("data.events", f"data.{event_type}", data)
|
||||||
|
|
||||||
|
async def publish_forecast_event(self, event_type: str, forecast_data: Dict[str, Any]) -> bool:
|
||||||
|
"""Publish forecast-related events"""
|
||||||
|
return await self.publish_event("forecast.events", f"forecast.{event_type}", forecast_data)
|
||||||
|
|||||||
Reference in New Issue
Block a user