276 lines
8.8 KiB
Python
276 lines
8.8 KiB
Python
"""
|
|
Procurement Service Event Publisher
|
|
Publishes procurement-related events to RabbitMQ
|
|
"""
|
|
import uuid
|
|
from typing import Optional, Dict, Any
|
|
from decimal import Decimal
|
|
import structlog
|
|
from shared.messaging.rabbitmq import RabbitMQClient
|
|
from shared.messaging.events import (
|
|
PurchaseOrderApprovedEvent,
|
|
PurchaseOrderRejectedEvent,
|
|
PurchaseOrderSentToSupplierEvent,
|
|
DeliveryReceivedEvent
|
|
)
|
|
|
|
logger = structlog.get_logger()
|
|
|
|
|
|
class ProcurementEventPublisher:
|
|
"""Handles publishing of procurement-related events"""
|
|
|
|
def __init__(self, rabbitmq_client: Optional[RabbitMQClient] = None):
|
|
self.rabbitmq_client = rabbitmq_client
|
|
self.service_name = "procurement"
|
|
|
|
async def publish_po_approved_event(
|
|
self,
|
|
tenant_id: uuid.UUID,
|
|
po_id: uuid.UUID,
|
|
po_number: str,
|
|
supplier_id: uuid.UUID,
|
|
supplier_name: str,
|
|
supplier_email: Optional[str],
|
|
supplier_phone: Optional[str],
|
|
total_amount: Decimal,
|
|
currency: str,
|
|
required_delivery_date: Optional[str],
|
|
items: list,
|
|
approved_by: Optional[uuid.UUID],
|
|
approved_at: str,
|
|
correlation_id: Optional[str] = None
|
|
) -> bool:
|
|
"""
|
|
Publish purchase order approved event
|
|
|
|
This event triggers:
|
|
- Email/WhatsApp notification to supplier (notification service)
|
|
- Dashboard refresh (frontend)
|
|
- Analytics update (reporting service)
|
|
"""
|
|
if not self.rabbitmq_client:
|
|
logger.warning("RabbitMQ client not available, event not published", event="po.approved")
|
|
return False
|
|
|
|
event_data = {
|
|
"tenant_id": str(tenant_id),
|
|
"po_id": str(po_id),
|
|
"po_number": po_number,
|
|
"supplier_id": str(supplier_id),
|
|
"supplier_name": supplier_name,
|
|
"supplier_email": supplier_email,
|
|
"supplier_phone": supplier_phone,
|
|
"total_amount": float(total_amount),
|
|
"currency": currency,
|
|
"required_delivery_date": required_delivery_date,
|
|
"items": [
|
|
{
|
|
"inventory_product_id": str(item.get("inventory_product_id")),
|
|
"product_name": item.get("product_name"),
|
|
"ordered_quantity": float(item.get("ordered_quantity")),
|
|
"unit_of_measure": item.get("unit_of_measure"),
|
|
"unit_price": float(item.get("unit_price")),
|
|
"line_total": float(item.get("line_total"))
|
|
}
|
|
for item in items
|
|
],
|
|
"approved_by": str(approved_by) if approved_by else None,
|
|
"approved_at": approved_at,
|
|
}
|
|
|
|
event = PurchaseOrderApprovedEvent(
|
|
service_name=self.service_name,
|
|
data=event_data,
|
|
correlation_id=correlation_id
|
|
)
|
|
|
|
# Publish to procurement.events exchange with routing key po.approved
|
|
success = await self.rabbitmq_client.publish_event(
|
|
exchange_name="procurement.events",
|
|
routing_key="po.approved",
|
|
event_data=event.to_dict(),
|
|
persistent=True
|
|
)
|
|
|
|
if success:
|
|
logger.info(
|
|
"Published PO approved event",
|
|
tenant_id=str(tenant_id),
|
|
po_id=str(po_id),
|
|
po_number=po_number,
|
|
supplier_name=supplier_name
|
|
)
|
|
|
|
return success
|
|
|
|
async def publish_po_rejected_event(
|
|
self,
|
|
tenant_id: uuid.UUID,
|
|
po_id: uuid.UUID,
|
|
po_number: str,
|
|
supplier_id: uuid.UUID,
|
|
supplier_name: str,
|
|
rejection_reason: str,
|
|
rejected_by: Optional[uuid.UUID],
|
|
rejected_at: str,
|
|
correlation_id: Optional[str] = None
|
|
) -> bool:
|
|
"""Publish purchase order rejected event"""
|
|
if not self.rabbitmq_client:
|
|
logger.warning("RabbitMQ client not available, event not published", event="po.rejected")
|
|
return False
|
|
|
|
event_data = {
|
|
"tenant_id": str(tenant_id),
|
|
"po_id": str(po_id),
|
|
"po_number": po_number,
|
|
"supplier_id": str(supplier_id),
|
|
"supplier_name": supplier_name,
|
|
"rejection_reason": rejection_reason,
|
|
"rejected_by": str(rejected_by) if rejected_by else None,
|
|
"rejected_at": rejected_at,
|
|
}
|
|
|
|
event = PurchaseOrderRejectedEvent(
|
|
service_name=self.service_name,
|
|
data=event_data,
|
|
correlation_id=correlation_id
|
|
)
|
|
|
|
success = await self.rabbitmq_client.publish_event(
|
|
exchange_name="procurement.events",
|
|
routing_key="po.rejected",
|
|
event_data=event.to_dict(),
|
|
persistent=True
|
|
)
|
|
|
|
if success:
|
|
logger.info(
|
|
"Published PO rejected event",
|
|
tenant_id=str(tenant_id),
|
|
po_id=str(po_id),
|
|
po_number=po_number
|
|
)
|
|
|
|
return success
|
|
|
|
async def publish_po_sent_to_supplier_event(
|
|
self,
|
|
tenant_id: uuid.UUID,
|
|
po_id: uuid.UUID,
|
|
po_number: str,
|
|
supplier_id: uuid.UUID,
|
|
supplier_name: str,
|
|
supplier_email: Optional[str],
|
|
supplier_phone: Optional[str],
|
|
total_amount: Decimal,
|
|
currency: str,
|
|
sent_at: str,
|
|
correlation_id: Optional[str] = None
|
|
) -> bool:
|
|
"""Publish purchase order sent to supplier event"""
|
|
if not self.rabbitmq_client:
|
|
logger.warning("RabbitMQ client not available, event not published", event="po.sent_to_supplier")
|
|
return False
|
|
|
|
event_data = {
|
|
"tenant_id": str(tenant_id),
|
|
"po_id": str(po_id),
|
|
"po_number": po_number,
|
|
"supplier_id": str(supplier_id),
|
|
"supplier_name": supplier_name,
|
|
"supplier_email": supplier_email,
|
|
"supplier_phone": supplier_phone,
|
|
"total_amount": float(total_amount),
|
|
"currency": currency,
|
|
"sent_at": sent_at,
|
|
}
|
|
|
|
event = PurchaseOrderSentToSupplierEvent(
|
|
service_name=self.service_name,
|
|
data=event_data,
|
|
correlation_id=correlation_id
|
|
)
|
|
|
|
success = await self.rabbitmq_client.publish_event(
|
|
exchange_name="procurement.events",
|
|
routing_key="po.sent_to_supplier",
|
|
event_data=event.to_dict(),
|
|
persistent=True
|
|
)
|
|
|
|
if success:
|
|
logger.info(
|
|
"Published PO sent to supplier event",
|
|
tenant_id=str(tenant_id),
|
|
po_id=str(po_id),
|
|
po_number=po_number
|
|
)
|
|
|
|
return success
|
|
|
|
async def publish_delivery_received_event(
|
|
self,
|
|
tenant_id: uuid.UUID,
|
|
delivery_id: uuid.UUID,
|
|
po_id: uuid.UUID,
|
|
items: list,
|
|
received_at: str,
|
|
received_by: Optional[uuid.UUID],
|
|
correlation_id: Optional[str] = None
|
|
) -> bool:
|
|
"""
|
|
Publish delivery received event
|
|
|
|
This event triggers:
|
|
- Automatic stock update (inventory service)
|
|
- PO status update to 'completed'
|
|
- Supplier performance metrics update
|
|
"""
|
|
if not self.rabbitmq_client:
|
|
logger.warning("RabbitMQ client not available, event not published", event="delivery.received")
|
|
return False
|
|
|
|
event_data = {
|
|
"tenant_id": str(tenant_id),
|
|
"delivery_id": str(delivery_id),
|
|
"po_id": str(po_id),
|
|
"items": [
|
|
{
|
|
"inventory_product_id": str(item.get("inventory_product_id")),
|
|
"accepted_quantity": float(item.get("accepted_quantity")),
|
|
"rejected_quantity": float(item.get("rejected_quantity", 0)),
|
|
"batch_lot_number": item.get("batch_lot_number"),
|
|
"expiry_date": item.get("expiry_date"),
|
|
"unit_of_measure": item.get("unit_of_measure")
|
|
}
|
|
for item in items
|
|
],
|
|
"received_at": received_at,
|
|
"received_by": str(received_by) if received_by else None,
|
|
}
|
|
|
|
event = DeliveryReceivedEvent(
|
|
service_name=self.service_name,
|
|
data=event_data,
|
|
correlation_id=correlation_id
|
|
)
|
|
|
|
success = await self.rabbitmq_client.publish_event(
|
|
exchange_name="procurement.events",
|
|
routing_key="delivery.received",
|
|
event_data=event.to_dict(),
|
|
persistent=True
|
|
)
|
|
|
|
if success:
|
|
logger.info(
|
|
"Published delivery received event",
|
|
tenant_id=str(tenant_id),
|
|
delivery_id=str(delivery_id),
|
|
po_id=str(po_id)
|
|
)
|
|
|
|
return success
|