""" Delivery Event Consumer Listens for delivery.received events from procurement service and automatically updates inventory stock levels. """ import json import uuid from datetime import datetime, timezone from typing import Dict, Any from decimal import Decimal import structlog from shared.messaging import RabbitMQClient from app.core.database import database_manager from app.repositories.stock_repository import StockRepository from app.repositories.stock_movement_repository import StockMovementRepository logger = structlog.get_logger() class DeliveryEventConsumer: """ Consumes delivery.received events and updates inventory stock. When a delivery is recorded in procurement service: 1. Listens for delivery.received event 2. Creates stock entries for each delivered item 3. Updates stock levels (quantity_available) 4. Records batch numbers and expiry dates """ def __init__(self): """Initialize delivery event consumer""" self.service_name = "inventory" async def consume_delivery_received_events( self, rabbitmq_client: RabbitMQClient ): """ Start consuming delivery.received events from RabbitMQ Args: rabbitmq_client: RabbitMQ client instance """ async def process_message(message): """Process a single delivery.received event message""" try: async with message.process(): # Parse event data event_data = json.loads(message.body.decode()) logger.info( "Received delivery.received event", event_id=event_data.get('event_id'), delivery_id=event_data.get('data', {}).get('delivery_id') ) # Process the delivery and update stock success = await self.process_delivery_stock_update(event_data) if success: logger.info( "Successfully processed delivery stock update", delivery_id=event_data.get('data', {}).get('delivery_id') ) else: logger.error( "Failed to process delivery stock update", delivery_id=event_data.get('data', {}).get('delivery_id') ) except Exception as e: logger.error( "Error processing delivery.received event", error=str(e), exc_info=True ) # Start consuming events await rabbitmq_client.consume_events( exchange_name="procurement.events", queue_name="inventory.delivery.received", routing_key="delivery.received", callback=process_message ) logger.info("Started consuming delivery.received events") async def process_delivery_stock_update(self, event_data: Dict[str, Any]) -> bool: """ Process delivery event and update stock levels. Args: event_data: Full event payload from RabbitMQ Returns: bool: True if stock updated successfully """ try: data = event_data.get('data', {}) # Extract delivery information tenant_id = uuid.UUID(data.get('tenant_id')) delivery_id = uuid.UUID(data.get('delivery_id')) po_id = uuid.UUID(data.get('po_id')) items = data.get('items', []) received_by = data.get('received_by') received_at = data.get('received_at') if not items: logger.warning( "No items in delivery event, skipping stock update", delivery_id=str(delivery_id) ) return False # Process each item async with database_manager.get_session() as session: stock_repo = StockRepository(session) movement_repo = StockMovementRepository(session) for item in items: try: # inventory_product_id is the same as ingredient_id # The ingredients table serves as a unified catalog for both raw materials and products ingredient_id = uuid.UUID(item.get('inventory_product_id')) accepted_quantity = Decimal(str(item.get('accepted_quantity', 0))) # Only process if quantity was accepted if accepted_quantity <= 0: logger.debug( "Skipping item with zero accepted quantity", ingredient_id=str(ingredient_id) ) continue # Create a new stock batch entry for this delivery # The Stock model uses batch tracking - each delivery creates a new batch entry # Extract unit cost from delivery item unit_cost = Decimal('0') try: if 'unit_cost' in item: unit_cost = Decimal(str(item['unit_cost'])) elif 'unit_price' in item: unit_cost = Decimal(str(item['unit_price'])) elif 'price' in item: unit_cost = Decimal(str(item['price'])) except (ValueError, TypeError, KeyError) as e: logger.warning("Could not extract unit cost from delivery item for stock entry", item_id=item.get('id'), error=str(e)) # Calculate total cost total_cost = unit_cost * accepted_quantity stock_data = { 'tenant_id': tenant_id, 'ingredient_id': ingredient_id, 'batch_number': item.get('batch_lot_number'), 'lot_number': item.get('batch_lot_number'), # Use same as batch_number 'supplier_batch_ref': item.get('batch_lot_number'), # Quantities 'current_quantity': float(accepted_quantity), 'reserved_quantity': 0.0, 'available_quantity': float(accepted_quantity), # Dates 'received_date': datetime.fromisoformat(received_at.replace('Z', '+00:00')) if received_at else datetime.now(timezone.utc), 'expiration_date': datetime.fromisoformat(item.get('expiry_date').replace('Z', '+00:00')) if item.get('expiry_date') else None, # Cost - extracted from delivery item 'unit_cost': unit_cost, 'total_cost': total_cost, # Production stage - default to raw ingredient for deliveries 'production_stage': 'raw_ingredient', # Status 'is_available': True, 'quality_status': 'GOOD' } from app.schemas.inventory import StockCreate stock_create = StockCreate(**stock_data) stock = await stock_repo.create_stock_entry(stock_create, tenant_id) logger.info( "Created new stock batch from delivery", ingredient_id=str(ingredient_id), stock_id=str(stock.id), batch_number=item.get('batch_lot_number'), quantity=float(accepted_quantity), delivery_id=str(delivery_id) ) # Create stock movement record for audit trail from app.models.inventory import StockMovementType from app.schemas.inventory import StockMovementCreate # Extract unit cost from delivery item or default to 0 unit_cost = Decimal('0') try: if 'unit_cost' in item: unit_cost = Decimal(str(item['unit_cost'])) elif 'unit_price' in item: unit_cost = Decimal(str(item['unit_price'])) elif 'price' in item: unit_cost = Decimal(str(item['price'])) except (ValueError, TypeError, KeyError) as e: logger.warning("Could not extract unit cost from delivery item", item_id=item.get('id'), error=str(e)) movement_data = StockMovementCreate( ingredient_id=ingredient_id, stock_id=stock.id, movement_type=StockMovementType.PURCHASE, quantity=float(accepted_quantity), unit_cost=unit_cost, reference_number=f"DEL-{delivery_id}", reason_code='delivery', notes=f"Delivery received from PO {po_id}. Batch: {item.get('batch_lot_number', 'N/A')}", movement_date=datetime.fromisoformat(received_at.replace('Z', '+00:00')) if received_at else datetime.now(timezone.utc) ) movement = await movement_repo.create_movement( movement_data=movement_data, tenant_id=tenant_id, created_by=uuid.UUID(received_by) if received_by else None, quantity_before=0.0, # New batch starts at 0 quantity_after=float(accepted_quantity) ) logger.info( "Created stock movement for delivery", movement_id=str(movement.id), ingredient_id=str(ingredient_id), quantity=float(accepted_quantity), batch=item.get('batch_lot_number') ) except Exception as item_error: logger.error( "Error processing delivery item", error=str(item_error), item=item, exc_info=True ) # Continue processing other items even if one fails continue # Commit all changes await session.commit() logger.info( "Successfully processed delivery stock update", delivery_id=str(delivery_id), items_processed=len(items) ) return True except Exception as e: logger.error( "Error in delivery stock update", error=str(e), delivery_id=data.get('delivery_id'), exc_info=True ) return False