232 lines
8.9 KiB
Python
232 lines
8.9 KiB
Python
# services/sales/app/services/messaging.py
"""
Sales Service Messaging - Event Publishing using shared messaging infrastructure
"""
|
|
|
|
import structlog
|
|
from typing import Dict, Any, Optional
|
|
from uuid import UUID
|
|
from datetime import datetime
|
|
|
|
from shared.messaging.rabbitmq import RabbitMQClient
|
|
from shared.messaging.events import BaseEvent, DataImportedEvent
|
|
from app.core.config import settings
|
|
|
|
# Module-level structlog logger; the publisher methods below log successful
# publishes at info level and failures at warning level through it.
logger = structlog.get_logger()
|
|
|
|
|
|
class SalesEventPublisher:
    """Sales service event publisher using RabbitMQ.

    Every ``publish_*`` coroutine is best-effort:

    * when ``enabled`` is false it returns ``True`` without building an event
      or touching the broker;
    * otherwise it returns whatever ``RabbitMQClient.publish_event`` reports;
    * if building or sending the event raises an ``Exception``, the error is
      logged as a warning and ``False`` is returned instead of propagating.

    The ``timestamp`` field of every payload is ``datetime.now().isoformat()``,
    i.e. naive local time.
    NOTE(review): consumers may expect UTC -- confirm before changing, since
    that would alter the emitted string.
    """

    _SERVICE_NAME = "sales-service"

    def __init__(self):
        # When False, publish_* methods short-circuit and report success.
        self.enabled = True
        # RabbitMQClient instance, created lazily by _get_rabbitmq_client().
        self._rabbitmq_client = None

    async def _get_rabbitmq_client(self):
        """Get or create the RabbitMQ client.

        Returns:
            The cached client, or a new ``RabbitMQClient`` built from
            ``settings.RABBITMQ_URL`` after ``connect()`` has been awaited.

        Raises:
            Whatever ``RabbitMQClient(...)`` or ``connect()`` raises. In that
            case the client is removed from the cache so the next call builds
            and connects a fresh one instead of reusing an unconnected client.
        """
        client = self._rabbitmq_client
        if client is None:
            client = RabbitMQClient(
                connection_url=settings.RABBITMQ_URL,
                service_name=self._SERVICE_NAME,
            )
            # Cache before awaiting so callers arriving while connect() is in
            # flight share this instance rather than creating a second one.
            self._rabbitmq_client = client
            try:
                await client.connect()
            except BaseException:
                # BaseException so that task cancellation is covered as well.
                # Only clear the slot if it still holds *this* client.
                if self._rabbitmq_client is client:
                    self._rabbitmq_client = None
                raise
        return client

    async def _send(self, event, exchange_name: str, routing_key: str):
        """Publish ``event.to_dict()`` to *exchange_name* under *routing_key*.

        Returns the result of ``client.publish_event`` unchanged; exceptions
        are left to the calling ``publish_*`` method to handle.
        """
        client = await self._get_rabbitmq_client()
        return await client.publish_event(
            exchange_name=exchange_name,
            routing_key=routing_key,
            event_data=event.to_dict(),
        )

    async def publish_sales_created(self, sales_data: Dict[str, Any], correlation_id: Optional[str] = None) -> bool:
        """Publish sales created event.

        Args:
            sales_data: Mapping read with ``.get`` for ``id``, ``tenant_id``,
                ``product_name``, ``revenue`` and ``quantity_sold``.
            correlation_id: Optional id forwarded to the event.

        Returns:
            The broker client's publish result, ``True`` when publishing is
            disabled, ``False`` when an exception was caught.
        """
        if not self.enabled:
            return True

        try:
            event = BaseEvent(
                service_name=self._SERVICE_NAME,
                data={
                    "record_id": str(sales_data.get("id")),
                    "tenant_id": str(sales_data.get("tenant_id")),
                    "product_name": sales_data.get("product_name"),
                    # `or 0` also covers an explicit None value, which float()
                    # would reject and thereby drop the whole event.
                    "revenue": float(sales_data.get("revenue") or 0),
                    "quantity_sold": sales_data.get("quantity_sold", 0),
                    "timestamp": datetime.now().isoformat(),
                },
                event_type="sales.created",
                correlation_id=correlation_id,
            )

            published = await self._send(event, "sales.events", "sales.created")

            if published:
                logger.info(
                    "Sales record created event published",
                    record_id=sales_data.get("id"),
                    tenant_id=sales_data.get("tenant_id"),
                    product=sales_data.get("product_name"),
                )

            return published

        except Exception as e:
            logger.warning("Failed to publish sales created event", error=str(e))
            return False

    async def publish_sales_updated(self, sales_data: Dict[str, Any], correlation_id: Optional[str] = None) -> bool:
        """Publish sales updated event.

        Args:
            sales_data: Mapping read with ``.get`` for ``id``, ``tenant_id``
                and ``product_name``.
            correlation_id: Optional id forwarded to the event.

        Returns:
            The broker client's publish result, ``True`` when publishing is
            disabled, ``False`` when an exception was caught.
        """
        if not self.enabled:
            return True

        try:
            event = BaseEvent(
                service_name=self._SERVICE_NAME,
                data={
                    "record_id": str(sales_data.get("id")),
                    "tenant_id": str(sales_data.get("tenant_id")),
                    "product_name": sales_data.get("product_name"),
                    "timestamp": datetime.now().isoformat(),
                },
                event_type="sales.updated",
                correlation_id=correlation_id,
            )

            published = await self._send(event, "sales.events", "sales.updated")

            if published:
                logger.info(
                    "Sales record updated event published",
                    record_id=sales_data.get("id"),
                    tenant_id=sales_data.get("tenant_id"),
                )

            return published

        except Exception as e:
            logger.warning("Failed to publish sales updated event", error=str(e))
            return False

    async def publish_sales_deleted(self, record_id: UUID, tenant_id: UUID, correlation_id: Optional[str] = None) -> bool:
        """Publish sales deleted event.

        Args:
            record_id: Id of the deleted record; stringified in the payload.
            tenant_id: Owning tenant id; stringified in the payload.
            correlation_id: Optional id forwarded to the event.

        Returns:
            The broker client's publish result, ``True`` when publishing is
            disabled, ``False`` when an exception was caught.
        """
        if not self.enabled:
            return True

        try:
            event = BaseEvent(
                service_name=self._SERVICE_NAME,
                data={
                    "record_id": str(record_id),
                    "tenant_id": str(tenant_id),
                    "timestamp": datetime.now().isoformat(),
                },
                event_type="sales.deleted",
                correlation_id=correlation_id,
            )

            published = await self._send(event, "sales.events", "sales.deleted")

            if published:
                logger.info(
                    "Sales record deleted event published",
                    record_id=record_id,
                    tenant_id=tenant_id,
                )

            return published

        except Exception as e:
            logger.warning("Failed to publish sales deleted event", error=str(e))
            return False

    async def publish_data_imported(self, import_result: Dict[str, Any], correlation_id: Optional[str] = None) -> bool:
        """Publish data imported event.

        Args:
            import_result: Mapping read with ``.get`` for ``records_created``,
                ``records_updated``, ``records_failed``, ``tenant_id``,
                ``success`` and ``file_name``.
            correlation_id: Optional id forwarded to the event.

        Returns:
            The broker client's publish result, ``True`` when publishing is
            disabled, ``False`` when an exception was caught.
        """
        if not self.enabled:
            return True

        try:
            # DataImportedEvent is constructed without an explicit event_type.
            event = DataImportedEvent(
                service_name=self._SERVICE_NAME,
                data={
                    "records_created": import_result.get("records_created", 0),
                    "records_updated": import_result.get("records_updated", 0),
                    "records_failed": import_result.get("records_failed", 0),
                    "tenant_id": str(import_result.get("tenant_id")),
                    "success": import_result.get("success", False),
                    "file_name": import_result.get("file_name"),
                    "timestamp": datetime.now().isoformat(),
                },
                correlation_id=correlation_id,
            )

            published = await self._send(event, "data.events", "data.imported")

            if published:
                logger.info(
                    "Sales data imported event published",
                    records_created=import_result.get("records_created"),
                    tenant_id=import_result.get("tenant_id"),
                    success=import_result.get("success"),
                )

            return published

        except Exception as e:
            logger.warning("Failed to publish data imported event", error=str(e))
            return False

    async def publish_analytics_generated(self, analytics_data: Dict[str, Any], correlation_id: Optional[str] = None) -> bool:
        """Publish analytics generated event.

        Args:
            analytics_data: Mapping read with ``.get`` for ``tenant_id``,
                ``total_revenue``, ``total_quantity``, ``total_transactions``,
                ``period_start`` and ``period_end``.
            correlation_id: Optional id forwarded to the event.

        Returns:
            The broker client's publish result, ``True`` when publishing is
            disabled, ``False`` when an exception was caught.
        """
        if not self.enabled:
            return True

        try:
            event = BaseEvent(
                service_name=self._SERVICE_NAME,
                data={
                    "tenant_id": str(analytics_data.get("tenant_id")),
                    # `or 0` also covers an explicit None value (see
                    # publish_sales_created for the same guard on revenue).
                    "total_revenue": float(analytics_data.get("total_revenue") or 0),
                    "total_quantity": analytics_data.get("total_quantity", 0),
                    "total_transactions": analytics_data.get("total_transactions", 0),
                    "period_start": analytics_data.get("period_start"),
                    "period_end": analytics_data.get("period_end"),
                    "timestamp": datetime.now().isoformat(),
                },
                event_type="analytics.generated",
                correlation_id=correlation_id,
            )

            published = await self._send(event, "analytics.events", "analytics.generated")

            if published:
                logger.info(
                    "Sales analytics generated event published",
                    tenant_id=analytics_data.get("tenant_id"),
                    total_revenue=analytics_data.get("total_revenue"),
                )

            return published

        except Exception as e:
            logger.warning("Failed to publish analytics generated event", error=str(e))
            return False

    async def cleanup(self):
        """Cleanup RabbitMQ connections.

        Disconnects the cached client, if any, and clears the cache so that a
        later publish builds a fresh client instead of reusing a disconnected
        one. Calling it again without an intervening publish is a no-op.
        """
        # Drop the reference first: even if disconnect() raises, the stale
        # client must not be handed out again.
        client, self._rabbitmq_client = self._rabbitmq_client, None
        if client is not None:
            await client.disconnect()
|
|
|
|
|
|
# Global instance, created at import time. Constructing it opens no
# connection: the RabbitMQ client is only built on the first publish.
sales_publisher = SalesEventPublisher()