From a469f0c01db7e22378d3f501cfa36616078d3cdc Mon Sep 17 00:00:00 2001 From: Urtzi Alfaro Date: Fri, 18 Jul 2025 14:18:52 +0200 Subject: [PATCH] Fix bugs issues --- services/auth/app/services/auth_service.py | 57 +++--- services/auth/app/services/messaging.py | 35 ++-- services/data/app/services/messaging.py | 190 ++++---------------- services/training/app/services/messaging.py | 35 +++- shared/messaging/events.py | 101 ++++------- shared/messaging/rabbitmq.py | 175 ++++++++++++++---- 6 files changed, 288 insertions(+), 305 deletions(-) diff --git a/services/auth/app/services/auth_service.py b/services/auth/app/services/auth_service.py index ce77b937..13362b18 100644 --- a/services/auth/app/services/auth_service.py +++ b/services/auth/app/services/auth_service.py @@ -16,8 +16,7 @@ from app.core.config import settings from app.models.users import User, UserSession from app.schemas.auth import UserRegistration, UserLogin, TokenResponse, UserResponse from app.core.security import security_manager -from app.services.messaging import message_publisher -from shared.messaging.events import UserRegisteredEvent, UserLoginEvent +from app.services.messaging import publish_user_registered, publish_user_login logger = logging.getLogger(__name__) @@ -58,20 +57,18 @@ class AuthService: await db.commit() await db.refresh(user) - # Publish user registered event - await message_publisher.publish_event( - "user_events", - "user.registered", - UserRegisteredEvent( - service_name="auth-service", - data={ - "user_id": str(user.id), - "email": user.email, - "full_name": user.full_name, - "language": user.language - } - ).__dict__ - ) + # Publish user registered event - SIMPLIFIED + event_data = { + "user_id": str(user.id), + "email": user.email, + "full_name": user.full_name, + "language": user.language, + "timestamp": datetime.now(timezone.utc).isoformat() + } + + success = await publish_user_registered(event_data) + if not success: + logger.warning("Failed to publish user registered event", user_id=str(user.id)) logger.info(f"User registered: {user.email}") return UserResponse(**user.to_dict()) @@ -144,22 +141,20 @@ class AuthService: db.add(session) await db.commit() - # Publish login event - await message_publisher.publish_event( - "user_events", - "user.login", - UserLoginEvent( - service_name="auth-service", - data={ - "user_id": str(user.id), - "email": user.email, - "ip_address": ip_address, - "user_agent": user_agent - } - ).__dict__ - ) + # Publish login event - SIMPLIFIED + event_data = { + "user_id": str(user.id), + "email": user.email, + "ip_address": ip_address, + "user_agent": user_agent, + "timestamp": datetime.now(timezone.utc).isoformat() + } - logger.info(f"User login successful: {user.email}") + success = await publish_user_login(event_data) + if not success: + logger.warning("Failed to publish login event", user_id=str(user.id)) + + logger.info("User login successful", user_id=str(user.id), email=user.email) return TokenResponse( access_token=access_token, diff --git a/services/auth/app/services/messaging.py b/services/auth/app/services/messaging.py index a1db6c46..eb8d53b1 100644 --- a/services/auth/app/services/messaging.py +++ b/services/auth/app/services/messaging.py @@ -2,24 +2,37 @@ """ Messaging service for auth service """ - from shared.messaging.rabbitmq import RabbitMQClient from app.core.config import settings import logging logger = logging.getLogger(__name__) -# Global message publisher -message_publisher = RabbitMQClient(settings.RABBITMQ_URL) +# Single global instance +auth_publisher = RabbitMQClient(settings.RABBITMQ_URL, "auth-service") async def setup_messaging(): - """Establishes connection to RabbitMQ for the message publisher.""" - logger.info("Attempting to connect to RabbitMQ...") - await message_publisher.connect() - logger.info("RabbitMQ connection established.") + """Initialize messaging for auth service""" + success = await auth_publisher.connect() + if success: + logger.info("Auth service messaging initialized") + else: + logger.warning("Auth service messaging failed to initialize") async def cleanup_messaging(): - """Closes the connection to RabbitMQ for the message publisher.""" - logger.info("Attempting to disconnect from RabbitMQ...") - await message_publisher.disconnect() - logger.info("RabbitMQ connection closed.") \ No newline at end of file + """Cleanup messaging for auth service""" + await auth_publisher.disconnect() + logger.info("Auth service messaging cleaned up") + +# Convenience functions for auth-specific events +async def publish_user_registered(user_data: dict) -> bool: + """Publish user registered event""" + return await auth_publisher.publish_user_event("registered", user_data) + +async def publish_user_login(user_data: dict) -> bool: + """Publish user login event""" + return await auth_publisher.publish_user_event("login", user_data) + +async def publish_user_logout(user_data: dict) -> bool: + """Publish user logout event""" + return await auth_publisher.publish_user_event("logout", user_data) diff --git a/services/data/app/services/messaging.py b/services/data/app/services/messaging.py index f9e30896..002d1600 100644 --- a/services/data/app/services/messaging.py +++ b/services/data/app/services/messaging.py @@ -1,168 +1,42 @@ # ================================================================ # services/data/app/services/messaging.py # ================================================================ -"""Message queue service for data events""" -import json -import asyncio -from typing import Dict, Any, Optional -import structlog - -try: - from aio_pika import connect_robust, Message, ExchangeType - AIO_PIKA_AVAILABLE = True -except ImportError: - AIO_PIKA_AVAILABLE = False - +from shared.messaging.rabbitmq import RabbitMQClient from app.core.config import settings +import logging -logger = structlog.get_logger() +logger = logging.getLogger(__name__) -class DataEventPublisher: - """ - Event publisher for data service events. - Falls back gracefully if RabbitMQ is not available. - """ - - def __init__(self): - self.connection = None - self.channel = None - self.exchange = None - self.connected = False - - async def connect(self): - """Connect to RabbitMQ""" - if not AIO_PIKA_AVAILABLE: - logger.warning("aio-pika not available, messaging disabled") - return - - try: - self.connection = await connect_robust(settings.RABBITMQ_URL) - self.channel = await self.connection.channel() - - # Declare exchange for data events - self.exchange = await self.channel.declare_exchange( - "data.events", - ExchangeType.TOPIC, - durable=True - ) - - self.connected = True - logger.info("Connected to RabbitMQ for data events") - - except Exception as e: - logger.warning("Failed to connect to RabbitMQ", error=str(e)) - self.connected = False - - async def disconnect(self): - """Disconnect from RabbitMQ""" - if self.connection and not self.connection.is_closed: - await self.connection.close() - self.connected = False - logger.info("Disconnected from RabbitMQ") - - async def publish_data_imported(self, event_data: Dict[str, Any]): - """Publish data imported event""" - await self._publish_event("data.imported", event_data) - - async def publish_weather_updated(self, event_data: Dict[str, Any]): - """Publish weather data updated event""" - await self._publish_event("weather.updated", event_data) - - async def publish_traffic_updated(self, event_data: Dict[str, Any]): - """Publish traffic data updated event""" - await self._publish_event("traffic.updated", event_data) - - async def publish_sales_created(self, event_data: Dict[str, Any]): - """Publish sales record created event""" - await self._publish_event("sales.created", event_data) - - async def publish_import_completed(self, event_data: Dict[str, Any]): - """Publish import process completed event""" - await self._publish_event("import.completed", event_data) - - async def _publish_event(self, routing_key: str, data: Dict[str, Any]): - """Publish event to exchange""" - try: - # If not connected, try to connect - if not self.connected: - await self.connect() - - # If still not connected, log and return - if not self.connected: - logger.debug("Message not sent - RabbitMQ unavailable", routing_key=routing_key) - return - - # Prepare message - message_body = json.dumps(data, default=str) - message = Message( - message_body.encode(), - content_type="application/json", - delivery_mode=2 # Persistent - ) - - # Publish to exchange - await self.exchange.publish( - message, - routing_key=routing_key - ) - - logger.debug("Event published", routing_key=routing_key, data_size=len(message_body)) - - except Exception as e: - logger.error("Failed to publish event", - routing_key=routing_key, - error=str(e)) - # Reset connection on error - self.connected = False +# Single global instance +data_publisher = RabbitMQClient(settings.RABBITMQ_URL, "data-service") -class MockDataEventPublisher: - """ - Mock publisher for development/testing when RabbitMQ is not available - """ - - async def connect(self): - logger.info("Mock publisher - connect called") - - async def disconnect(self): - logger.info("Mock publisher - disconnect called") - - async def publish_data_imported(self, event_data: Dict[str, Any]): - logger.debug("Mock publish - data imported", event_data=event_data) - - async def publish_weather_updated(self, event_data: Dict[str, Any]): - logger.debug("Mock publish - weather updated", event_data=event_data) - - async def publish_traffic_updated(self, event_data: Dict[str, Any]): - logger.debug("Mock publish - traffic updated", event_data=event_data) - - async def publish_sales_created(self, event_data: Dict[str, Any]): - logger.debug("Mock publish - sales created", event_data=event_data) - - async def publish_import_completed(self, event_data: Dict[str, Any]): - logger.debug("Mock publish - import completed", event_data=event_data) +async def setup_messaging(): + """Initialize messaging for data service""" + success = await data_publisher.connect() + if success: + logger.info("Data service messaging initialized") + else: + logger.warning("Data service messaging failed to initialize") -# Global publisher instance -# Use mock if RabbitMQ is not available or in development mode -if AIO_PIKA_AVAILABLE and hasattr(settings, 'RABBITMQ_URL') and settings.RABBITMQ_URL: - data_publisher = DataEventPublisher() -else: - logger.info("Using mock data publisher") - data_publisher = MockDataEventPublisher() - -# Ensure connection is established -async def init_messaging(): - """Initialize messaging connection""" - try: - await data_publisher.connect() - except Exception as e: - logger.warning("Failed to initialize messaging", error=str(e)) - -# Cleanup function async def cleanup_messaging(): - """Cleanup messaging connection""" - try: - if hasattr(data_publisher, 'disconnect'): - await data_publisher.disconnect() - except Exception as e: - logger.warning("Failed to cleanup messaging", error=str(e)) \ No newline at end of file + """Cleanup messaging for data service""" + await data_publisher.disconnect() + logger.info("Data service messaging cleaned up") + +# Convenience functions for data-specific events +async def publish_data_imported(data: dict) -> bool: + """Publish data imported event""" + return await data_publisher.publish_data_event("imported", data) + +async def publish_weather_updated(data: dict) -> bool: + """Publish weather updated event""" + return await data_publisher.publish_data_event("weather.updated", data) + +async def publish_traffic_updated(data: dict) -> bool: + """Publish traffic updated event""" + return await data_publisher.publish_data_event("traffic.updated", data) + +async def publish_sales_created(data: dict) -> bool: + """Publish sales created event""" + return await data_publisher.publish_data_event("sales.created", data) diff --git a/services/training/app/services/messaging.py b/services/training/app/services/messaging.py index e03d2a5b..e5c90d1c 100644 --- a/services/training/app/services/messaging.py +++ b/services/training/app/services/messaging.py @@ -1,9 +1,38 @@ """ -Messaging service for training service +Training service messaging - Uses shared RabbitMQ client only """ from shared.messaging.rabbitmq import RabbitMQClient from app.core.config import settings +import logging -# Global message publisher -message_publisher = RabbitMQClient(settings.RABBITMQ_URL) \ No newline at end of file +logger = logging.getLogger(__name__) + +# Single global instance +training_publisher = RabbitMQClient(settings.RABBITMQ_URL, "training-service") + +async def setup_messaging(): + """Initialize messaging for training service""" + success = await training_publisher.connect() + if success: + logger.info("Training service messaging initialized") + else: + logger.warning("Training service messaging failed to initialize") + +async def cleanup_messaging(): + """Cleanup messaging for training service""" + await training_publisher.disconnect() + logger.info("Training service messaging cleaned up") + +# Convenience functions for training-specific events +async def publish_training_started(data: dict) -> bool: + """Publish training started event""" + return await training_publisher.publish_training_event("started", data) + +async def publish_training_completed(data: dict) -> bool: + """Publish training completed event""" + return await training_publisher.publish_training_event("completed", data) + +async def publish_training_failed(data: dict) -> bool: + """Publish training failed event""" + return await training_publisher.publish_training_event("failed", data) diff --git a/shared/messaging/events.py b/shared/messaging/events.py index 7149bcec..b085bfd1 100644 --- a/shared/messaging/events.py +++ b/shared/messaging/events.py @@ -1,14 +1,13 @@ """ +shared/messaging/events.py Event definitions for microservices communication -- Simple class-based approach to avoid dataclass issues """ - from datetime import datetime, timezone from typing import Dict, Any, Optional import uuid class BaseEvent: - """Base event class""" + """Base event class - FIXED""" def __init__(self, service_name: str, data: Dict[str, Any], event_type: str = "", correlation_id: Optional[str] = None): self.service_name = service_name self.data = data @@ -17,17 +16,45 @@ class BaseEvent: self.timestamp = datetime.now(timezone.utc) self.correlation_id = correlation_id - def to_dict(self) -> Dict[str, Any]: # Add this method - """Converts the event object to a dictionary for JSON serialization.""" + def to_dict(self) -> Dict[str, Any]: + """Converts the event object to a dictionary for JSON serialization - FIXED""" return { "service_name": self.service_name, "data": self.data, "event_type": self.event_type, "event_id": self.event_id, - "timestamp": self.timestamp.isoformat(), # Convert datetime to ISO 8601 string + "timestamp": self.timestamp.isoformat(), # Convert datetime to ISO string "correlation_id": self.correlation_id } +# Auth Events - FIXED +class UserRegisteredEvent(BaseEvent): + def __init__(self, service_name: str, data: Dict[str, Any], correlation_id: Optional[str] = None): + super().__init__( + service_name=service_name, + data=data, + event_type="user.registered", + correlation_id=correlation_id + ) + +class UserLoginEvent(BaseEvent): + def __init__(self, service_name: str, data: Dict[str, Any], correlation_id: Optional[str] = None): + super().__init__( + service_name=service_name, + data=data, + event_type="user.login", + correlation_id=correlation_id + ) + +class UserLogoutEvent(BaseEvent): + def __init__(self, service_name: str, data: Dict[str, Any], correlation_id: Optional[str] = None): + super().__init__( + service_name=service_name, + data=data, + event_type="user.logout", + correlation_id=correlation_id + ) + # Training Events class TrainingStartedEvent(BaseEvent): def __init__(self, service_name: str, data: Dict[str, Any], correlation_id: Optional[str] = None): @@ -66,68 +93,12 @@ class ForecastGeneratedEvent(BaseEvent): correlation_id=correlation_id ) -class ForecastRequestedEvent(BaseEvent): +# Data Events +class DataImportedEvent(BaseEvent): def __init__(self, service_name: str, data: Dict[str, Any], correlation_id: Optional[str] = None): super().__init__( service_name=service_name, data=data, - event_type="forecast.requested", + event_type="data.imported", correlation_id=correlation_id ) - -# User Events -class UserRegisteredEvent(BaseEvent): - def __init__(self, service_name: str, data: Dict[str, Any], correlation_id: Optional[str] = None): - super().__init__( - service_name=service_name, - data=data, - event_type="user.registered", - correlation_id=correlation_id - ) - -class UserLoginEvent(BaseEvent): - def __init__(self, service_name: str, data: Dict[str, Any], correlation_id: Optional[str] = None): - super().__init__( - service_name=service_name, - data=data, - event_type="user.login", - correlation_id=correlation_id - ) - -# Tenant Events -class TenantCreatedEvent(BaseEvent): - def __init__(self, service_name: str, data: Dict[str, Any], correlation_id: Optional[str] = None): - super().__init__( - service_name=service_name, - data=data, - event_type="tenant.created", - correlation_id=correlation_id - ) - -class TenantUpdatedEvent(BaseEvent): - def __init__(self, service_name: str, data: Dict[str, Any], correlation_id: Optional[str] = None): - super().__init__( - service_name=service_name, - data=data, - event_type="tenant.updated", - correlation_id=correlation_id - ) - -# Notification Events -class NotificationSentEvent(BaseEvent): - def __init__(self, service_name: str, data: Dict[str, Any], correlation_id: Optional[str] = None): - super().__init__( - service_name=service_name, - data=data, - event_type="notification.sent", - correlation_id=correlation_id - ) - -class NotificationFailedEvent(BaseEvent): - def __init__(self, service_name: str, data: Dict[str, Any], correlation_id: Optional[str] = None): - super().__init__( - service_name=service_name, - data=data, - event_type="notification.failed", - correlation_id=correlation_id - ) \ No newline at end of file diff --git a/shared/messaging/rabbitmq.py b/shared/messaging/rabbitmq.py index 62d95cfb..2d05f1e6 100644 --- a/shared/messaging/rabbitmq.py +++ b/shared/messaging/rabbitmq.py @@ -1,79 +1,157 @@ """ -RabbitMQ messaging client for microservices +RabbitMQ messaging client for microservices - FIXED VERSION """ - import asyncio import json -import logging -from typing import Dict, Any, Callable -import aio_pika -from aio_pika import connect_robust, Message, DeliveryMode +from typing import Dict, Any, Callable, Optional +from datetime import datetime, date +import uuid +import logstash -logger = logging.getLogger(__name__) +try: + import aio_pika + from aio_pika import connect_robust, Message, DeliveryMode, ExchangeType + AIO_PIKA_AVAILABLE = True +except ImportError: + AIO_PIKA_AVAILABLE = False + +logger = logstash.get_logger() + +def json_serializer(obj): + """JSON serializer for objects not serializable by default json code""" + if isinstance(obj, (datetime, date)): + return obj.isoformat() + elif isinstance(obj, uuid.UUID): + return str(obj) + raise TypeError(f"Object of type {type(obj)} is not JSON serializable") class RabbitMQClient: - """RabbitMQ client for microservices communication""" + """ + Universal RabbitMQ client for all microservices + Handles all messaging patterns with proper fallbacks + """ - def __init__(self, connection_url: str): + def __init__(self, connection_url: str, service_name: str = "unknown"): self.connection_url = connection_url + self.service_name = service_name self.connection = None self.channel = None - + self.connected = False + self._reconnect_attempts = 0 + self._max_reconnect_attempts = 5 + async def connect(self): - """Connect to RabbitMQ""" + """Connect to RabbitMQ with retry logic""" + if not AIO_PIKA_AVAILABLE: + logger.warning("aio-pika not available, messaging disabled", service=self.service_name) + return False + try: - self.connection = await connect_robust(self.connection_url) + self.connection = await connect_robust( + self.connection_url, + heartbeat=30, + connection_attempts=3 + ) self.channel = await self.connection.channel() - logger.info("Connected to RabbitMQ") + await self.channel.set_qos(prefetch_count=100) # Performance optimization + + self.connected = True + self._reconnect_attempts = 0 + logger.info("Connected to RabbitMQ", service=self.service_name) + return True + except Exception as e: - logger.error(f"Failed to connect to RabbitMQ: {e}") - raise + self.connected = False + self._reconnect_attempts += 1 + logger.warning( + "Failed to connect to RabbitMQ", + service=self.service_name, + error=str(e), + attempt=self._reconnect_attempts + ) + return False async def disconnect(self): """Disconnect from RabbitMQ""" - if self.connection: + if self.connection and not self.connection.is_closed: await self.connection.close() - logger.info("Disconnected from RabbitMQ") + self.connected = False + logger.info("Disconnected from RabbitMQ", service=self.service_name) - async def publish_event(self, exchange_name: str, routing_key: str, event_data: Dict[str, Any]): - """Publish event to RabbitMQ""" + async def ensure_connected(self) -> bool: + """Ensure connection is active, reconnect if needed""" + if self.connected and self.connection and not self.connection.is_closed: + return True + + if self._reconnect_attempts >= self._max_reconnect_attempts: + logger.error("Max reconnection attempts reached", service=self.service_name) + return False + + return await self.connect() + + async def publish_event(self, exchange_name: str, routing_key: str, event_data: Dict[str, Any], + persistent: bool = True) -> bool: + """ + Universal event publisher with automatic fallback + Returns True if published successfully, False otherwise + """ try: - if not self.channel: - await self.connect() + # Ensure we're connected + if not await self.ensure_connected(): + logger.debug("Event not published - RabbitMQ unavailable", + service=self.service_name, routing_key=routing_key) + return False # Declare exchange exchange = await self.channel.declare_exchange( exchange_name, - aio_pika.ExchangeType.TOPIC, + ExchangeType.TOPIC, durable=True ) - # Create message + # Prepare message with proper JSON serialization + message_body = json.dumps(event_data, default=json_serializer) message = Message( - json.dumps(event_data).encode(), - delivery_mode=DeliveryMode.PERSISTENT, - content_type="application/json" + message_body.encode(), + delivery_mode=DeliveryMode.PERSISTENT if persistent else DeliveryMode.NOT_PERSISTENT, + content_type="application/json", + timestamp=datetime.now(), + headers={ + "source_service": self.service_name, + "event_id": event_data.get("event_id", str(uuid.uuid4())) + } ) # Publish message await exchange.publish(message, routing_key=routing_key) - logger.info(f"Published event to {exchange_name} with routing key {routing_key}") + logger.debug("Event published successfully", + service=self.service_name, + exchange=exchange_name, + routing_key=routing_key, + size=len(message_body)) + return True except Exception as e: - logger.error(f"Failed to publish event: {e}") - raise + logger.error("Failed to publish event", + service=self.service_name, + exchange=exchange_name, + routing_key=routing_key, + error=str(e)) + self.connected = False # Force reconnection on next attempt + return False - async def consume_events(self, exchange_name: str, queue_name: str, routing_key: str, callback: Callable): - """Consume events from RabbitMQ""" + async def consume_events(self, exchange_name: str, queue_name: str, + routing_key: str, callback: Callable) -> bool: + """Universal event consumer""" try: - if not self.channel: - await self.connect() + if not await self.ensure_connected(): + return False # Declare exchange exchange = await self.channel.declare_exchange( exchange_name, - aio_pika.ExchangeType.TOPIC, + ExchangeType.TOPIC, durable=True ) @@ -89,8 +167,31 @@ class RabbitMQClient: # Set up consumer await queue.consume(callback) - logger.info(f"Started consuming events from {queue_name}") + logger.info("Started consuming events", + service=self.service_name, + queue=queue_name, + routing_key=routing_key) + return True except Exception as e: - logger.error(f"Failed to consume events: {e}") - raise \ No newline at end of file + logger.error("Failed to start consuming events", + service=self.service_name, + error=str(e)) + return False + + # High-level convenience methods for common patterns + async def publish_user_event(self, event_type: str, user_data: Dict[str, Any]) -> bool: + """Publish user-related events""" + return await self.publish_event("user.events", f"user.{event_type}", user_data) + + async def publish_training_event(self, event_type: str, training_data: Dict[str, Any]) -> bool: + """Publish training-related events""" + return await self.publish_event("training.events", f"training.{event_type}", training_data) + + async def publish_data_event(self, event_type: str, data: Dict[str, Any]) -> bool: + """Publish data-related events""" + return await self.publish_event("data.events", f"data.{event_type}", data) + + async def publish_forecast_event(self, event_type: str, forecast_data: Dict[str, Any]) -> bool: + """Publish forecast-related events""" + return await self.publish_event("forecast.events", f"forecast.{event_type}", forecast_data)