Files
bakery-ia/shared/messaging
2025-12-15 21:14:22 +01:00
..
2025-12-13 23:57:54 +01:00
2025-12-15 21:14:22 +01:00
2025-12-13 23:57:54 +01:00
2025-12-13 23:57:54 +01:00

Unified Messaging Architecture

This document describes the standardized messaging system used across all bakery-ia microservices.

Overview

The unified messaging architecture provides a consistent approach for:

  • Publishing business events (inventory changes, user actions, etc.)
  • Publishing user-facing alerts, notifications, and recommendations
  • Consuming events from other services
  • Maintaining service-to-service communication patterns

Core Components

1. UnifiedEventPublisher

The main publisher for all event types, located in shared/messaging/messaging_client.py:

from shared.messaging import UnifiedEventPublisher, EVENT_TYPES, RabbitMQClient

# Initialize
rabbitmq_client = RabbitMQClient(settings.RABBITMQ_URL, service_name="my-service")
await rabbitmq_client.connect()
event_publisher = UnifiedEventPublisher(rabbitmq_client, "my-service")

# Publish business events
await event_publisher.publish_business_event(
    event_type=EVENT_TYPES.INVENTORY.STOCK_ADDED,
    tenant_id=tenant_id,
    data={"ingredient_id": "123", "quantity": 100.0}
)

# Publish alerts (action required)
await event_publisher.publish_alert(
    event_type="procurement.po_approval_needed",
    tenant_id=tenant_id,
    severity="high",  # urgent, high, medium, low
    data={"po_id": "456", "supplier_name": "ABC Corp"}
)

# Publish notifications (informational)
await event_publisher.publish_notification(
    event_type="production.batch_completed",
    tenant_id=tenant_id,
    data={"batch_id": "789", "product_name": "Bread"}
)

# Publish recommendations (suggestions)
await event_publisher.publish_recommendation(
    event_type="forecasting.demand_surge_predicted",
    tenant_id=tenant_id,
    data={"product_name": "Croissants", "surge_percentage": 25.0}
)

2. Event Types Constants

Use predefined event types for consistency:

from shared.messaging import EVENT_TYPES

# Inventory events
EVENT_TYPES.INVENTORY.INGREDIENT_CREATED
EVENT_TYPES.INVENTORY.STOCK_ADDED
EVENT_TYPES.INVENTORY.LOW_STOCK_ALERT

# Production events
EVENT_TYPES.PRODUCTION.BATCH_CREATED
EVENT_TYPES.PRODUCTION.BATCH_COMPLETED

# Procurement events
EVENT_TYPES.PROCUREMENT.PO_APPROVED
EVENT_TYPES.PROCUREMENT.DELIVERY_SCHEDULED

3. Service Integration Pattern

In Service Main.py:

from shared.messaging import UnifiedEventPublisher, ServiceMessagingManager

class MyService(StandardFastAPIService):
    def __init__(self):
        self.messaging_manager = None
        self.event_publisher = None  # For alerts/notifications
        self.unified_publisher = None  # For business events

        super().__init__(
            service_name="my-service",
            # ... other params
            enable_messaging=True
        )

    async def _setup_messaging(self):
        try:
            self.messaging_manager = ServiceMessagingManager("my-service", settings.RABBITMQ_URL)
            success = await self.messaging_manager.setup()
            if success:
                self.event_publisher = self.messaging_manager.publisher
                self.unified_publisher = self.messaging_manager.publisher

                self.logger.info("Messaging setup completed")
            else:
                raise Exception("Failed to setup messaging")
        except Exception as e:
            self.logger.error("Messaging setup failed", error=str(e))
            raise

    async def on_startup(self, app: FastAPI):
        await super().on_startup(app)

        # Pass publishers to services
        my_service = MyAlertService(self.event_publisher)
        my_event_service = MyEventService(self.unified_publisher)

        # Store in app state if needed
        app.state.my_service = my_service
        app.state.my_event_service = my_event_service

    async def on_shutdown(self, app: FastAPI):
        if self.messaging_manager:
            await self.messaging_manager.cleanup()
        await super().on_shutdown(app)

In Service Implementation:

from shared.messaging import UnifiedEventPublisher

class MyEventService:
    def __init__(self, event_publisher: UnifiedEventPublisher):
        self.publisher = event_publisher

    async def handle_business_logic(self, tenant_id: UUID, data: Dict[str, Any]):
        # Publish business events
        await self.publisher.publish_business_event(
            event_type="mydomain.action_performed",
            tenant_id=tenant_id,
            data=data
        )

Migration Guide

Old Pattern (Deprecated):

# OLD - Don't use this anymore
from shared.alerts.base_service import BaseAlertService

class MyService(BaseAlertService):
    def __init__(self, config):
        super().__init__(config)

    async def send_alert(self, tenant_id, data):
        await self.publish_item(tenant_id, data, item_type="alert")
# NEW - Use UnifiedEventPublisher for all event types
from shared.messaging import UnifiedEventPublisher

class MyService:
    def __init__(self, event_publisher: UnifiedEventPublisher):
        self.publisher = event_publisher

    async def send_alert(self, tenant_id: UUID, data: Dict[str, Any]):
        await self.publisher.publish_alert(
            event_type="mydomain.alert_type",
            tenant_id=tenant_id,
            severity="high",
            data=data
        )

Event Routing

Events are routed using the following patterns:

  • Alerts: alert.{domain}.{severity} (e.g., alert.inventory.high)
  • Notifications: notification.{domain}.info (e.g., notification.production.info)
  • Recommendations: recommendation.{domain}.medium (e.g., recommendation.forecasting.medium)
  • Business Events: business.{event_type} (e.g., business.inventory_stock_added)

Best Practices

  1. Consistent Naming: Use lowercase, dot-separated event types (e.g., inventory.stock.added)
  2. Tenant Awareness: Always include tenant_id for multi-tenant operations
  3. Data Minimization: Include only essential data in events
  4. Error Handling: Always wrap event publishing in try-catch blocks
  5. Service Names: Use consistent service names matching your service definition
  6. Lifecycle Management: Always clean up messaging resources during service shutdown