Files
bakery-ia/services/sales/app/consumers/sales_event_consumer.py
2025-12-05 20:07:01 +01:00

536 lines
18 KiB
Python

"""
Sales Event Consumer
Processes sales transaction events from RabbitMQ and updates analytics
Handles completed sales and refunds from POS systems
"""
import json
from collections import defaultdict
from datetime import date, datetime
from decimal import Decimal
from typing import Any, Dict, Optional

import structlog
from sqlalchemy import func, select
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy.ext.asyncio import AsyncSession

from shared.messaging import RabbitMQClient

logger = structlog.get_logger()
class SalesEventConsumer:
    """
    Consumes sales transaction events and updates sales analytics
    Processes events from POS consumer

    Incoming events are routed by ``event_type`` to the completed-sale or
    refund handler. Each handler updates the daily aggregates, aggregates the
    line items per product and logs a transaction record.
    """

    def __init__(self, db_session: AsyncSession):
        """
        Args:
            db_session: Async SQLAlchemy session used for analytics writes
        """
        self.db_session = db_session

    async def consume_sales_events(
        self,
        rabbitmq_client: RabbitMQClient
    ):
        """
        Start consuming sales events from RabbitMQ

        Args:
            rabbitmq_client: Client whose ``consume_events`` is given the
                per-message callback defined here
        """
        async def process_message(message):
            """Process a single sales event message"""
            try:
                async with message.process():
                    # Parse event data
                    event_data = json.loads(message.body.decode())

                    logger.info(
                        "Received sales event",
                        event_id=event_data.get('event_id'),
                        event_type=event_data.get('event_type'),
                        tenant_id=event_data.get('tenant_id')
                    )

                    # Process the event
                    # NOTE(review): the boolean result is ignored, so a failed
                    # event leaves the ``message.process()`` context normally
                    # (presumably acknowledging it). Confirm that dropping
                    # failed events is intended.
                    await self.process_sales_event(event_data)
            except Exception as e:
                # Boundary handler: a malformed message must not stop the consumer
                logger.error(
                    "Error processing sales event",
                    error=str(e),
                    exc_info=True
                )

        # Start consuming events
        await rabbitmq_client.consume_events(
            exchange_name="sales.events",
            queue_name="sales.processing.queue",
            routing_key="sales.transaction.*",
            callback=process_message
        )
        # Logged only once consume_events() has returned
        logger.info("Started consuming sales events")

    async def process_sales_event(self, event_data: Dict[str, Any]) -> bool:
        """
        Process a sales event based on type

        Args:
            event_data: Full event payload from RabbitMQ

        Returns:
            bool: True if processed successfully (unknown event types are
            reported as processed), False if ``tenant_id`` is missing, the
            handler failed or an unexpected error occurred
        """
        try:
            event_type = event_data.get('event_type')
            # ``or {}`` also covers a payload carrying an explicit ``"data": null``
            data = event_data.get('data') or {}
            tenant_id = event_data.get('tenant_id')

            if not tenant_id:
                logger.warning("Sales event missing tenant_id", event_data=event_data)
                return False

            # Route to appropriate handler
            if event_type == 'sales.transaction.completed':
                success = await self._handle_transaction_completed(tenant_id, data)
            elif event_type == 'sales.transaction.refunded':
                success = await self._handle_transaction_refunded(tenant_id, data)
            else:
                logger.warning("Unknown sales event type", event_type=event_type)
                success = True  # Mark as processed to avoid retry

            if success:
                logger.info(
                    "Sales event processed successfully",
                    event_type=event_type,
                    tenant_id=tenant_id
                )
            else:
                logger.error(
                    "Sales event processing failed",
                    event_type=event_type,
                    tenant_id=tenant_id
                )

            return success

        except Exception as e:
            logger.error(
                "Error in process_sales_event",
                error=str(e),
                event_id=event_data.get('event_id'),
                exc_info=True
            )
            return False

    @staticmethod
    def _parse_event_date(raw_value: Any) -> date:
        """Return the date of an ISO-8601 timestamp string, or today's UTC date as fallback."""
        if raw_value and isinstance(raw_value, str):
            # A trailing 'Z' is rewritten to an explicit offset because
            # datetime.fromisoformat() only accepts 'Z' from Python 3.11 on
            return datetime.fromisoformat(raw_value.replace('Z', '+00:00')).date()
        # Missing, empty or non-string value: fall back to the current UTC date
        return datetime.utcnow().date()

    @staticmethod
    def _aggregate_items(items: list, amount_key: str) -> Dict[Any, Dict[str, Decimal]]:
        """Sum quantity and quantity * unit_price per product_id, storing the money total under amount_key."""
        totals = defaultdict(
            lambda: {'quantity': Decimal('0'), amount_key: Decimal('0')}
        )
        for item in items or []:
            product_id = item.get('product_id')
            if not product_id:
                continue
            # Go through str() so JSON floats (e.g. 0.5) become exact Decimals;
            # multiplying a float by a Decimal raises TypeError
            quantity = Decimal(str(item.get('quantity', 0)))
            unit_price = Decimal(str(item.get('unit_price', 0)))
            totals[product_id]['quantity'] += quantity
            totals[product_id][amount_key] += quantity * unit_price
        return dict(totals)

    async def _handle_transaction_completed(
        self,
        tenant_id: str,
        data: Dict[str, Any]
    ) -> bool:
        """
        Handle completed sale transaction

        Updates:
        - Daily sales analytics aggregates
        - Revenue tracking
        - Transaction counters
        - Product sales tracking

        Args:
            tenant_id: Tenant ID
            data: Transaction data from event

        Returns:
            bool: True if handled successfully, False if the transaction ID is
            missing or the analytics update (or payload parsing) raised
        """
        try:
            transaction_id = data.get('transaction_id')
            total_amount = Decimal(str(data.get('total_amount', 0)))
            # ``or []`` also covers a payload carrying an explicit ``"items": null``
            items = data.get('items') or []
            pos_system = data.get('pos_system', 'unknown')

            if not transaction_id:
                logger.warning("Transaction missing ID", data=data)
                return False

            # Parse transaction date
            transaction_date = self._parse_event_date(data.get('transaction_date'))

            # NOTE(review): there is no duplicate check here. The analytics
            # UPSERT below *adds* to the existing row on conflict, so a
            # redelivered event is counted twice; a processed_transactions
            # table would be needed for real idempotency.

            # Update daily sales analytics
            await self._update_daily_analytics(
                tenant_id=tenant_id,
                transaction_date=transaction_date,
                revenue=total_amount,
                transaction_count=1,
                refund_amount=Decimal('0')
            )

            # Update product sales tracking (logs and swallows its own errors)
            await self._update_product_sales(
                tenant_id=tenant_id,
                transaction_date=transaction_date,
                items=items
            )

            # Store transaction record (optional detailed tracking)
            await self._store_transaction_record(
                tenant_id=tenant_id,
                transaction_id=transaction_id,
                transaction_date=transaction_date,
                total_amount=total_amount,
                items=items,
                pos_system=pos_system,
                transaction_type='sale'
            )

            logger.info(
                "Transaction processed and analytics updated",
                tenant_id=tenant_id,
                transaction_id=transaction_id,
                total_amount=float(total_amount),
                date=str(transaction_date)
            )

            return True

        except Exception as e:
            logger.error(
                "Error handling transaction completed",
                error=str(e),
                tenant_id=tenant_id,
                transaction_id=data.get('transaction_id'),
                exc_info=True
            )
            return False

    async def _handle_transaction_refunded(
        self,
        tenant_id: str,
        data: Dict[str, Any]
    ) -> bool:
        """
        Handle refunded sale transaction

        Updates:
        - Daily sales analytics (negative revenue)
        - Refund counters
        - Product refund tracking

        Args:
            tenant_id: Tenant ID
            data: Refund data from event

        Returns:
            bool: True if handled successfully, False if the refund ID is
            missing or the analytics update (or payload parsing) raised
        """
        try:
            refund_id = data.get('refund_id')
            original_transaction_id = data.get('original_transaction_id')
            refund_amount = Decimal(str(data.get('refund_amount', 0)))
            # ``or []`` also covers a payload carrying an explicit ``"items": null``
            items = data.get('items') or []
            pos_system = data.get('pos_system', 'unknown')

            if not refund_id:
                logger.warning("Refund missing ID", data=data)
                return False

            # Parse refund date
            refund_date = self._parse_event_date(data.get('refund_date'))

            # Update daily sales analytics (subtract revenue, add refund)
            await self._update_daily_analytics(
                tenant_id=tenant_id,
                transaction_date=refund_date,
                revenue=-refund_amount,  # Negative revenue
                transaction_count=0,  # Don't increment transaction count for refunds
                refund_amount=refund_amount
            )

            # Update product refund tracking (logs and swallows its own errors)
            await self._update_product_refunds(
                tenant_id=tenant_id,
                refund_date=refund_date,
                items=items
            )

            # Store refund record
            await self._store_transaction_record(
                tenant_id=tenant_id,
                transaction_id=refund_id,
                transaction_date=refund_date,
                total_amount=-refund_amount,
                items=items,
                pos_system=pos_system,
                transaction_type='refund',
                original_transaction_id=original_transaction_id
            )

            logger.info(
                "Refund processed and analytics updated",
                tenant_id=tenant_id,
                refund_id=refund_id,
                refund_amount=float(refund_amount),
                date=str(refund_date)
            )

            return True

        except Exception as e:
            logger.error(
                "Error handling transaction refunded",
                error=str(e),
                tenant_id=tenant_id,
                refund_id=data.get('refund_id'),
                exc_info=True
            )
            return False

    async def _update_daily_analytics(
        self,
        tenant_id: str,
        transaction_date: date,
        revenue: Decimal,
        transaction_count: int,
        refund_amount: Decimal
    ):
        """
        Update or create daily sales analytics record

        Uses UPSERT (INSERT ... ON CONFLICT UPDATE) for atomic updates

        Args:
            tenant_id: Tenant ID
            transaction_date: Date of transaction
            revenue: Revenue amount (negative for refunds)
            transaction_count: Number of transactions
            refund_amount: Refund amount

        Raises:
            Exception: Any error from executing or committing the statement is
                re-raised after the session has been rolled back
        """
        try:
            # Note: This assumes a sales_analytics table exists
            # In production, ensure table is created via migration
            from app.models.sales_analytics import SalesAnalytics

            # One timestamp so the insert and update branches agree
            now = datetime.utcnow()

            # Average for a brand-new row: revenue spread over the transactions
            # in this call (0 when the call carries no transactions, e.g. a refund)
            if transaction_count > 0:
                initial_average = revenue / transaction_count
            else:
                initial_average = Decimal('0')

            # Use PostgreSQL UPSERT for atomic updates
            stmt = insert(SalesAnalytics).values(
                tenant_id=tenant_id,
                date=transaction_date,
                total_revenue=revenue,
                total_transactions=transaction_count,
                total_refunds=refund_amount,
                average_transaction_value=initial_average,
                updated_at=now
            ).on_conflict_do_update(
                index_elements=['tenant_id', 'date'],
                set_={
                    'total_revenue': SalesAnalytics.total_revenue + revenue,
                    'total_transactions': SalesAnalytics.total_transactions + transaction_count,
                    'total_refunds': SalesAnalytics.total_refunds + refund_amount,
                    # greatest(..., 1) guards against division by zero when the
                    # day has only refunds so far
                    'average_transaction_value': (
                        (SalesAnalytics.total_revenue + revenue) /
                        func.greatest(SalesAnalytics.total_transactions + transaction_count, 1)
                    ),
                    'updated_at': now
                }
            )

            await self.db_session.execute(stmt)
            await self.db_session.commit()

            logger.info(
                "Daily analytics updated",
                tenant_id=tenant_id,
                date=str(transaction_date),
                revenue_delta=float(revenue),
                transaction_count_delta=transaction_count
            )

        except Exception as e:
            await self.db_session.rollback()
            logger.error(
                "Failed to update daily analytics",
                tenant_id=tenant_id,
                date=str(transaction_date),
                error=str(e),
                exc_info=True
            )
            raise

    async def _update_product_sales(
        self,
        tenant_id: str,
        transaction_date: date,
        items: list
    ):
        """
        Update product sales tracking

        Currently only aggregates the items per product and logs the number of
        distinct products; nothing is persisted. Errors are logged, not raised.

        Args:
            tenant_id: Tenant ID
            transaction_date: Date of transaction
            items: List of items sold
        """
        try:
            # Aggregate items by product
            product_sales = self._aggregate_items(items, 'revenue')

            # Update each product's sales (would need product_sales table)
            # For now, log the aggregation
            logger.info(
                "Product sales aggregated",
                tenant_id=tenant_id,
                date=str(transaction_date),
                products_count=len(product_sales)
            )

            # In production, insert/update product_sales table here
            # Similar UPSERT pattern as daily analytics

        except Exception as e:
            logger.error(
                "Failed to update product sales",
                tenant_id=tenant_id,
                error=str(e)
            )

    async def _update_product_refunds(
        self,
        tenant_id: str,
        refund_date: date,
        items: list
    ):
        """
        Update product refund tracking

        Currently only aggregates the items per product and logs the number of
        distinct products; nothing is persisted. Errors are logged, not raised.

        Args:
            tenant_id: Tenant ID
            refund_date: Date of refund
            items: List of items refunded
        """
        try:
            # Similar to product sales, but for refunds
            product_refunds = self._aggregate_items(items, 'amount')

            logger.info(
                "Product refunds aggregated",
                tenant_id=tenant_id,
                date=str(refund_date),
                products_count=len(product_refunds)
            )

            # In production, update product_refunds table

        except Exception as e:
            logger.error(
                "Failed to update product refunds",
                tenant_id=tenant_id,
                error=str(e)
            )

    async def _store_transaction_record(
        self,
        tenant_id: str,
        transaction_id: str,
        transaction_date: date,
        total_amount: Decimal,
        items: list,
        pos_system: str,
        transaction_type: str,
        original_transaction_id: Optional[str] = None
    ):
        """
        Store detailed transaction record

        Currently only logs the record; nothing is persisted. Errors are
        logged, not raised.

        Args:
            tenant_id: Tenant ID
            transaction_id: Transaction/refund ID
            transaction_date: Date of transaction
            total_amount: Total amount
            items: Transaction items
            pos_system: POS system name
            transaction_type: 'sale' or 'refund'
            original_transaction_id: For refunds, the original transaction ID
        """
        try:
            # Would store in transactions table for detailed tracking
            # For now, just log
            logger.info(
                "Transaction record created",
                tenant_id=tenant_id,
                transaction_id=transaction_id,
                type=transaction_type,
                amount=float(total_amount),
                items_count=len(items or []),
                pos_system=pos_system
            )

            # In production, insert into transactions table:
            # from app.models.transactions import Transaction
            # transaction = Transaction(
            #     id=transaction_id,
            #     tenant_id=tenant_id,
            #     transaction_date=transaction_date,
            #     total_amount=total_amount,
            #     items=items,
            #     pos_system=pos_system,
            #     transaction_type=transaction_type,
            #     original_transaction_id=original_transaction_id
            # )
            # self.db_session.add(transaction)
            # await self.db_session.commit()

        except Exception as e:
            logger.error(
                "Failed to store transaction record",
                transaction_id=transaction_id,
                error=str(e)
            )
# Factory helper: builds a consumer bound to the given database session
def create_sales_event_consumer(db_session: AsyncSession) -> SalesEventConsumer:
    """Build a SalesEventConsumer that uses ``db_session`` for its database work"""
    consumer = SalesEventConsumer(db_session)
    return consumer