"""
Leader Election Service

Implements Redis-based leader election to ensure only ONE pod runs
singleton tasks like APScheduler jobs.

This is CRITICAL for horizontal scaling - without leader election,
each pod would run the same scheduled jobs, causing:
- Duplicate operations (forecasts, alerts, syncs)
- Database contention
- Inconsistent state
- Duplicate notifications

Implementation:
- Uses Redis SET NX (set if not exists) for atomic leadership acquisition
- Leader maintains leadership with periodic heartbeats
- If leader fails to heartbeat, another pod can take over
- Non-leader pods check periodically if they should become leader
"""

# Standard library
import asyncio
import os
import socket
from dataclasses import dataclass
from typing import Awaitable, Callable, Optional

# Third party
import structlog

logger = structlog.get_logger()

@dataclass
class LeaderElectionConfig:
    """Configuration for leader election.

    Invariant: the heartbeat must fire comfortably within the lock TTL,
    otherwise the leader's lock expires between refreshes and leadership
    churns constantly. Validated in ``__post_init__``.
    """
    # Redis key prefix for the lock
    lock_key_prefix: str = "leader"
    # Lock expires after this many seconds without refresh
    lock_ttl_seconds: int = 30
    # Refresh lock every N seconds (should be < lock_ttl_seconds / 2)
    heartbeat_interval_seconds: int = 10
    # Non-leaders check for leadership every N seconds
    election_check_interval_seconds: int = 15

    def __post_init__(self) -> None:
        """Reject configurations that can never hold leadership stably."""
        if self.lock_ttl_seconds <= 0:
            raise ValueError("lock_ttl_seconds must be positive")
        if self.heartbeat_interval_seconds <= 0:
            raise ValueError("heartbeat_interval_seconds must be positive")
        if self.election_check_interval_seconds <= 0:
            raise ValueError("election_check_interval_seconds must be positive")
        # A heartbeat interval >= the TTL guarantees the lock expires between
        # refreshes; fail fast instead of flapping leadership at runtime.
        if self.heartbeat_interval_seconds >= self.lock_ttl_seconds:
            raise ValueError(
                "heartbeat_interval_seconds must be smaller than lock_ttl_seconds"
            )
class LeaderElectionService:
    """
    Redis-based leader election service.

    Ensures only one pod runs scheduled tasks at a time across all replicas.

    Protocol (as implemented below):
    - Leadership == holding a Redis key, acquired atomically with SET NX EX.
    - The leader keeps the key alive by refreshing its TTL from a heartbeat
      loop; if the leader dies, the key expires and another pod takes over.
    - Non-leaders retry acquisition periodically from an election loop.
    - Exactly one of the two background loops runs at a time; each hands off
      to the other when the leadership state flips.
    """

    def __init__(
        self,
        redis_client,
        service_name: str,
        config: Optional[LeaderElectionConfig] = None
    ):
        """
        Initialize leader election service.

        Args:
            redis_client: Async Redis client instance (must provide async
                set/get/delete/expire methods).
            service_name: Unique name for this service (used in Redis key)
            config: Optional configuration override
        """
        self.redis = redis_client
        self.service_name = service_name
        self.config = config or LeaderElectionConfig()
        self.lock_key = f"{self.config.lock_key_prefix}:{service_name}:lock"
        self.instance_id = self._generate_instance_id()
        self.is_leader = False
        self._heartbeat_task: Optional[asyncio.Task] = None
        self._election_task: Optional[asyncio.Task] = None
        self._running = False
        self._on_become_leader_callback: Optional[Callable[[], Awaitable[None]]] = None
        self._on_lose_leader_callback: Optional[Callable[[], Awaitable[None]]] = None

    def _generate_instance_id(self) -> str:
        """Generate unique instance identifier for this pod"""
        # HOSTNAME is the pod name under Kubernetes; fall back to the machine
        # hostname elsewhere. The PID disambiguates co-located processes.
        hostname = os.environ.get('HOSTNAME', socket.gethostname())
        pod_ip = os.environ.get('POD_IP', 'unknown')
        return f"{hostname}:{pod_ip}:{os.getpid()}"

    @staticmethod
    def _decode(value) -> str:
        """Normalize a Redis reply to ``str`` (clients may return bytes)."""
        return value.decode() if isinstance(value, bytes) else value

    @staticmethod
    async def _cancel_task(task: Optional[asyncio.Task]) -> None:
        """Cancel a background task and wait for the cancellation to settle."""
        if task:
            task.cancel()
            try:
                await task
            except asyncio.CancelledError:
                pass

    async def _fire_lose_leader_callback(self) -> None:
        """Invoke the on-lose-leader callback; log (never raise) its errors."""
        if self._on_lose_leader_callback:
            try:
                await self._on_lose_leader_callback()
            except Exception as e:
                logger.error("Error in on_lose_leader callback",
                             service=self.service_name,
                             error=str(e))

    async def start(
        self,
        on_become_leader: Optional[Callable[[], Awaitable[None]]] = None,
        on_lose_leader: Optional[Callable[[], Awaitable[None]]] = None
    ):
        """
        Start leader election process.

        Attempts acquisition once immediately, then runs either the heartbeat
        loop (if leader) or the election loop (if follower) in the background.

        Args:
            on_become_leader: Async callback when this instance becomes leader
            on_lose_leader: Async callback when this instance loses leadership
        """
        self._on_become_leader_callback = on_become_leader
        self._on_lose_leader_callback = on_lose_leader
        self._running = True

        logger.info("Starting leader election",
                    service=self.service_name,
                    instance_id=self.instance_id,
                    lock_key=self.lock_key)

        # Try to become leader immediately
        await self._try_become_leader()

        # Start exactly one background loop for the current role.
        if self.is_leader:
            self._heartbeat_task = asyncio.create_task(self._heartbeat_loop())
        else:
            self._election_task = asyncio.create_task(self._election_loop())

    async def stop(self):
        """Stop leader election and release leadership if held"""
        self._running = False

        # Cancel background tasks
        await self._cancel_task(self._heartbeat_task)
        self._heartbeat_task = None
        await self._cancel_task(self._election_task)
        self._election_task = None

        # BUG FIX: capture the flag *before* _release_leadership() clears it;
        # previously the log below always reported was_leader=False.
        was_leader = self.is_leader

        # Release leadership
        if was_leader:
            await self._release_leadership()

        logger.info("Leader election stopped",
                    service=self.service_name,
                    instance_id=self.instance_id,
                    was_leader=was_leader)

    async def _try_become_leader(self) -> bool:
        """
        Attempt to become the leader.

        SET NX EX makes acquisition and expiry a single atomic Redis
        operation. On acquisition, fires the on-become-leader callback.

        Returns:
            True if this instance is now the leader
        """
        try:
            # Try to set the lock with NX (only if not exists) and EX (expiry)
            acquired = await self.redis.set(
                self.lock_key,
                self.instance_id,
                nx=True,  # Only set if not exists
                ex=self.config.lock_ttl_seconds
            )

            if acquired:
                self.is_leader = True
                logger.info("Became leader",
                            service=self.service_name,
                            instance_id=self.instance_id)

                # Callback failures are logged but do not forfeit leadership.
                if self._on_become_leader_callback:
                    try:
                        await self._on_become_leader_callback()
                    except Exception as e:
                        logger.error("Error in on_become_leader callback",
                                     service=self.service_name,
                                     error=str(e))

                return True

            # SET NX failed: a lock exists. Check whether *we* already hold it
            # (reconnection scenario) before conceding to another instance.
            current_leader = await self.redis.get(self.lock_key)
            if current_leader:
                current_leader_str = self._decode(current_leader)
                if current_leader_str == self.instance_id:
                    self.is_leader = True
                    logger.info("Confirmed as existing leader",
                                service=self.service_name,
                                instance_id=self.instance_id)
                    return True
                else:
                    logger.debug("Another instance is leader",
                                 service=self.service_name,
                                 current_leader=current_leader_str,
                                 this_instance=self.instance_id)

            return False

        except Exception as e:
            # Treat any Redis error as "not leader"; the election loop retries.
            logger.error("Failed to acquire leadership",
                         service=self.service_name,
                         instance_id=self.instance_id,
                         error=str(e))
            return False

    async def _release_leadership(self):
        """Release leadership lock"""
        try:
            # Only delete if we're the current leader.
            # NOTE(review): GET-then-DELETE is not atomic — if our lock expires
            # and another instance acquires it between these two calls, we would
            # delete the new leader's key. A Lua check-and-delete script would
            # close the gap; confirm the client exposes eval before tightening.
            current_leader = await self.redis.get(self.lock_key)
            if current_leader and self._decode(current_leader) == self.instance_id:
                await self.redis.delete(self.lock_key)
                logger.info("Released leadership",
                            service=self.service_name,
                            instance_id=self.instance_id)

            was_leader = self.is_leader
            self.is_leader = False

            # Call callback only if we were the leader
            if was_leader:
                await self._fire_lose_leader_callback()

        except Exception as e:
            logger.error("Failed to release leadership",
                         service=self.service_name,
                         instance_id=self.instance_id,
                         error=str(e))

    async def _refresh_leadership(self) -> bool:
        """
        Refresh leadership lock TTL.

        Returns:
            True if leadership was maintained
        """
        try:
            # Verify we're still the leader before touching the TTL.
            current_leader = await self.redis.get(self.lock_key)
            if not current_leader:
                logger.warning("Lost leadership (lock expired)",
                               service=self.service_name,
                               instance_id=self.instance_id)
                return False

            current_leader_str = self._decode(current_leader)
            if current_leader_str != self.instance_id:
                logger.warning("Lost leadership (lock held by another instance)",
                               service=self.service_name,
                               instance_id=self.instance_id,
                               current_leader=current_leader_str)
                return False

            # Refresh the TTL. NOTE(review): GET-then-EXPIRE is not atomic; if
            # the key expires in between, EXPIRE is a no-op and the loss is
            # detected on the next heartbeat — acceptable, but not airtight.
            await self.redis.expire(self.lock_key, self.config.lock_ttl_seconds)
            return True

        except Exception as e:
            # Report failure so the heartbeat loop demotes this instance.
            logger.error("Failed to refresh leadership",
                         service=self.service_name,
                         instance_id=self.instance_id,
                         error=str(e))
            return False

    async def _heartbeat_loop(self):
        """Background loop to maintain leadership"""
        while self._running and self.is_leader:
            try:
                await asyncio.sleep(self.config.heartbeat_interval_seconds)

                if not self._running:
                    break

                if not await self._refresh_leadership():
                    # Demoted: notify, hand off to the election loop, and end
                    # this loop.
                    self.is_leader = False
                    await self._fire_lose_leader_callback()
                    self._election_task = asyncio.create_task(self._election_loop())
                    break

            except asyncio.CancelledError:
                break
            except Exception as e:
                # Keep the loop alive on transient errors; the TTL guards
                # against a wedged leader.
                logger.error("Error in heartbeat loop",
                             service=self.service_name,
                             instance_id=self.instance_id,
                             error=str(e))

    async def _election_loop(self):
        """Background loop to attempt leadership acquisition"""
        while self._running and not self.is_leader:
            try:
                await asyncio.sleep(self.config.election_check_interval_seconds)

                if not self._running:
                    break

                if await self._try_become_leader():
                    # Promoted: hand off to the heartbeat loop and end this one.
                    self._heartbeat_task = asyncio.create_task(self._heartbeat_loop())
                    break

            except asyncio.CancelledError:
                break
            except Exception as e:
                logger.error("Error in election loop",
                             service=self.service_name,
                             instance_id=self.instance_id,
                             error=str(e))

    def get_status(self) -> dict:
        """Get current leader election status (for health/debug endpoints)."""
        return {
            "service": self.service_name,
            "instance_id": self.instance_id,
            "is_leader": self.is_leader,
            "running": self._running,
            "lock_key": self.lock_key,
            "config": {
                "lock_ttl_seconds": self.config.lock_ttl_seconds,
                "heartbeat_interval_seconds": self.config.heartbeat_interval_seconds,
                "election_check_interval_seconds": self.config.election_check_interval_seconds
            }
        }

    async def get_current_leader(self) -> Optional[str]:
        """Get the current leader instance ID (if any)"""
        try:
            current_leader = await self.redis.get(self.lock_key)
            if current_leader:
                return self._decode(current_leader)
            return None
        except Exception as e:
            logger.error("Failed to get current leader",
                         service=self.service_name,
                         error=str(e))
            return None