""" POS Transaction Repository using Repository Pattern """ from typing import List, Optional, Dict, Any from uuid import UUID from datetime import datetime, date, timedelta from sqlalchemy import select, func, and_, or_, desc from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.orm import selectinload import structlog from app.models.pos_transaction import POSTransaction, POSTransactionItem from shared.database.repository import BaseRepository logger = structlog.get_logger() class POSTransactionRepository(BaseRepository[POSTransaction, dict, dict]): """Repository for POS transaction operations""" def __init__(self, session: AsyncSession): super().__init__(POSTransaction, session) async def get_transactions_by_tenant( self, tenant_id: UUID, pos_system: Optional[str] = None, start_date: Optional[datetime] = None, end_date: Optional[datetime] = None, status: Optional[str] = None, is_synced: Optional[bool] = None, skip: int = 0, limit: int = 50 ) -> List[POSTransaction]: """Get POS transactions for a specific tenant with optional filters""" try: query = select(self.model).options( selectinload(POSTransaction.items) ).where(self.model.tenant_id == tenant_id) # Apply filters conditions = [] if pos_system: conditions.append(self.model.pos_system == pos_system) if status: conditions.append(self.model.status == status) if is_synced is not None: conditions.append(self.model.is_synced_to_sales == is_synced) if start_date: conditions.append(self.model.transaction_date >= start_date) if end_date: conditions.append(self.model.transaction_date <= end_date) if conditions: query = query.where(and_(*conditions)) query = query.order_by(desc(self.model.transaction_date)).offset(skip).limit(limit) result = await self.session.execute(query) return result.scalars().all() except Exception as e: logger.error("Failed to get transactions by tenant", error=str(e), tenant_id=tenant_id) raise async def count_transactions_by_tenant( self, tenant_id: UUID, pos_system: Optional[str] = None, start_date: Optional[datetime] = None, end_date: Optional[datetime] = None, status: Optional[str] = None, is_synced: Optional[bool] = None ) -> int: """Count POS transactions for a specific tenant with optional filters""" try: query = select(func.count(self.model.id)).where(self.model.tenant_id == tenant_id) # Apply filters conditions = [] if pos_system: conditions.append(self.model.pos_system == pos_system) if status: conditions.append(self.model.status == status) if is_synced is not None: conditions.append(self.model.is_synced_to_sales == is_synced) if start_date: conditions.append(self.model.transaction_date >= start_date) if end_date: conditions.append(self.model.transaction_date <= end_date) if conditions: query = query.where(and_(*conditions)) result = await self.session.execute(query) count = result.scalar() or 0 return count except Exception as e: logger.error("Failed to count transactions by tenant", error=str(e), tenant_id=tenant_id) raise async def get_transaction_with_items( self, transaction_id: UUID, tenant_id: UUID ) -> Optional[POSTransaction]: """Get transaction with all its items""" try: query = select(POSTransaction).options( selectinload(POSTransaction.items) ).where( and_( POSTransaction.id == transaction_id, POSTransaction.tenant_id == tenant_id ) ) result = await self.session.execute(query) return result.scalar_one_or_none() except Exception as e: logger.error("Failed to get transaction with items", transaction_id=str(transaction_id), error=str(e)) raise async def get_transactions_by_pos_config( self, pos_config_id: UUID, skip: int = 0, limit: int = 50 ) -> List[POSTransaction]: """Get transactions for a specific POS configuration""" try: query = select(POSTransaction).options( selectinload(POSTransaction.items) ).where( POSTransaction.pos_config_id == pos_config_id ).order_by(desc(POSTransaction.transaction_date)).offset(skip).limit(limit) result = await self.session.execute(query) return result.scalars().all() except Exception as e: logger.error("Failed to get transactions by pos config", pos_config_id=str(pos_config_id), error=str(e)) raise async def get_transactions_by_date_range( self, tenant_id: UUID, start_date: date, end_date: date, skip: int = 0, limit: int = 100 ) -> List[POSTransaction]: """Get transactions within date range""" try: start_datetime = datetime.combine(start_date, datetime.min.time()) end_datetime = datetime.combine(end_date, datetime.max.time()) query = select(POSTransaction).options( selectinload(POSTransaction.items) ).where( and_( POSTransaction.tenant_id == tenant_id, POSTransaction.transaction_date >= start_datetime, POSTransaction.transaction_date <= end_datetime ) ).order_by(desc(POSTransaction.transaction_date)).offset(skip).limit(limit) result = await self.session.execute(query) return result.scalars().all() except Exception as e: logger.error("Failed to get transactions by date range", start_date=str(start_date), end_date=str(end_date), error=str(e)) raise async def get_dashboard_metrics( self, tenant_id: UUID ) -> Dict[str, Any]: """Get dashboard metrics for transactions""" try: # Today's metrics today = datetime.now().date() today_start = datetime.combine(today, datetime.min.time()) today_end = datetime.combine(today, datetime.max.time()) week_start = today - timedelta(days=today.weekday()) week_start_datetime = datetime.combine(week_start, datetime.min.time()) month_start = today.replace(day=1) month_start_datetime = datetime.combine(month_start, datetime.min.time()) # Transaction counts by period transactions_today = await self.session.execute( select(func.count()).select_from(POSTransaction).where( and_( POSTransaction.tenant_id == tenant_id, POSTransaction.transaction_date >= today_start, POSTransaction.transaction_date <= today_end, POSTransaction.status == "completed" ) ) ) transactions_week = await self.session.execute( select(func.count()).select_from(POSTransaction).where( and_( POSTransaction.tenant_id == tenant_id, POSTransaction.transaction_date >= week_start_datetime, POSTransaction.status == "completed" ) ) ) transactions_month = await self.session.execute( select(func.count()).select_from(POSTransaction).where( and_( POSTransaction.tenant_id == tenant_id, POSTransaction.transaction_date >= month_start_datetime, POSTransaction.status == "completed" ) ) ) # Revenue by period revenue_today = await self.session.execute( select(func.coalesce(func.sum(POSTransaction.total_amount), 0)).where( and_( POSTransaction.tenant_id == tenant_id, POSTransaction.transaction_date >= today_start, POSTransaction.transaction_date <= today_end, POSTransaction.status == "completed" ) ) ) revenue_week = await self.session.execute( select(func.coalesce(func.sum(POSTransaction.total_amount), 0)).where( and_( POSTransaction.tenant_id == tenant_id, POSTransaction.transaction_date >= week_start_datetime, POSTransaction.status == "completed" ) ) ) revenue_month = await self.session.execute( select(func.coalesce(func.sum(POSTransaction.total_amount), 0)).where( and_( POSTransaction.tenant_id == tenant_id, POSTransaction.transaction_date >= month_start_datetime, POSTransaction.status == "completed" ) ) ) # Status breakdown status_counts = await self.session.execute( select(POSTransaction.status, func.count()).select_from(POSTransaction).where( POSTransaction.tenant_id == tenant_id ).group_by(POSTransaction.status) ) status_breakdown = {status: count for status, count in status_counts.fetchall()} # Payment method breakdown payment_counts = await self.session.execute( select(POSTransaction.payment_method, func.count()).select_from(POSTransaction).where( and_( POSTransaction.tenant_id == tenant_id, POSTransaction.status == "completed" ) ).group_by(POSTransaction.payment_method) ) payment_breakdown = {method: count for method, count in payment_counts.fetchall()} # Average transaction value avg_transaction_value = await self.session.execute( select(func.coalesce(func.avg(POSTransaction.total_amount), 0)).where( and_( POSTransaction.tenant_id == tenant_id, POSTransaction.status == "completed" ) ) ) return { "total_transactions_today": transactions_today.scalar(), "total_transactions_this_week": transactions_week.scalar(), "total_transactions_this_month": transactions_month.scalar(), "revenue_today": float(revenue_today.scalar()), "revenue_this_week": float(revenue_week.scalar()), "revenue_this_month": float(revenue_month.scalar()), "status_breakdown": status_breakdown, "payment_method_breakdown": payment_breakdown, "average_transaction_value": float(avg_transaction_value.scalar()) } except Exception as e: logger.error("Failed to get dashboard metrics", error=str(e), tenant_id=tenant_id) raise async def get_sync_status_summary( self, tenant_id: UUID ) -> Dict[str, Any]: """Get sync status summary for transactions""" try: # Count synced vs unsynced synced_count = await self.session.execute( select(func.count()).select_from(POSTransaction).where( and_( POSTransaction.tenant_id == tenant_id, POSTransaction.is_synced_to_sales == True ) ) ) pending_count = await self.session.execute( select(func.count()).select_from(POSTransaction).where( and_( POSTransaction.tenant_id == tenant_id, POSTransaction.is_synced_to_sales == False, POSTransaction.sync_error.is_(None) ) ) ) failed_count = await self.session.execute( select(func.count()).select_from(POSTransaction).where( and_( POSTransaction.tenant_id == tenant_id, POSTransaction.is_synced_to_sales == False, POSTransaction.sync_error.isnot(None) ) ) ) # Get last sync time last_sync = await self.session.execute( select(func.max(POSTransaction.sync_completed_at)).where( and_( POSTransaction.tenant_id == tenant_id, POSTransaction.is_synced_to_sales == True ) ) ) return { "synced": synced_count.scalar(), "pending": pending_count.scalar(), "failed": failed_count.scalar(), "last_sync_at": last_sync.scalar() } except Exception as e: logger.error("Failed to get sync status summary", error=str(e), tenant_id=tenant_id) raise