""" Forecast event consumer for the forecasting service Handles events that should trigger cache invalidation for aggregated forecasts """ import logging from typing import Dict, Any, Optional import json import redis.asyncio as redis logger = logging.getLogger(__name__) class ForecastEventConsumer: """ Consumer for forecast events that may trigger cache invalidation """ def __init__(self, redis_client: redis.Redis): self.redis_client = redis_client async def handle_forecast_updated(self, event_data: Dict[str, Any]): """ Handle forecast updated event Invalidate parent tenant's aggregated forecast cache if this tenant is a child """ try: logger.info(f"Handling forecast updated event: {event_data}") tenant_id = event_data.get('tenant_id') forecast_date = event_data.get('forecast_date') product_id = event_data.get('product_id') updated_at = event_data.get('updated_at', None) if not tenant_id: logger.error("Missing tenant_id in forecast event") return # Check if this tenant is a child tenant (has parent) # In a real implementation, this would call the tenant service to check hierarchy parent_tenant_id = await self._get_parent_tenant_id(tenant_id) if parent_tenant_id: # Invalidate parent's aggregated forecast cache await self._invalidate_parent_aggregated_cache( parent_tenant_id=parent_tenant_id, child_tenant_id=tenant_id, forecast_date=forecast_date, product_id=product_id ) logger.info(f"Forecast updated event processed for tenant {tenant_id}") except Exception as e: logger.error(f"Error handling forecast updated event: {e}", exc_info=True) raise async def handle_forecast_created(self, event_data: Dict[str, Any]): """ Handle forecast created event Similar to update, may affect parent tenant's aggregated forecasts """ await self.handle_forecast_updated(event_data) async def handle_forecast_deleted(self, event_data: Dict[str, Any]): """ Handle forecast deleted event Similar to update, may affect parent tenant's aggregated forecasts """ try: logger.info(f"Handling forecast deleted event: {event_data}") tenant_id = event_data.get('tenant_id') forecast_date = event_data.get('forecast_date') product_id = event_data.get('product_id') if not tenant_id: logger.error("Missing tenant_id in forecast delete event") return # Check if this tenant is a child tenant parent_tenant_id = await self._get_parent_tenant_id(tenant_id) if parent_tenant_id: # Invalidate parent's aggregated forecast cache await self._invalidate_parent_aggregated_cache( parent_tenant_id=parent_tenant_id, child_tenant_id=tenant_id, forecast_date=forecast_date, product_id=product_id ) logger.info(f"Forecast deleted event processed for tenant {tenant_id}") except Exception as e: logger.error(f"Error handling forecast deleted event: {e}", exc_info=True) raise async def _get_parent_tenant_id(self, tenant_id: str) -> Optional[str]: """ Get parent tenant ID for a child tenant using the tenant service """ try: from shared.clients.tenant_client import TenantServiceClient from shared.config.base import get_settings # Create tenant client config = get_settings() tenant_client = TenantServiceClient(config) # Get parent tenant information parent_tenant = await tenant_client.get_parent_tenant(tenant_id) if parent_tenant: parent_tenant_id = parent_tenant.get('id') logger.info(f"Found parent tenant {parent_tenant_id} for child tenant {tenant_id}") return parent_tenant_id else: logger.debug(f"No parent tenant found for tenant {tenant_id} (tenant may be standalone or parent)") return None except Exception as e: logger.error(f"Error getting parent tenant ID for {tenant_id}: {e}") return None async def _invalidate_parent_aggregated_cache( self, parent_tenant_id: str, child_tenant_id: str, forecast_date: Optional[str] = None, product_id: Optional[str] = None ): """ Invalidate parent tenant's aggregated forecast cache """ try: # Pattern to match all aggregated forecast cache keys for this parent # Format: agg_forecast:{parent_tenant_id}:{start_date}:{end_date}:{product_id} pattern = f"agg_forecast:{parent_tenant_id}:*:*:*" # Find all matching keys and delete them keys_to_delete = [] async for key in self.redis_client.scan_iter(match=pattern): if isinstance(key, bytes): key = key.decode('utf-8') keys_to_delete.append(key) if keys_to_delete: await self.redis_client.delete(*keys_to_delete) logger.info(f"Invalidated {len(keys_to_delete)} aggregated forecast cache entries for parent tenant {parent_tenant_id}") else: logger.info(f"No aggregated forecast cache entries found to invalidate for parent tenant {parent_tenant_id}") except Exception as e: logger.error(f"Error invalidating parent aggregated cache: {e}", exc_info=True) raise async def handle_tenant_hierarchy_changed(self, event_data: Dict[str, Any]): """ Handle tenant hierarchy change event This could be when a tenant becomes a child of another, or when the hierarchy changes """ try: logger.info(f"Handling tenant hierarchy change event: {event_data}") tenant_id = event_data.get('tenant_id') parent_tenant_id = event_data.get('parent_tenant_id') action = event_data.get('action') # 'added', 'removed', 'changed' # Invalidate any cached aggregated forecasts that might be affected if parent_tenant_id: # If this child tenant changed, invalidate parent's cache await self._invalidate_parent_aggregated_cache( parent_tenant_id=parent_tenant_id, child_tenant_id=tenant_id ) # If this was a former parent tenant that's no longer a parent, # its aggregated cache might need to be invalidated differently if action == 'removed' and event_data.get('was_parent'): # Invalidate its own aggregated cache since it's no longer a parent # This would be handled by tenant service events pass except Exception as e: logger.error(f"Error handling tenant hierarchy change event: {e}", exc_info=True) raise