358 lines
12 KiB
Python
358 lines
12 KiB
Python
"""
|
|
Background Task Scheduler for POS Service
|
|
|
|
Sets up periodic background jobs for:
|
|
- Syncing POS transactions to sales service
|
|
- Other maintenance tasks as needed
|
|
|
|
Uses Redis-based leader election to ensure only one pod runs scheduled tasks
|
|
when running with multiple replicas.
|
|
|
|
Usage in main.py:
|
|
```python
|
|
from app.scheduler import POSScheduler
|
|
|
|
# On startup
|
|
scheduler = POSScheduler(redis_url=settings.REDIS_URL)
|
|
await scheduler.start()
|
|
|
|
# On shutdown
|
|
await scheduler.stop()
|
|
```
|
|
"""
|
|
|
|
import structlog
|
|
from apscheduler.schedulers.asyncio import AsyncIOScheduler
|
|
from apscheduler.triggers.interval import IntervalTrigger
|
|
from datetime import datetime
|
|
from typing import Optional
|
|
|
|
# Module-level structlog logger used by POSScheduler and the legacy helper functions below.
logger = structlog.get_logger()
|
|
|
|
|
|
class POSScheduler:
    """
    POS Scheduler service that manages background sync jobs.

    Uses Redis-based leader election to ensure only one pod runs
    scheduled jobs in a multi-replica deployment. Without a Redis URL, or
    when leader election cannot be started, the scheduler runs in
    standalone mode and schedules the jobs unconditionally.
    """

    def __init__(self, redis_url: Optional[str] = None, sync_interval_minutes: int = 5):
        """
        Initialize POS scheduler.

        No connection is opened and no job is scheduled until start() is
        awaited.

        Args:
            redis_url: Redis connection URL for leader election. When falsy,
                start() runs the scheduler in standalone mode.
            sync_interval_minutes: Interval for POS-to-sales sync job
        """
        self.scheduler = None
        self.sync_interval_minutes = sync_interval_minutes

        # Leader election
        self._redis_url = redis_url
        self._leader_election = None
        self._redis_client = None
        self._scheduler_started = False

    async def start(self):
        """Start the POS scheduler, using leader election when a Redis URL is set"""
        if self._redis_url:
            await self._start_with_leader_election()
        else:
            # Fallback to standalone mode (for local development or single-pod deployments)
            logger.warning("Redis URL not provided, starting POS scheduler in standalone mode")
            await self._start_standalone()

    async def _start_with_leader_election(self):
        """
        Start with Redis-based leader election for horizontal scaling.

        The APScheduler instance is created here but only started from the
        on_become_leader callback. Any failure while connecting to Redis or
        starting the election tears down the partial setup and falls back to
        standalone mode.
        """
        import redis.asyncio as redis
        from shared.leader_election import LeaderElectionService

        try:
            # Create Redis connection
            self._redis_client = redis.from_url(self._redis_url, decode_responses=False)
            await self._redis_client.ping()

            # Create scheduler (but don't start it yet)
            self.scheduler = AsyncIOScheduler()

            # Create leader election
            self._leader_election = LeaderElectionService(
                self._redis_client,
                service_name="pos-scheduler"
            )

            # Start leader election with callbacks
            await self._leader_election.start(
                on_become_leader=self._on_become_leader,
                on_lose_leader=self._on_lose_leader
            )

            logger.info("POS scheduler started with leader election",
                        is_leader=self._leader_election.is_leader,
                        instance_id=self._leader_election.instance_id)

        except Exception as e:
            logger.error("Failed to start with leader election, falling back to standalone",
                         error=str(e))
            # Drop the half-initialised election state first. Otherwise
            # is_leader / trigger_job_now would keep consulting an election
            # that is not running, and a late on_lose_leader callback could
            # stop the standalone scheduler.
            # NOTE(review): every replica that hits this path schedules the
            # jobs on its own, so duplicate runs are possible while Redis is
            # unavailable - confirm this is acceptable.
            await self._abandon_leader_election()
            await self._start_standalone()

    async def _abandon_leader_election(self):
        """Best-effort teardown of a partially initialised leader-election setup"""
        election, self._leader_election = self._leader_election, None
        if election is not None:
            try:
                # Presumably safe to call on an election that never finished
                # starting; any error is logged and ignored.
                await election.stop()
            except Exception as e:
                logger.warning("Failed to stop leader election during fallback", error=str(e))

        # A leader callback may already have started the scheduler; stop it so
        # the standalone fallback does not leave a second one running.
        await self._stop_scheduler()

        try:
            await self._close_redis()
        except Exception as e:
            logger.warning("Failed to close Redis connection during fallback", error=str(e))

    async def _close_redis(self):
        """Close the Redis client, if any, and forget it so it is closed only once"""
        client, self._redis_client = self._redis_client, None
        if client is None:
            return
        # Newer redis-py releases deprecate close() on asyncio clients in
        # favour of aclose(); use whichever the installed version provides.
        close = getattr(client, "aclose", None) or client.close
        await close()

    async def _on_become_leader(self):
        """Called when this instance becomes the leader"""
        logger.info("POS scheduler became leader, starting scheduled jobs")
        await self._start_scheduler()

    async def _on_lose_leader(self):
        """Called when this instance loses leadership"""
        logger.warning("POS scheduler lost leadership, stopping scheduled jobs")
        await self._stop_scheduler()

    def _add_sync_job(self):
        """Register the POS-to-sales sync job on self.scheduler"""
        # Imported lazily, as the original code did, so the job module is only
        # loaded when jobs are actually registered.
        from app.jobs.sync_pos_to_sales import run_pos_to_sales_sync

        # Job 1: Sync POS transactions to sales service
        self.scheduler.add_job(
            run_pos_to_sales_sync,
            trigger=IntervalTrigger(minutes=self.sync_interval_minutes),
            id='pos_to_sales_sync',
            name='Sync POS Transactions to Sales',
            replace_existing=True,
            max_instances=1,        # Prevent concurrent runs
            coalesce=True,          # Combine multiple missed runs into one
            misfire_grace_time=60   # Allow 60 seconds grace for missed runs
        )

    async def _start_scheduler(self):
        """
        Start the APScheduler with POS jobs.

        Does nothing (besides logging a warning) when the scheduler is
        already marked as started. Failures are logged and not re-raised.
        """
        if self._scheduler_started:
            logger.warning("POS scheduler already started")
            return

        try:
            self._add_sync_job()

            # Start scheduler
            if not self.scheduler.running:
                self.scheduler.start()
            self._scheduler_started = True

            jobs = self.scheduler.get_jobs()
            logger.info("POS scheduler jobs started",
                        sync_interval_minutes=self.sync_interval_minutes,
                        job_count=len(jobs),
                        next_run=jobs[0].next_run_time if jobs else None)

        except Exception as e:
            logger.error("Failed to start POS scheduler", error=str(e), exc_info=True)

    async def _stop_scheduler(self):
        """
        Stop the APScheduler.

        Does nothing when the scheduler is not marked as started. Failures
        are logged and not re-raised; in that case the started flag is left
        unchanged.
        """
        if not self._scheduler_started:
            return

        try:
            if self.scheduler and self.scheduler.running:
                self.scheduler.shutdown(wait=False)
            self._scheduler_started = False
            logger.info("POS scheduler jobs stopped")

        except Exception as e:
            logger.error("Failed to stop POS scheduler", error=str(e))

    async def _start_standalone(self):
        """
        Start scheduler without leader election (fallback mode).

        Replaces self.scheduler with a fresh AsyncIOScheduler, registers the
        sync job and starts it. Failures are logged and not re-raised.
        """
        logger.warning("Starting POS scheduler in standalone mode (no leader election)")

        self.scheduler = AsyncIOScheduler()

        try:
            self._add_sync_job()

            if not self.scheduler.running:
                self.scheduler.start()
            self._scheduler_started = True

            jobs = self.scheduler.get_jobs()
            logger.info("POS scheduler started (standalone mode)",
                        sync_interval_minutes=self.sync_interval_minutes,
                        next_run=jobs[0].next_run_time if jobs else None)

        except Exception as e:
            logger.error("Failed to start POS scheduler in standalone mode",
                         error=str(e), exc_info=True)

    async def stop(self):
        """
        Stop the POS scheduler and leader election.

        The scheduler is stopped and the Redis connection closed even when
        stopping the leader election raises; that exception still propagates
        to the caller.
        """
        try:
            # Stop leader election
            if self._leader_election:
                await self._leader_election.stop()
        finally:
            # Stop scheduler
            await self._stop_scheduler()

            # Close Redis
            await self._close_redis()

        logger.info("POS scheduler stopped")

    @property
    def is_leader(self) -> bool:
        """Check if this instance is the leader (always True without leader election)"""
        return self._leader_election.is_leader if self._leader_election else True

    def get_leader_status(self) -> dict:
        """
        Get leader election status.

        Returns:
            The leader election service's own status dict, or a fixed
            standalone marker when no leader election is configured.
        """
        if self._leader_election:
            return self._leader_election.get_status()
        return {"is_leader": True, "mode": "standalone"}

    def get_scheduler_status(self) -> dict:
        """
        Get current scheduler status

        Returns:
            Dict with scheduler info and job statuses. While the scheduler
            is not started it contains only "running", "is_leader" and an
            empty "jobs" list; otherwise it also carries "state".
        """
        if self.scheduler is None or not self._scheduler_started:
            return {
                "running": False,
                "is_leader": self.is_leader,
                "jobs": []
            }

        jobs = [
            {
                "id": job.id,
                "name": job.name,
                "next_run": job.next_run_time.isoformat() if job.next_run_time else None,
                "trigger": str(job.trigger)
            }
            for job in self.scheduler.get_jobs()
        ]

        return {
            "running": True,
            "is_leader": self.is_leader,
            "jobs": jobs,
            "state": self.scheduler.state
        }

    def trigger_job_now(self, job_id: str) -> bool:
        """
        Manually trigger a scheduled job immediately

        The job's next run time is moved to "now"; the job itself is still
        executed by the scheduler, not by this call.

        Args:
            job_id: Job identifier (e.g., 'pos_to_sales_sync')

        Returns:
            True if job was triggered, False otherwise (scheduler not
            running, this instance is not the leader, unknown job id, or
            the scheduler rejected the change)
        """
        if self.scheduler is None or not self._scheduler_started:
            logger.error("Cannot trigger job, scheduler not running")
            return False

        if not self.is_leader:
            logger.warning("Cannot trigger job, this instance is not the leader")
            return False

        try:
            if self.scheduler.get_job(job_id) is None:
                logger.warning("Job not found", job_id=job_id)
                return False

            # Build "now" in the scheduler's own timezone so the value is
            # unambiguous; falls back to naive local time when the scheduler
            # exposes no timezone.
            now = datetime.now(getattr(self.scheduler, "timezone", None))
            self.scheduler.modify_job(job_id, next_run_time=now)
            logger.info("Job triggered manually", job_id=job_id)
            return True

        except Exception as e:
            logger.error("Failed to trigger job", job_id=job_id, error=str(e))
            return False
|
|
|
|
|
|
# ================================================================
# Legacy compatibility functions (deprecated - use POSScheduler class)
# ================================================================
|
|
|
|
# Global scheduler instance for backward compatibility
|
|
# Set by start_scheduler() and reset to None by shutdown_scheduler(); None means
# no legacy scheduler is currently registered.
_scheduler_instance: Optional[POSScheduler] = None
|
|
|
|
|
|
# Strong references to start-up tasks spawned by start_scheduler(). The event
# loop keeps only weak references to tasks, so a task nobody else references
# could be garbage-collected before it finishes.
_legacy_startup_tasks: set = set()


def start_scheduler():
    """
    DEPRECATED: Use POSScheduler class directly for better leader election support.

    Initialize and start the background scheduler (legacy function).

    Creates the module-level POSScheduler and schedules its standalone
    start-up on the currently running event loop. It must therefore be
    called from code running inside that loop; without a running loop the
    failure is logged and no scheduler is registered. Calling it again while
    a scheduler is registered only logs a warning.
    """
    global _scheduler_instance

    if _scheduler_instance is not None:
        logger.warning("Scheduler already running")
        return

    logger.warning("Using deprecated start_scheduler function. "
                   "Consider migrating to POSScheduler class for leader election support.")

    try:
        import asyncio

        # Resolve the loop before creating the coroutine. Without a running
        # loop this raises here, rather than leaving behind a coroutine
        # object that is never awaited.
        loop = asyncio.get_running_loop()

        _scheduler_instance = POSScheduler()
        # Note: This is synchronous fallback, no leader election
        startup_task = loop.create_task(_scheduler_instance._start_standalone())
        _legacy_startup_tasks.add(startup_task)
        startup_task.add_done_callback(_legacy_startup_tasks.discard)

    except Exception as e:
        logger.error("Failed to start scheduler", error=str(e), exc_info=True)
        _scheduler_instance = None
|
|
|
|
|
|
# Strong references to stop tasks spawned by shutdown_scheduler(). Once the
# module-level instance is cleared nothing else references the task, and the
# event loop keeps only weak references, so it could be garbage-collected
# before the shutdown completes.
_legacy_shutdown_tasks: set = set()


def shutdown_scheduler():
    """
    DEPRECATED: Use POSScheduler class directly.

    Gracefully shutdown the scheduler (legacy function).

    Schedules POSScheduler.stop() on the currently running event loop and
    clears the module-level instance; the stop itself completes
    asynchronously. Without a running loop the failure is logged and the
    instance is left registered. Calling it with no registered scheduler
    only logs a warning.
    """
    global _scheduler_instance

    if _scheduler_instance is None:
        logger.warning("Scheduler not running")
        return

    try:
        import asyncio

        # Resolve the loop before creating the coroutine. Without a running
        # loop this raises here, rather than leaving behind a coroutine
        # object that is never awaited.
        loop = asyncio.get_running_loop()

        stop_task = loop.create_task(_scheduler_instance.stop())
        _legacy_shutdown_tasks.add(stop_task)
        stop_task.add_done_callback(_legacy_shutdown_tasks.discard)
        _scheduler_instance = None

    except Exception as e:
        logger.error("Failed to shutdown scheduler", error=str(e), exc_info=True)
|
|
|
|
|
|
def get_scheduler_status():
    """
    DEPRECATED: Use POSScheduler class directly.

    Get current scheduler status (legacy function).

    Delegates to the module-level POSScheduler when one is registered;
    otherwise reports a not-running scheduler with an empty job list.
    """
    instance = _scheduler_instance
    if instance is not None:
        return instance.get_scheduler_status()

    return {"running": False, "jobs": []}
|
|
|
|
|
|
def trigger_job_now(job_id: str):
    """
    DEPRECATED: Use POSScheduler class directly.

    Manually trigger a scheduled job immediately (legacy function).

    Delegates to the module-level POSScheduler when one is registered;
    otherwise logs an error and returns False.
    """
    instance = _scheduler_instance
    if instance is not None:
        return instance.trigger_job_now(job_id)

    logger.error("Cannot trigger job, scheduler not running")
    return False
|