""" Sales Event Consumer Processes sales transaction events from RabbitMQ and updates analytics Handles completed sales and refunds from POS systems """ import json import structlog from typing import Dict, Any from datetime import datetime, date from decimal import Decimal from collections import defaultdict from shared.messaging import RabbitMQClient from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy import select, func from sqlalchemy.dialects.postgresql import insert logger = structlog.get_logger() class SalesEventConsumer: """ Consumes sales transaction events and updates sales analytics Processes events from POS consumer """ def __init__(self, db_session: AsyncSession): self.db_session = db_session async def consume_sales_events( self, rabbitmq_client: RabbitMQClient ): """ Start consuming sales events from RabbitMQ """ async def process_message(message): """Process a single sales event message""" try: async with message.process(): # Parse event data event_data = json.loads(message.body.decode()) logger.info( "Received sales event", event_id=event_data.get('event_id'), event_type=event_data.get('event_type'), tenant_id=event_data.get('tenant_id') ) # Process the event await self.process_sales_event(event_data) except Exception as e: logger.error( "Error processing sales event", error=str(e), exc_info=True ) # Start consuming events await rabbitmq_client.consume_events( exchange_name="sales.events", queue_name="sales.processing.queue", routing_key="sales.transaction.*", callback=process_message ) logger.info("Started consuming sales events") async def process_sales_event(self, event_data: Dict[str, Any]) -> bool: """ Process a sales event based on type Args: event_data: Full event payload from RabbitMQ Returns: bool: True if processed successfully """ try: event_type = event_data.get('event_type') data = event_data.get('data', {}) tenant_id = event_data.get('tenant_id') if not tenant_id: logger.warning("Sales event missing tenant_id", event_data=event_data) return False # Route to appropriate handler if event_type == 'sales.transaction.completed': success = await self._handle_transaction_completed(tenant_id, data) elif event_type == 'sales.transaction.refunded': success = await self._handle_transaction_refunded(tenant_id, data) else: logger.warning("Unknown sales event type", event_type=event_type) success = True # Mark as processed to avoid retry if success: logger.info( "Sales event processed successfully", event_type=event_type, tenant_id=tenant_id ) else: logger.error( "Sales event processing failed", event_type=event_type, tenant_id=tenant_id ) return success except Exception as e: logger.error( "Error in process_sales_event", error=str(e), event_id=event_data.get('event_id'), exc_info=True ) return False async def _handle_transaction_completed( self, tenant_id: str, data: Dict[str, Any] ) -> bool: """ Handle completed sale transaction Updates: - Daily sales analytics aggregates - Revenue tracking - Transaction counters - Product sales tracking Args: tenant_id: Tenant ID data: Transaction data from event Returns: bool: True if handled successfully """ try: transaction_id = data.get('transaction_id') total_amount = Decimal(str(data.get('total_amount', 0))) transaction_date_str = data.get('transaction_date') items = data.get('items', []) pos_system = data.get('pos_system', 'unknown') if not transaction_id: logger.warning("Transaction missing ID", data=data) return False # Parse transaction date if transaction_date_str: if isinstance(transaction_date_str, str): transaction_date = datetime.fromisoformat( transaction_date_str.replace('Z', '+00:00') ).date() else: transaction_date = datetime.utcnow().date() else: transaction_date = datetime.utcnow().date() # Check for duplicate processing (idempotency) # In production, would check a processed_transactions table # For now, we rely on unique constraints in analytics table # Update daily sales analytics await self._update_daily_analytics( tenant_id=tenant_id, transaction_date=transaction_date, revenue=total_amount, transaction_count=1, refund_amount=Decimal('0') ) # Update product sales tracking await self._update_product_sales( tenant_id=tenant_id, transaction_date=transaction_date, items=items ) # Store transaction record (optional detailed tracking) await self._store_transaction_record( tenant_id=tenant_id, transaction_id=transaction_id, transaction_date=transaction_date, total_amount=total_amount, items=items, pos_system=pos_system, transaction_type='sale' ) logger.info( "Transaction processed and analytics updated", tenant_id=tenant_id, transaction_id=transaction_id, total_amount=float(total_amount), date=str(transaction_date) ) return True except Exception as e: logger.error( "Error handling transaction completed", error=str(e), tenant_id=tenant_id, transaction_id=data.get('transaction_id'), exc_info=True ) return False async def _handle_transaction_refunded( self, tenant_id: str, data: Dict[str, Any] ) -> bool: """ Handle refunded sale transaction Updates: - Daily sales analytics (negative revenue) - Refund counters - Product refund tracking Args: tenant_id: Tenant ID data: Refund data from event Returns: bool: True if handled successfully """ try: refund_id = data.get('refund_id') original_transaction_id = data.get('original_transaction_id') refund_amount = Decimal(str(data.get('refund_amount', 0))) refund_date_str = data.get('refund_date') items = data.get('items', []) pos_system = data.get('pos_system', 'unknown') if not refund_id: logger.warning("Refund missing ID", data=data) return False # Parse refund date if refund_date_str: if isinstance(refund_date_str, str): refund_date = datetime.fromisoformat( refund_date_str.replace('Z', '+00:00') ).date() else: refund_date = datetime.utcnow().date() else: refund_date = datetime.utcnow().date() # Update daily sales analytics (subtract revenue, add refund) await self._update_daily_analytics( tenant_id=tenant_id, transaction_date=refund_date, revenue=-refund_amount, # Negative revenue transaction_count=0, # Don't increment transaction count for refunds refund_amount=refund_amount ) # Update product refund tracking await self._update_product_refunds( tenant_id=tenant_id, refund_date=refund_date, items=items ) # Store refund record await self._store_transaction_record( tenant_id=tenant_id, transaction_id=refund_id, transaction_date=refund_date, total_amount=-refund_amount, items=items, pos_system=pos_system, transaction_type='refund', original_transaction_id=original_transaction_id ) logger.info( "Refund processed and analytics updated", tenant_id=tenant_id, refund_id=refund_id, refund_amount=float(refund_amount), date=str(refund_date) ) return True except Exception as e: logger.error( "Error handling transaction refunded", error=str(e), tenant_id=tenant_id, refund_id=data.get('refund_id'), exc_info=True ) return False async def _update_daily_analytics( self, tenant_id: str, transaction_date: date, revenue: Decimal, transaction_count: int, refund_amount: Decimal ): """ Update or create daily sales analytics record Uses UPSERT (INSERT ... ON CONFLICT UPDATE) for atomic updates Args: tenant_id: Tenant ID transaction_date: Date of transaction revenue: Revenue amount (negative for refunds) transaction_count: Number of transactions refund_amount: Refund amount """ try: # Note: This assumes a sales_analytics table exists # In production, ensure table is created via migration from app.models.sales_analytics import SalesAnalytics # Use PostgreSQL UPSERT for atomic updates stmt = insert(SalesAnalytics).values( tenant_id=tenant_id, date=transaction_date, total_revenue=revenue, total_transactions=transaction_count, total_refunds=refund_amount, average_transaction_value=revenue if transaction_count > 0 else Decimal('0'), updated_at=datetime.utcnow() ).on_conflict_do_update( index_elements=['tenant_id', 'date'], set_={ 'total_revenue': SalesAnalytics.total_revenue + revenue, 'total_transactions': SalesAnalytics.total_transactions + transaction_count, 'total_refunds': SalesAnalytics.total_refunds + refund_amount, 'average_transaction_value': ( (SalesAnalytics.total_revenue + revenue) / func.greatest(SalesAnalytics.total_transactions + transaction_count, 1) ), 'updated_at': datetime.utcnow() } ) await self.db_session.execute(stmt) await self.db_session.commit() logger.info( "Daily analytics updated", tenant_id=tenant_id, date=str(transaction_date), revenue_delta=float(revenue), transaction_count_delta=transaction_count ) except Exception as e: await self.db_session.rollback() logger.error( "Failed to update daily analytics", tenant_id=tenant_id, date=str(transaction_date), error=str(e), exc_info=True ) raise async def _update_product_sales( self, tenant_id: str, transaction_date: date, items: list ): """ Update product sales tracking Args: tenant_id: Tenant ID transaction_date: Date of transaction items: List of items sold """ try: # Aggregate items by product product_sales = defaultdict(lambda: {'quantity': 0, 'revenue': Decimal('0')}) for item in items: product_id = item.get('product_id') if not product_id: continue quantity = item.get('quantity', 0) unit_price = Decimal(str(item.get('unit_price', 0))) revenue = quantity * unit_price product_sales[product_id]['quantity'] += quantity product_sales[product_id]['revenue'] += revenue # Update each product's sales (would need product_sales table) # For now, log the aggregation logger.info( "Product sales aggregated", tenant_id=tenant_id, date=str(transaction_date), products_count=len(product_sales) ) # In production, insert/update product_sales table here # Similar UPSERT pattern as daily analytics except Exception as e: logger.error( "Failed to update product sales", tenant_id=tenant_id, error=str(e) ) async def _update_product_refunds( self, tenant_id: str, refund_date: date, items: list ): """ Update product refund tracking Args: tenant_id: Tenant ID refund_date: Date of refund items: List of items refunded """ try: # Similar to product sales, but for refunds product_refunds = defaultdict(lambda: {'quantity': 0, 'amount': Decimal('0')}) for item in items: product_id = item.get('product_id') if not product_id: continue quantity = item.get('quantity', 0) unit_price = Decimal(str(item.get('unit_price', 0))) amount = quantity * unit_price product_refunds[product_id]['quantity'] += quantity product_refunds[product_id]['amount'] += amount logger.info( "Product refunds aggregated", tenant_id=tenant_id, date=str(refund_date), products_count=len(product_refunds) ) # In production, update product_refunds table except Exception as e: logger.error( "Failed to update product refunds", tenant_id=tenant_id, error=str(e) ) async def _store_transaction_record( self, tenant_id: str, transaction_id: str, transaction_date: date, total_amount: Decimal, items: list, pos_system: str, transaction_type: str, original_transaction_id: str = None ): """ Store detailed transaction record Args: tenant_id: Tenant ID transaction_id: Transaction/refund ID transaction_date: Date of transaction total_amount: Total amount items: Transaction items pos_system: POS system name transaction_type: 'sale' or 'refund' original_transaction_id: For refunds, the original transaction ID """ try: # Would store in transactions table for detailed tracking # For now, just log logger.info( "Transaction record created", tenant_id=tenant_id, transaction_id=transaction_id, type=transaction_type, amount=float(total_amount), items_count=len(items), pos_system=pos_system ) # In production, insert into transactions table: # from app.models.transactions import Transaction # transaction = Transaction( # id=transaction_id, # tenant_id=tenant_id, # transaction_date=transaction_date, # total_amount=total_amount, # items=items, # pos_system=pos_system, # transaction_type=transaction_type, # original_transaction_id=original_transaction_id # ) # self.db_session.add(transaction) # await self.db_session.commit() except Exception as e: logger.error( "Failed to store transaction record", transaction_id=transaction_id, error=str(e) ) # Factory function for creating consumer instance def create_sales_event_consumer(db_session: AsyncSession) -> SalesEventConsumer: """Create sales event consumer instance""" return SalesEventConsumer(db_session)