536 lines
18 KiB
Python
536 lines
18 KiB
Python
"""
|
|
Sales Event Consumer
|
|
Processes sales transaction events from RabbitMQ and updates analytics
|
|
Handles completed sales and refunds from POS systems
|
|
"""
|
|
import json
|
|
import structlog
|
|
from typing import Dict, Any
|
|
from datetime import datetime, date
|
|
from decimal import Decimal
|
|
from collections import defaultdict
|
|
|
|
from shared.messaging import RabbitMQClient
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
from sqlalchemy import select, func
|
|
from sqlalchemy.dialects.postgresql import insert
|
|
|
|
logger = structlog.get_logger()
|
|
|
|
|
|
class SalesEventConsumer:
|
|
"""
|
|
Consumes sales transaction events and updates sales analytics
|
|
Processes events from POS consumer
|
|
"""
|
|
|
|
def __init__(self, db_session: AsyncSession):
|
|
self.db_session = db_session
|
|
|
|
async def consume_sales_events(
|
|
self,
|
|
rabbitmq_client: RabbitMQClient
|
|
):
|
|
"""
|
|
Start consuming sales events from RabbitMQ
|
|
"""
|
|
async def process_message(message):
|
|
"""Process a single sales event message"""
|
|
try:
|
|
async with message.process():
|
|
# Parse event data
|
|
event_data = json.loads(message.body.decode())
|
|
logger.info(
|
|
"Received sales event",
|
|
event_id=event_data.get('event_id'),
|
|
event_type=event_data.get('event_type'),
|
|
tenant_id=event_data.get('tenant_id')
|
|
)
|
|
|
|
# Process the event
|
|
await self.process_sales_event(event_data)
|
|
|
|
except Exception as e:
|
|
logger.error(
|
|
"Error processing sales event",
|
|
error=str(e),
|
|
exc_info=True
|
|
)
|
|
|
|
# Start consuming events
|
|
await rabbitmq_client.consume_events(
|
|
exchange_name="sales.events",
|
|
queue_name="sales.processing.queue",
|
|
routing_key="sales.transaction.*",
|
|
callback=process_message
|
|
)
|
|
|
|
logger.info("Started consuming sales events")
|
|
|
|
async def process_sales_event(self, event_data: Dict[str, Any]) -> bool:
|
|
"""
|
|
Process a sales event based on type
|
|
|
|
Args:
|
|
event_data: Full event payload from RabbitMQ
|
|
|
|
Returns:
|
|
bool: True if processed successfully
|
|
"""
|
|
try:
|
|
event_type = event_data.get('event_type')
|
|
data = event_data.get('data', {})
|
|
tenant_id = event_data.get('tenant_id')
|
|
|
|
if not tenant_id:
|
|
logger.warning("Sales event missing tenant_id", event_data=event_data)
|
|
return False
|
|
|
|
# Route to appropriate handler
|
|
if event_type == 'sales.transaction.completed':
|
|
success = await self._handle_transaction_completed(tenant_id, data)
|
|
elif event_type == 'sales.transaction.refunded':
|
|
success = await self._handle_transaction_refunded(tenant_id, data)
|
|
else:
|
|
logger.warning("Unknown sales event type", event_type=event_type)
|
|
success = True # Mark as processed to avoid retry
|
|
|
|
if success:
|
|
logger.info(
|
|
"Sales event processed successfully",
|
|
event_type=event_type,
|
|
tenant_id=tenant_id
|
|
)
|
|
else:
|
|
logger.error(
|
|
"Sales event processing failed",
|
|
event_type=event_type,
|
|
tenant_id=tenant_id
|
|
)
|
|
|
|
return success
|
|
|
|
except Exception as e:
|
|
logger.error(
|
|
"Error in process_sales_event",
|
|
error=str(e),
|
|
event_id=event_data.get('event_id'),
|
|
exc_info=True
|
|
)
|
|
return False
|
|
|
|
async def _handle_transaction_completed(
|
|
self,
|
|
tenant_id: str,
|
|
data: Dict[str, Any]
|
|
) -> bool:
|
|
"""
|
|
Handle completed sale transaction
|
|
|
|
Updates:
|
|
- Daily sales analytics aggregates
|
|
- Revenue tracking
|
|
- Transaction counters
|
|
- Product sales tracking
|
|
|
|
Args:
|
|
tenant_id: Tenant ID
|
|
data: Transaction data from event
|
|
|
|
Returns:
|
|
bool: True if handled successfully
|
|
"""
|
|
try:
|
|
transaction_id = data.get('transaction_id')
|
|
total_amount = Decimal(str(data.get('total_amount', 0)))
|
|
transaction_date_str = data.get('transaction_date')
|
|
items = data.get('items', [])
|
|
pos_system = data.get('pos_system', 'unknown')
|
|
|
|
if not transaction_id:
|
|
logger.warning("Transaction missing ID", data=data)
|
|
return False
|
|
|
|
# Parse transaction date
|
|
if transaction_date_str:
|
|
if isinstance(transaction_date_str, str):
|
|
transaction_date = datetime.fromisoformat(
|
|
transaction_date_str.replace('Z', '+00:00')
|
|
).date()
|
|
else:
|
|
transaction_date = datetime.utcnow().date()
|
|
else:
|
|
transaction_date = datetime.utcnow().date()
|
|
|
|
# Check for duplicate processing (idempotency)
|
|
# In production, would check a processed_transactions table
|
|
# For now, we rely on unique constraints in analytics table
|
|
|
|
# Update daily sales analytics
|
|
await self._update_daily_analytics(
|
|
tenant_id=tenant_id,
|
|
transaction_date=transaction_date,
|
|
revenue=total_amount,
|
|
transaction_count=1,
|
|
refund_amount=Decimal('0')
|
|
)
|
|
|
|
# Update product sales tracking
|
|
await self._update_product_sales(
|
|
tenant_id=tenant_id,
|
|
transaction_date=transaction_date,
|
|
items=items
|
|
)
|
|
|
|
# Store transaction record (optional detailed tracking)
|
|
await self._store_transaction_record(
|
|
tenant_id=tenant_id,
|
|
transaction_id=transaction_id,
|
|
transaction_date=transaction_date,
|
|
total_amount=total_amount,
|
|
items=items,
|
|
pos_system=pos_system,
|
|
transaction_type='sale'
|
|
)
|
|
|
|
logger.info(
|
|
"Transaction processed and analytics updated",
|
|
tenant_id=tenant_id,
|
|
transaction_id=transaction_id,
|
|
total_amount=float(total_amount),
|
|
date=str(transaction_date)
|
|
)
|
|
|
|
return True
|
|
|
|
except Exception as e:
|
|
logger.error(
|
|
"Error handling transaction completed",
|
|
error=str(e),
|
|
tenant_id=tenant_id,
|
|
transaction_id=data.get('transaction_id'),
|
|
exc_info=True
|
|
)
|
|
return False
|
|
|
|
async def _handle_transaction_refunded(
|
|
self,
|
|
tenant_id: str,
|
|
data: Dict[str, Any]
|
|
) -> bool:
|
|
"""
|
|
Handle refunded sale transaction
|
|
|
|
Updates:
|
|
- Daily sales analytics (negative revenue)
|
|
- Refund counters
|
|
- Product refund tracking
|
|
|
|
Args:
|
|
tenant_id: Tenant ID
|
|
data: Refund data from event
|
|
|
|
Returns:
|
|
bool: True if handled successfully
|
|
"""
|
|
try:
|
|
refund_id = data.get('refund_id')
|
|
original_transaction_id = data.get('original_transaction_id')
|
|
refund_amount = Decimal(str(data.get('refund_amount', 0)))
|
|
refund_date_str = data.get('refund_date')
|
|
items = data.get('items', [])
|
|
pos_system = data.get('pos_system', 'unknown')
|
|
|
|
if not refund_id:
|
|
logger.warning("Refund missing ID", data=data)
|
|
return False
|
|
|
|
# Parse refund date
|
|
if refund_date_str:
|
|
if isinstance(refund_date_str, str):
|
|
refund_date = datetime.fromisoformat(
|
|
refund_date_str.replace('Z', '+00:00')
|
|
).date()
|
|
else:
|
|
refund_date = datetime.utcnow().date()
|
|
else:
|
|
refund_date = datetime.utcnow().date()
|
|
|
|
# Update daily sales analytics (subtract revenue, add refund)
|
|
await self._update_daily_analytics(
|
|
tenant_id=tenant_id,
|
|
transaction_date=refund_date,
|
|
revenue=-refund_amount, # Negative revenue
|
|
transaction_count=0, # Don't increment transaction count for refunds
|
|
refund_amount=refund_amount
|
|
)
|
|
|
|
# Update product refund tracking
|
|
await self._update_product_refunds(
|
|
tenant_id=tenant_id,
|
|
refund_date=refund_date,
|
|
items=items
|
|
)
|
|
|
|
# Store refund record
|
|
await self._store_transaction_record(
|
|
tenant_id=tenant_id,
|
|
transaction_id=refund_id,
|
|
transaction_date=refund_date,
|
|
total_amount=-refund_amount,
|
|
items=items,
|
|
pos_system=pos_system,
|
|
transaction_type='refund',
|
|
original_transaction_id=original_transaction_id
|
|
)
|
|
|
|
logger.info(
|
|
"Refund processed and analytics updated",
|
|
tenant_id=tenant_id,
|
|
refund_id=refund_id,
|
|
refund_amount=float(refund_amount),
|
|
date=str(refund_date)
|
|
)
|
|
|
|
return True
|
|
|
|
except Exception as e:
|
|
logger.error(
|
|
"Error handling transaction refunded",
|
|
error=str(e),
|
|
tenant_id=tenant_id,
|
|
refund_id=data.get('refund_id'),
|
|
exc_info=True
|
|
)
|
|
return False
|
|
|
|
async def _update_daily_analytics(
|
|
self,
|
|
tenant_id: str,
|
|
transaction_date: date,
|
|
revenue: Decimal,
|
|
transaction_count: int,
|
|
refund_amount: Decimal
|
|
):
|
|
"""
|
|
Update or create daily sales analytics record
|
|
|
|
Uses UPSERT (INSERT ... ON CONFLICT UPDATE) for atomic updates
|
|
|
|
Args:
|
|
tenant_id: Tenant ID
|
|
transaction_date: Date of transaction
|
|
revenue: Revenue amount (negative for refunds)
|
|
transaction_count: Number of transactions
|
|
refund_amount: Refund amount
|
|
"""
|
|
try:
|
|
# Note: This assumes a sales_analytics table exists
|
|
# In production, ensure table is created via migration
|
|
from app.models.sales_analytics import SalesAnalytics
|
|
|
|
# Use PostgreSQL UPSERT for atomic updates
|
|
stmt = insert(SalesAnalytics).values(
|
|
tenant_id=tenant_id,
|
|
date=transaction_date,
|
|
total_revenue=revenue,
|
|
total_transactions=transaction_count,
|
|
total_refunds=refund_amount,
|
|
average_transaction_value=revenue if transaction_count > 0 else Decimal('0'),
|
|
updated_at=datetime.utcnow()
|
|
).on_conflict_do_update(
|
|
index_elements=['tenant_id', 'date'],
|
|
set_={
|
|
'total_revenue': SalesAnalytics.total_revenue + revenue,
|
|
'total_transactions': SalesAnalytics.total_transactions + transaction_count,
|
|
'total_refunds': SalesAnalytics.total_refunds + refund_amount,
|
|
'average_transaction_value': (
|
|
(SalesAnalytics.total_revenue + revenue) /
|
|
func.greatest(SalesAnalytics.total_transactions + transaction_count, 1)
|
|
),
|
|
'updated_at': datetime.utcnow()
|
|
}
|
|
)
|
|
|
|
await self.db_session.execute(stmt)
|
|
await self.db_session.commit()
|
|
|
|
logger.info(
|
|
"Daily analytics updated",
|
|
tenant_id=tenant_id,
|
|
date=str(transaction_date),
|
|
revenue_delta=float(revenue),
|
|
transaction_count_delta=transaction_count
|
|
)
|
|
|
|
except Exception as e:
|
|
await self.db_session.rollback()
|
|
logger.error(
|
|
"Failed to update daily analytics",
|
|
tenant_id=tenant_id,
|
|
date=str(transaction_date),
|
|
error=str(e),
|
|
exc_info=True
|
|
)
|
|
raise
|
|
|
|
async def _update_product_sales(
|
|
self,
|
|
tenant_id: str,
|
|
transaction_date: date,
|
|
items: list
|
|
):
|
|
"""
|
|
Update product sales tracking
|
|
|
|
Args:
|
|
tenant_id: Tenant ID
|
|
transaction_date: Date of transaction
|
|
items: List of items sold
|
|
"""
|
|
try:
|
|
# Aggregate items by product
|
|
product_sales = defaultdict(lambda: {'quantity': 0, 'revenue': Decimal('0')})
|
|
|
|
for item in items:
|
|
product_id = item.get('product_id')
|
|
if not product_id:
|
|
continue
|
|
|
|
quantity = item.get('quantity', 0)
|
|
unit_price = Decimal(str(item.get('unit_price', 0)))
|
|
revenue = quantity * unit_price
|
|
|
|
product_sales[product_id]['quantity'] += quantity
|
|
product_sales[product_id]['revenue'] += revenue
|
|
|
|
# Update each product's sales (would need product_sales table)
|
|
# For now, log the aggregation
|
|
logger.info(
|
|
"Product sales aggregated",
|
|
tenant_id=tenant_id,
|
|
date=str(transaction_date),
|
|
products_count=len(product_sales)
|
|
)
|
|
|
|
# In production, insert/update product_sales table here
|
|
# Similar UPSERT pattern as daily analytics
|
|
|
|
except Exception as e:
|
|
logger.error(
|
|
"Failed to update product sales",
|
|
tenant_id=tenant_id,
|
|
error=str(e)
|
|
)
|
|
|
|
async def _update_product_refunds(
|
|
self,
|
|
tenant_id: str,
|
|
refund_date: date,
|
|
items: list
|
|
):
|
|
"""
|
|
Update product refund tracking
|
|
|
|
Args:
|
|
tenant_id: Tenant ID
|
|
refund_date: Date of refund
|
|
items: List of items refunded
|
|
"""
|
|
try:
|
|
# Similar to product sales, but for refunds
|
|
product_refunds = defaultdict(lambda: {'quantity': 0, 'amount': Decimal('0')})
|
|
|
|
for item in items:
|
|
product_id = item.get('product_id')
|
|
if not product_id:
|
|
continue
|
|
|
|
quantity = item.get('quantity', 0)
|
|
unit_price = Decimal(str(item.get('unit_price', 0)))
|
|
amount = quantity * unit_price
|
|
|
|
product_refunds[product_id]['quantity'] += quantity
|
|
product_refunds[product_id]['amount'] += amount
|
|
|
|
logger.info(
|
|
"Product refunds aggregated",
|
|
tenant_id=tenant_id,
|
|
date=str(refund_date),
|
|
products_count=len(product_refunds)
|
|
)
|
|
|
|
# In production, update product_refunds table
|
|
|
|
except Exception as e:
|
|
logger.error(
|
|
"Failed to update product refunds",
|
|
tenant_id=tenant_id,
|
|
error=str(e)
|
|
)
|
|
|
|
async def _store_transaction_record(
|
|
self,
|
|
tenant_id: str,
|
|
transaction_id: str,
|
|
transaction_date: date,
|
|
total_amount: Decimal,
|
|
items: list,
|
|
pos_system: str,
|
|
transaction_type: str,
|
|
original_transaction_id: str = None
|
|
):
|
|
"""
|
|
Store detailed transaction record
|
|
|
|
Args:
|
|
tenant_id: Tenant ID
|
|
transaction_id: Transaction/refund ID
|
|
transaction_date: Date of transaction
|
|
total_amount: Total amount
|
|
items: Transaction items
|
|
pos_system: POS system name
|
|
transaction_type: 'sale' or 'refund'
|
|
original_transaction_id: For refunds, the original transaction ID
|
|
"""
|
|
try:
|
|
# Would store in transactions table for detailed tracking
|
|
# For now, just log
|
|
logger.info(
|
|
"Transaction record created",
|
|
tenant_id=tenant_id,
|
|
transaction_id=transaction_id,
|
|
type=transaction_type,
|
|
amount=float(total_amount),
|
|
items_count=len(items),
|
|
pos_system=pos_system
|
|
)
|
|
|
|
# In production, insert into transactions table:
|
|
# from app.models.transactions import Transaction
|
|
# transaction = Transaction(
|
|
# id=transaction_id,
|
|
# tenant_id=tenant_id,
|
|
# transaction_date=transaction_date,
|
|
# total_amount=total_amount,
|
|
# items=items,
|
|
# pos_system=pos_system,
|
|
# transaction_type=transaction_type,
|
|
# original_transaction_id=original_transaction_id
|
|
# )
|
|
# self.db_session.add(transaction)
|
|
# await self.db_session.commit()
|
|
|
|
except Exception as e:
|
|
logger.error(
|
|
"Failed to store transaction record",
|
|
transaction_id=transaction_id,
|
|
error=str(e)
|
|
)
|
|
|
|
|
|
# Factory function for creating consumer instance
|
|
def create_sales_event_consumer(db_session: AsyncSession) -> SalesEventConsumer:
|
|
"""Create sales event consumer instance"""
|
|
return SalesEventConsumer(db_session)
|