""" POS Sync Service - Business Logic Layer Handles sync job creation, tracking, and metrics """ from typing import Optional, List, Dict, Any from uuid import UUID, uuid4 from datetime import datetime, timedelta from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy import select, func, and_, desc import structlog from app.models.pos_sync import POSSyncLog from app.core.database import get_db_transaction logger = structlog.get_logger() class POSSyncService: """Service layer for POS sync operations""" def __init__(self, db: Optional[AsyncSession] = None): self.db = db async def create_sync_job( self, tenant_id: UUID, pos_config_id: UUID, pos_system: str, sync_type: str = "manual", data_types: List[str] = None ) -> POSSyncLog: """ Create a new sync job Args: tenant_id: Tenant UUID pos_config_id: POS configuration UUID pos_system: POS system name sync_type: Type of sync (manual, scheduled, incremental, full) data_types: List of data types to sync Returns: Created sync log """ try: async with get_db_transaction() as db: sync_log = POSSyncLog( tenant_id=tenant_id, pos_config_id=pos_config_id, pos_system=pos_system, sync_type=sync_type, sync_direction="inbound", data_type=",".join(data_types) if data_types else "transactions", status="started", started_at=datetime.utcnow(), triggered_by="user" ) db.add(sync_log) await db.commit() await db.refresh(sync_log) logger.info("Sync job created", sync_id=str(sync_log.id), tenant_id=str(tenant_id), pos_system=pos_system, sync_type=sync_type) return sync_log except Exception as e: logger.error("Failed to create sync job", error=str(e)) raise async def get_sync_by_id(self, sync_id: UUID) -> Optional[POSSyncLog]: """Get sync log by ID""" try: async with get_db_transaction() as db: return await db.get(POSSyncLog, sync_id) except Exception as e: logger.error("Failed to get sync log", error=str(e), sync_id=str(sync_id)) raise async def update_sync_status( self, sync_id: UUID, status: str, error_message: Optional[str] = None, stats: Optional[Dict[str, int]] = None ) -> None: """Update sync job status""" try: async with get_db_transaction() as db: sync_log = await db.get(POSSyncLog, sync_id) if sync_log: sync_log.status = status sync_log.completed_at = datetime.utcnow() if sync_log.started_at: duration = (datetime.utcnow() - sync_log.started_at).total_seconds() sync_log.duration_seconds = duration if error_message: sync_log.error_message = error_message if stats: sync_log.records_processed = stats.get("processed", 0) sync_log.records_created = stats.get("created", 0) sync_log.records_updated = stats.get("updated", 0) sync_log.records_failed = stats.get("failed", 0) await db.commit() logger.info("Sync status updated", sync_id=str(sync_id), status=status) except Exception as e: logger.error("Failed to update sync status", error=str(e)) raise async def get_sync_logs( self, tenant_id: UUID, config_id: Optional[UUID] = None, status: Optional[str] = None, sync_type: Optional[str] = None, limit: int = 50, offset: int = 0 ) -> Dict[str, Any]: """ Get sync logs with filtering Returns: Dict with logs and pagination info """ try: async with get_db_transaction() as db: query = select(POSSyncLog).where(POSSyncLog.tenant_id == tenant_id) # Apply filters if config_id: query = query.where(POSSyncLog.pos_config_id == config_id) if status: query = query.where(POSSyncLog.status == status) if sync_type: query = query.where(POSSyncLog.sync_type == sync_type) # Get total count count_query = select(func.count()).select_from(query.subquery()) result = await db.execute(count_query) total = result.scalar() or 0 # Get paginated results query = query.order_by(desc(POSSyncLog.started_at)).offset(offset).limit(limit) result = await db.execute(query) logs = result.scalars().all() return { "logs": [self._sync_log_to_dict(log) for log in logs], "total": total, "has_more": offset + len(logs) < total } except Exception as e: logger.error("Failed to get sync logs", error=str(e)) raise async def calculate_average_duration( self, tenant_id: UUID, pos_config_id: Optional[UUID] = None, days: int = 30 ) -> float: """ Calculate average sync duration for recent successful syncs Args: tenant_id: Tenant UUID pos_config_id: Optional POS config filter days: Number of days to look back Returns: Average duration in minutes """ try: async with get_db_transaction() as db: cutoff_date = datetime.utcnow() - timedelta(days=days) query = select(func.avg(POSSyncLog.duration_seconds)).where( and_( POSSyncLog.tenant_id == tenant_id, POSSyncLog.status == "completed", POSSyncLog.started_at >= cutoff_date, POSSyncLog.duration_seconds.isnot(None) ) ) if pos_config_id: query = query.where(POSSyncLog.pos_config_id == pos_config_id) result = await db.execute(query) avg_seconds = result.scalar() if avg_seconds: return round(float(avg_seconds) / 60, 2) # Convert to minutes else: return 0.0 except Exception as e: logger.error("Failed to calculate average duration", error=str(e)) return 0.0 def _sync_log_to_dict(self, sync_log: POSSyncLog) -> Dict[str, Any]: """Convert sync log to dictionary""" return { "id": str(sync_log.id), "tenant_id": str(sync_log.tenant_id), "pos_config_id": str(sync_log.pos_config_id), "pos_system": sync_log.pos_system, "sync_type": sync_log.sync_type, "data_type": sync_log.data_type, "status": sync_log.status, "started_at": sync_log.started_at.isoformat() if sync_log.started_at else None, "completed_at": sync_log.completed_at.isoformat() if sync_log.completed_at else None, "duration_seconds": float(sync_log.duration_seconds) if sync_log.duration_seconds else None, "records_processed": sync_log.records_processed, "records_created": sync_log.records_created, "records_updated": sync_log.records_updated, "records_failed": sync_log.records_failed, "error_message": sync_log.error_message }