"""
Background Job: Sync POS Transactions to Sales Service

This job runs periodically to sync unsynced POS transactions to the sales
service, which automatically decreases inventory stock levels.

Schedule: Every 5 minutes (configurable)
"""

import asyncio
import time
from datetime import datetime, timedelta
from typing import Any, Dict, List
from uuid import UUID

import structlog

from app.services.pos_transaction_service import POSTransactionService
from app.repositories.pos_config_repository import POSConfigRepository
from app.core.database import get_db_transaction

logger = structlog.get_logger()


class POSToSalesSyncJob:
    """Background job for syncing POS transactions to sales service"""

    def __init__(self):
        self.transaction_service = POSTransactionService()
        self.batch_size = 50  # Process 50 transactions per batch
        # Max retry attempts for failed syncs.
        # NOTE(review): not read anywhere in this module - confirm whether
        # the transaction service is expected to consume it.
        self.max_retries = 3

    async def run(self) -> Dict[str, Any]:
        """
        Main job execution method

        This method:
        1. Finds all tenants with active POS configurations
        2. For each tenant, syncs unsynced transactions
        3. Logs results and errors

        A failure while syncing one tenant is logged and recorded in
        ``results`` but does not stop the remaining tenants from being
        processed.

        Returns:
            On completion, a dictionary with ``success`` (True),
            ``tenants_processed``, ``tenants_failed`` (tenants whose sync
            raised), ``total_synced``, ``total_failed``, ``duration_seconds``
            and ``results`` (one entry per tenant). If the job itself fails
            (for example the tenant lookup raises), a dictionary with
            ``success`` (False), ``error`` and ``duration_seconds``.
        """
        # Monotonic clock: durations must not be affected by wall-clock
        # adjustments, and datetime.utcnow() is deprecated since Python 3.12.
        started = time.monotonic()
        logger.info("Starting POS to Sales sync job")

        try:
            # Get all tenants with active POS configurations
            tenants_to_sync = await self._get_active_tenants()

            if not tenants_to_sync:
                logger.info("No active tenants found for sync")
                # Same keys as the normal return so callers can read the
                # result without special-casing the empty run.
                return {
                    "success": True,
                    "tenants_processed": 0,
                    "tenants_failed": 0,
                    "total_synced": 0,
                    "total_failed": 0,
                    "duration_seconds": time.monotonic() - started,
                    "results": []
                }

            total_synced = 0
            total_failed = 0
            tenants_failed = 0
            results: List[Dict[str, Any]] = []

            for tenant_id in tenants_to_sync:
                try:
                    result = await self.transaction_service.sync_unsynced_transactions(
                        tenant_id=tenant_id,
                        limit=self.batch_size
                    )

                    synced = result.get("synced", 0)
                    failed = result.get("failed", 0)

                    total_synced += synced
                    total_failed += failed

                    results.append({
                        "tenant_id": str(tenant_id),
                        "synced": synced,
                        "failed": failed
                    })

                    logger.info("Tenant sync completed",
                                tenant_id=str(tenant_id),
                                synced=synced,
                                failed=failed)

                except Exception as e:
                    # Isolate the failure to this tenant; keep the traceback
                    # so the root cause is visible in the logs.
                    tenants_failed += 1
                    logger.error("Failed to sync tenant",
                                 tenant_id=str(tenant_id),
                                 error=str(e),
                                 exc_info=True)
                    results.append({
                        "tenant_id": str(tenant_id),
                        "error": str(e)
                    })

            duration = time.monotonic() - started

            logger.info("POS to Sales sync job completed",
                        duration_seconds=duration,
                        tenants_processed=len(tenants_to_sync),
                        tenants_failed=tenants_failed,
                        total_synced=total_synced,
                        total_failed=total_failed)

            return {
                "success": True,
                "tenants_processed": len(tenants_to_sync),
                "tenants_failed": tenants_failed,
                "total_synced": total_synced,
                "total_failed": total_failed,
                "duration_seconds": duration,
                "results": results
            }

        except Exception as e:
            duration = time.monotonic() - started
            logger.error("POS to Sales sync job failed",
                         error=str(e),
                         duration_seconds=duration,
                         exc_info=True)

            return {
                "success": False,
                "error": str(e),
                "duration_seconds": duration
            }

    async def _get_active_tenants(self) -> List[Any]:
        """
        Get list of tenant IDs with active POS configurations

        Returns:
            Unique tenant IDs, in the order they are first seen among the
            active configurations.

        Raises:
            Exception: Any error raised while opening the database
                transaction or loading the configurations is logged and
                re-raised, so the caller can tell a failed lookup apart
                from "no active tenants".
        """
        try:
            async with get_db_transaction() as db:
                repository = POSConfigRepository(db)

                # Get all active POS configurations
                configs = await repository.get_all_active_configs()

                # Extract unique tenant IDs; dict.fromkeys de-duplicates
                # while keeping a deterministic first-seen order.
                tenant_ids = list(
                    dict.fromkeys(config.tenant_id for config in configs)
                )

                logger.info("Found tenants with active POS configs",
                            count=len(tenant_ids))

                return tenant_ids

        except Exception as e:
            # Do not turn a lookup failure into an empty list: that would
            # make run() report a successful sync of zero tenants.
            logger.error("Failed to get active tenants", error=str(e))
            raise

    async def sync_specific_tenant(self, tenant_id: str) -> Dict[str, Any]:
        """
        Sync transactions for a specific tenant (for manual triggering)

        Args:
            tenant_id: Tenant UUID as string

        Returns:
            Sync result dictionary from the transaction service, or
            ``{"success": False, "error": <message>}`` if the tenant ID is
            not a valid UUID or the sync raises.
        """
        try:
            tenant_uuid = UUID(tenant_id)

            result = await self.transaction_service.sync_unsynced_transactions(
                tenant_id=tenant_uuid,
                limit=self.batch_size
            )

            logger.info("Manual tenant sync completed",
                        tenant_id=tenant_id,
                        synced=result.get("synced"),
                        failed=result.get("failed"))

            return result

        except Exception as e:
            logger.error("Failed to sync specific tenant",
                         tenant_id=tenant_id,
                         error=str(e),
                         exc_info=True)
            return {
                "success": False,
                "error": str(e)
            }


# Singleton instance for use in schedulers
pos_to_sales_sync_job = POSToSalesSyncJob()


async def run_pos_to_sales_sync() -> Dict[str, Any]:
    """
    Entry point for scheduler

    Runs the module-level ``pos_to_sales_sync_job`` and returns its result
    dictionary.

    Usage with APScheduler:
    ```python
    from apscheduler.schedulers.asyncio import AsyncIOScheduler
    from app.jobs.sync_pos_to_sales import run_pos_to_sales_sync

    scheduler = AsyncIOScheduler()
    scheduler.add_job(
        run_pos_to_sales_sync,
        'interval',
        minutes=5,
        id='pos_to_sales_sync'
    )
    scheduler.start()
    ```

    Usage with Celery:
    ```python
    import asyncio

    from celery import Celery
    from app.jobs.sync_pos_to_sales import run_pos_to_sales_sync

    celery_app = Celery("app")

    @celery_app.task
    def sync_pos_transactions():
        asyncio.run(run_pos_to_sales_sync())
    ```
    """
    return await pos_to_sales_sync_job.run()


if __name__ == "__main__":
    # For testing: Run sync manually
    asyncio.run(run_pos_to_sales_sync())