218 lines
6.6 KiB
Python
218 lines
6.6 KiB
Python
"""
|
|
Background Job: Sync POS Transactions to Sales Service
|
|
|
|
This job runs periodically to sync unsynced POS transactions to the sales service,
|
|
which automatically decreases inventory stock levels.
|
|
|
|
Schedule: Every 5 minutes (configurable)
|
|
"""
|
|
|
|
import asyncio
|
|
from datetime import datetime, timedelta
|
|
from typing import Dict, Any
|
|
import structlog
|
|
|
|
from app.services.pos_transaction_service import POSTransactionService
|
|
from app.repositories.pos_config_repository import POSConfigRepository
|
|
from app.core.database import get_db_transaction
|
|
|
|
logger = structlog.get_logger()
|
|
|
|
|
|
class POSToSalesSyncJob:
|
|
"""Background job for syncing POS transactions to sales service"""
|
|
|
|
def __init__(self):
|
|
self.transaction_service = POSTransactionService()
|
|
self.batch_size = 50 # Process 50 transactions per batch
|
|
self.max_retries = 3 # Max retry attempts for failed syncs
|
|
|
|
async def run(self):
|
|
"""
|
|
Main job execution method
|
|
|
|
This method:
|
|
1. Finds all tenants with active POS configurations
|
|
2. For each tenant, syncs unsynced transactions
|
|
3. Logs results and errors
|
|
"""
|
|
start_time = datetime.utcnow()
|
|
logger.info("Starting POS to Sales sync job")
|
|
|
|
try:
|
|
# Get all tenants with active POS configurations
|
|
tenants_to_sync = await self._get_active_tenants()
|
|
|
|
if not tenants_to_sync:
|
|
logger.info("No active tenants found for sync")
|
|
return {
|
|
"success": True,
|
|
"tenants_processed": 0,
|
|
"total_synced": 0,
|
|
"total_failed": 0
|
|
}
|
|
|
|
total_synced = 0
|
|
total_failed = 0
|
|
results = []
|
|
|
|
for tenant_id in tenants_to_sync:
|
|
try:
|
|
result = await self.transaction_service.sync_unsynced_transactions(
|
|
tenant_id=tenant_id,
|
|
limit=self.batch_size
|
|
)
|
|
|
|
synced = result.get("synced", 0)
|
|
failed = result.get("failed", 0)
|
|
|
|
total_synced += synced
|
|
total_failed += failed
|
|
|
|
results.append({
|
|
"tenant_id": str(tenant_id),
|
|
"synced": synced,
|
|
"failed": failed
|
|
})
|
|
|
|
logger.info("Tenant sync completed",
|
|
tenant_id=str(tenant_id),
|
|
synced=synced,
|
|
failed=failed)
|
|
|
|
except Exception as e:
|
|
logger.error("Failed to sync tenant",
|
|
tenant_id=str(tenant_id),
|
|
error=str(e))
|
|
results.append({
|
|
"tenant_id": str(tenant_id),
|
|
"error": str(e)
|
|
})
|
|
|
|
duration = (datetime.utcnow() - start_time).total_seconds()
|
|
|
|
logger.info("POS to Sales sync job completed",
|
|
duration_seconds=duration,
|
|
tenants_processed=len(tenants_to_sync),
|
|
total_synced=total_synced,
|
|
total_failed=total_failed)
|
|
|
|
return {
|
|
"success": True,
|
|
"tenants_processed": len(tenants_to_sync),
|
|
"total_synced": total_synced,
|
|
"total_failed": total_failed,
|
|
"duration_seconds": duration,
|
|
"results": results
|
|
}
|
|
|
|
except Exception as e:
|
|
duration = (datetime.utcnow() - start_time).total_seconds()
|
|
logger.error("POS to Sales sync job failed",
|
|
error=str(e),
|
|
duration_seconds=duration,
|
|
exc_info=True)
|
|
|
|
return {
|
|
"success": False,
|
|
"error": str(e),
|
|
"duration_seconds": duration
|
|
}
|
|
|
|
async def _get_active_tenants(self):
|
|
"""Get list of tenant IDs with active POS configurations"""
|
|
try:
|
|
async with get_db_transaction() as db:
|
|
repository = POSConfigRepository(db)
|
|
|
|
# Get all active POS configurations
|
|
configs = await repository.get_all_active_configs()
|
|
|
|
# Extract unique tenant IDs
|
|
tenant_ids = list(set(config.tenant_id for config in configs))
|
|
|
|
logger.info("Found tenants with active POS configs",
|
|
count=len(tenant_ids))
|
|
|
|
return tenant_ids
|
|
|
|
except Exception as e:
|
|
logger.error("Failed to get active tenants", error=str(e))
|
|
return []
|
|
|
|
async def sync_specific_tenant(self, tenant_id: str) -> Dict[str, Any]:
|
|
"""
|
|
Sync transactions for a specific tenant (for manual triggering)
|
|
|
|
Args:
|
|
tenant_id: Tenant UUID as string
|
|
|
|
Returns:
|
|
Sync result dictionary
|
|
"""
|
|
try:
|
|
from uuid import UUID
|
|
tenant_uuid = UUID(tenant_id)
|
|
|
|
result = await self.transaction_service.sync_unsynced_transactions(
|
|
tenant_id=tenant_uuid,
|
|
limit=self.batch_size
|
|
)
|
|
|
|
logger.info("Manual tenant sync completed",
|
|
tenant_id=tenant_id,
|
|
synced=result.get("synced"),
|
|
failed=result.get("failed"))
|
|
|
|
return result
|
|
|
|
except Exception as e:
|
|
logger.error("Failed to sync specific tenant",
|
|
tenant_id=tenant_id,
|
|
error=str(e))
|
|
return {
|
|
"success": False,
|
|
"error": str(e)
|
|
}
|
|
|
|
|
|
# Singleton instance for use in schedulers
|
|
pos_to_sales_sync_job = POSToSalesSyncJob()
|
|
|
|
|
|
async def run_pos_to_sales_sync():
|
|
"""
|
|
Entry point for scheduler
|
|
|
|
Usage with APScheduler:
|
|
```python
|
|
from apscheduler.schedulers.asyncio import AsyncIOScheduler
|
|
from app.jobs.sync_pos_to_sales import run_pos_to_sales_sync
|
|
|
|
scheduler = AsyncIOScheduler()
|
|
scheduler.add_job(
|
|
run_pos_to_sales_sync,
|
|
'interval',
|
|
minutes=5,
|
|
id='pos_to_sales_sync'
|
|
)
|
|
scheduler.start()
|
|
```
|
|
|
|
Usage with Celery:
|
|
```python
|
|
from celery import Celery
|
|
from app.jobs.sync_pos_to_sales import run_pos_to_sales_sync
|
|
|
|
@celery.task
|
|
def sync_pos_transactions():
|
|
asyncio.run(run_pos_to_sales_sync())
|
|
```
|
|
"""
|
|
return await pos_to_sales_sync_job.run()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
# For testing: Run sync manually
|
|
asyncio.run(run_pos_to_sales_sync())
|