""" Inventory Transfer Event Consumer Listens for completed internal transfers and handles inventory ownership transfer """ import asyncio import structlog from typing import Dict, Any import json from app.services.internal_transfer_service import InternalTransferInventoryService from shared.messaging import RabbitMQClient logger = structlog.get_logger() class InventoryTransferEventConsumer: """ Consumer for inventory transfer events triggered by internal transfers """ def __init__( self, internal_transfer_service: InternalTransferInventoryService, rabbitmq_client: RabbitMQClient ): self.internal_transfer_service = internal_transfer_service self.rabbitmq_client = rabbitmq_client self.is_running = False async def start_consuming(self): """ Start consuming inventory transfer events """ logger.info("Starting inventory transfer event consumer") self.is_running = True # Declare exchange and queue for internal transfer events await self.rabbitmq_client.declare_exchange("internal_transfers", "topic") await self.rabbitmq_client.declare_queue("inventory_service_internal_transfers") await self.rabbitmq_client.bind_queue_to_exchange( queue_name="inventory_service_internal_transfers", exchange_name="internal_transfers", routing_key="internal_transfer.completed" ) # Start consuming await self.rabbitmq_client.consume( queue_name="inventory_service_internal_transfers", callback=self.handle_internal_transfer_completed, auto_ack=False ) logger.info("Inventory transfer event consumer started") async def handle_internal_transfer_completed(self, message): """ Handle internal transfer completed event This means a shipment has been delivered and inventory ownership should transfer """ try: event_data = json.loads(message.body.decode()) logger.info("Processing internal transfer completed event", event_data=event_data) # Extract data from the event shipment_id = event_data.get('shipment_id') parent_tenant_id = event_data.get('parent_tenant_id') child_tenant_id = event_data.get('child_tenant_id') items = event_data.get('items', []) if not all([shipment_id, parent_tenant_id, child_tenant_id, items]): logger.error("Missing required data in internal transfer event", event_data=event_data) await message.nack(requeue=False) # Don't retry invalid messages return # Process the inventory transfer for each item transfer_results = [] errors = [] for item in items: product_id = item.get('product_id') delivered_quantity = item.get('delivered_quantity') if not all([product_id, delivered_quantity]): errors.append({ 'error': 'Missing product_id or delivered_quantity', 'item': item }) continue try: # Deduct from parent inventory await self._transfer_inventory_from_parent( parent_tenant_id=parent_tenant_id, product_id=product_id, quantity=delivered_quantity ) # Add to child inventory await self._transfer_inventory_to_child( child_tenant_id=child_tenant_id, product_id=product_id, quantity=delivered_quantity ) transfer_results.append({ 'product_id': product_id, 'quantity': delivered_quantity, 'status': 'completed' }) logger.info( "Inventory transferred successfully", parent_tenant_id=parent_tenant_id, child_tenant_id=child_tenant_id, product_id=product_id, quantity=delivered_quantity ) except Exception as item_error: logger.error( "Failed to transfer inventory for item", parent_tenant_id=parent_tenant_id, child_tenant_id=child_tenant_id, product_id=product_id, error=str(item_error) ) errors.append({ 'product_id': product_id, 'quantity': delivered_quantity, 'error': str(item_error) }) # Acknowledge message after processing await message.ack() logger.info( "Internal transfer processed", shipment_id=shipment_id, parent_tenant_id=parent_tenant_id, child_tenant_id=child_tenant_id, successful_transfers=len(transfer_results), failed_transfers=len(errors) ) except Exception as e: logger.error("Error processing internal transfer event", error=str(e), exc_info=True) # Nack with requeue=True to retry on transient errors await message.nack(requeue=True) async def _transfer_inventory_from_parent( self, parent_tenant_id: str, product_id: str, quantity: float ): """ Deduct inventory from parent tenant """ try: # Create stock movement to reduce parent inventory stock_movement_data = { "product_id": product_id, "movement_type": "internal_transfer_out", "quantity": -float(quantity), # Negative for outflow "reference_type": "internal_transfer", "reference_id": f"transfer_{parent_tenant_id}_to_{product_id}", # Would have actual transfer ID "source_tenant_id": parent_tenant_id, "destination_tenant_id": None, # Will be set when we know the child "notes": f"Internal transfer to child tenant" } # Call inventory service to process the movement await self.internal_transfer_service.inventory_client.create_stock_movement( tenant_id=parent_tenant_id, movement_data=stock_movement_data ) logger.info( "Inventory deducted from parent tenant", parent_tenant_id=parent_tenant_id, product_id=product_id, quantity=quantity ) except Exception as e: logger.error( "Error deducting inventory from parent", parent_tenant_id=parent_tenant_id, product_id=product_id, error=str(e) ) raise async def _transfer_inventory_to_child( self, child_tenant_id: str, product_id: str, quantity: float ): """ Add inventory to child tenant """ try: # Create stock movement to increase child inventory stock_movement_data = { "product_id": product_id, "movement_type": "internal_transfer_in", "quantity": float(quantity), # Positive for inflow "reference_type": "internal_transfer", "reference_id": f"transfer_from_parent_{product_id}_to_{child_tenant_id}", # Would have actual transfer ID "source_tenant_id": None, # Will be set when we know the parent "destination_tenant_id": child_tenant_id, "notes": f"Internal transfer from parent tenant" } # Call inventory service to process the movement await self.internal_transfer_service.inventory_client.create_stock_movement( tenant_id=child_tenant_id, movement_data=stock_movement_data ) logger.info( "Inventory added to child tenant", child_tenant_id=child_tenant_id, product_id=product_id, quantity=quantity ) except Exception as e: logger.error( "Error adding inventory to child", child_tenant_id=child_tenant_id, product_id=product_id, error=str(e) ) raise async def stop_consuming(self): """ Stop consuming inventory transfer events """ logger.info("Stopping inventory transfer event consumer") self.is_running = False # In a real implementation, we would close the RabbitMQ connection logger.info("Inventory transfer event consumer stopped") async def health_check(self) -> Dict[str, Any]: """ Health check for the consumer """ return { "consumer": "inventory_transfer_event_consumer", "status": "running" if self.is_running else "stopped", "timestamp": datetime.utcnow().isoformat() }