Files

210 lines
6.8 KiB
Python
Raw Permalink Normal View History

2026-01-18 09:02:27 +01:00
"""
Scheduler Leader Mixin
Provides a mixin class for services that use APScheduler and need
leader election for horizontal scaling.
Usage:
class MySchedulerService(SchedulerLeaderMixin):
def __init__(self, redis_url: str, service_name: str):
super().__init__(redis_url, service_name)
# Your initialization here
async def _create_scheduler_jobs(self):
'''Override to define your scheduled jobs'''
self.scheduler.add_job(
self.my_job,
trigger=CronTrigger(hour=0),
id='my_job'
)
async def my_job(self):
# Your job logic here
pass
"""
import asyncio
from typing import Optional
from abc import abstractmethod
import structlog
logger = structlog.get_logger()
class SchedulerLeaderMixin:
"""
Mixin for services that use APScheduler with leader election.
Provides automatic leader election and scheduler management.
Only the leader pod will run scheduled jobs.
"""
def __init__(self, redis_url: str, service_name: str, **kwargs):
"""
Initialize the scheduler with leader election.
Args:
redis_url: Redis connection URL for leader election
service_name: Unique service name for leader election lock
**kwargs: Additional arguments passed to parent class
"""
super().__init__(**kwargs)
self._redis_url = redis_url
self._service_name = service_name
self._leader_election = None
self._redis_client = None
self.scheduler = None
self._scheduler_started = False
async def start_with_leader_election(self):
"""
Start the service with leader election.
Only the leader will start the scheduler.
"""
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from shared.leader_election.service import LeaderElectionService
import redis.asyncio as redis
try:
# Create Redis connection
self._redis_client = redis.from_url(self._redis_url, decode_responses=False)
await self._redis_client.ping()
# Create scheduler (but don't start it yet)
self.scheduler = AsyncIOScheduler()
# Create leader election
self._leader_election = LeaderElectionService(
self._redis_client,
self._service_name
)
# Start leader election with callbacks
await self._leader_election.start(
on_become_leader=self._on_become_leader,
on_lose_leader=self._on_lose_leader
)
logger.info("Scheduler service started with leader election",
service=self._service_name,
is_leader=self._leader_election.is_leader,
instance_id=self._leader_election.instance_id)
except Exception as e:
logger.error("Failed to start with leader election, falling back to standalone",
service=self._service_name,
error=str(e))
# Fallback: start scheduler anyway (for single-pod deployments)
await self._start_scheduler_standalone()
async def _on_become_leader(self):
"""Called when this instance becomes the leader"""
logger.info("Became leader, starting scheduler",
service=self._service_name)
await self._start_scheduler()
async def _on_lose_leader(self):
"""Called when this instance loses leadership"""
logger.warning("Lost leadership, stopping scheduler",
service=self._service_name)
await self._stop_scheduler()
async def _start_scheduler(self):
"""Start the scheduler with defined jobs"""
if self._scheduler_started:
logger.warning("Scheduler already started",
service=self._service_name)
return
try:
# Let subclass define jobs
await self._create_scheduler_jobs()
# Start scheduler
if not self.scheduler.running:
self.scheduler.start()
self._scheduler_started = True
logger.info("Scheduler started",
service=self._service_name,
job_count=len(self.scheduler.get_jobs()))
except Exception as e:
logger.error("Failed to start scheduler",
service=self._service_name,
error=str(e))
async def _stop_scheduler(self):
"""Stop the scheduler"""
if not self._scheduler_started:
return
try:
if self.scheduler and self.scheduler.running:
self.scheduler.shutdown(wait=False)
self._scheduler_started = False
logger.info("Scheduler stopped",
service=self._service_name)
except Exception as e:
logger.error("Failed to stop scheduler",
service=self._service_name,
error=str(e))
async def _start_scheduler_standalone(self):
"""Start scheduler without leader election (fallback mode)"""
from apscheduler.schedulers.asyncio import AsyncIOScheduler
logger.warning("Starting scheduler in standalone mode (no leader election)",
service=self._service_name)
self.scheduler = AsyncIOScheduler()
await self._create_scheduler_jobs()
if not self.scheduler.running:
self.scheduler.start()
self._scheduler_started = True
@abstractmethod
async def _create_scheduler_jobs(self):
"""
Override to define scheduled jobs.
Example:
self.scheduler.add_job(
self.my_task,
trigger=CronTrigger(hour=0, minute=30),
id='my_task',
max_instances=1
)
"""
pass
async def stop(self):
"""Stop the scheduler and leader election"""
# Stop leader election
if self._leader_election:
await self._leader_election.stop()
# Stop scheduler
await self._stop_scheduler()
# Close Redis
if self._redis_client:
await self._redis_client.close()
logger.info("Scheduler service stopped",
service=self._service_name)
@property
def is_leader(self) -> bool:
"""Check if this instance is the leader"""
return self._leader_election.is_leader if self._leader_election else False
def get_leader_status(self) -> dict:
"""Get leader election status"""
if self._leader_election:
return self._leader_election.get_status()
return {"is_leader": True, "mode": "standalone"}