Files
bakery-ia/services/distribution/app/consumers/production_event_consumer.py

86 lines
3.6 KiB
Python
Raw Permalink Normal View History

2025-11-30 09:12:40 +01:00
"""
Production event consumer for the distribution service
Listens for production completion events and triggers distribution planning
"""
import logging
from typing import Dict, Any, Optional
import json
from app.services.distribution_service import DistributionService
logger = logging.getLogger(__name__)
class ProductionEventConsumer:
"""
Consumer for production events that may trigger distribution planning
"""
def __init__(self, distribution_service: DistributionService):
self.distribution_service = distribution_service
async def handle_production_batch_completed(self, event_data: Dict[str, Any]):
"""
Handle production batch completion event
This might trigger distribution planning if it's for internal transfers
"""
try:
logger.info(f"Handling production batch completion: {event_data}")
tenant_id = event_data.get('tenant_id')
batch_id = event_data.get('batch_id')
product_type = event_data.get('product_type')
completion_date = event_data.get('completion_date')
if not tenant_id:
logger.error("Missing tenant_id in production event")
return
# Check if this batch is for internal transfers (has destination tenant info)
# In a real implementation, this would check if the production batch
# is associated with an internal purchase order
# For now, we'll just log the event
logger.info(f"Production batch {batch_id} completed for tenant {tenant_id}")
# In a real implementation, this might trigger immediate distribution planning
# if the batch was for internal transfer orders
# await self._trigger_distribution_if_needed(tenant_id, batch_id)
except Exception as e:
logger.error(f"Error handling production batch completion event: {e}", exc_info=True)
raise
async def handle_internal_transfer_approved(self, event_data: Dict[str, Any]):
"""
Handle internal transfer approval event
This should trigger immediate distribution planning for the approved transfer
"""
try:
logger.info(f"Handling internal transfer approval: {event_data}")
tenant_id = event_data.get('tenant_id') # The parent tenant
transfer_id = event_data.get('transfer_id')
destination_tenant_id = event_data.get('destination_tenant_id')
scheduled_date = event_data.get('scheduled_date')
if not all([tenant_id, transfer_id, destination_tenant_id, scheduled_date]):
logger.error("Missing required fields in internal transfer event")
return
# In a real implementation, this might schedule distribution planning
# for the specific transfer on the scheduled date
logger.info(f"Internal transfer {transfer_id} approved from {tenant_id} to {destination_tenant_id}")
except Exception as e:
logger.error(f"Error handling internal transfer approval: {e}", exc_info=True)
raise
async def _trigger_distribution_if_needed(self, tenant_id: str, batch_id: str):
"""
Internal method to check if distribution planning is needed for this batch
"""
# Implementation would check if the batch is for internal transfers
# and trigger distribution planning if so
pass