Files
bakery-ia/shared/leader_election/service.py
2026-01-18 09:02:27 +01:00

353 lines
13 KiB
Python

"""
Leader Election Service
Implements Redis-based leader election to ensure only ONE pod runs
singleton tasks like APScheduler jobs.
This is CRITICAL for horizontal scaling - without leader election,
each pod would run the same scheduled jobs, causing:
- Duplicate operations (forecasts, alerts, syncs)
- Database contention
- Inconsistent state
- Duplicate notifications
Implementation:
- Uses Redis SET NX (set if not exists) for atomic leadership acquisition
- Leader maintains leadership with periodic heartbeats
- If leader fails to heartbeat, another pod can take over
- Non-leader pods check periodically if they should become leader
"""
import asyncio
import os
import socket
from dataclasses import dataclass
from typing import Optional, Callable, Awaitable
import structlog
# Module-level structlog logger used for all log output in this module.
logger = structlog.get_logger()
@dataclass
class LeaderElectionConfig:
    """Configuration for leader election.

    All durations are expressed in seconds.

    Raises:
        ValueError: If any duration is not positive, or if the heartbeat
            interval is not strictly shorter than the lock TTL (the lock
            would expire before it could ever be refreshed).
    """

    # Redis key prefix for the lock
    lock_key_prefix: str = "leader"
    # Lock expires after this many seconds without refresh
    lock_ttl_seconds: int = 30
    # Refresh lock every N seconds (should be < lock_ttl_seconds / 2)
    heartbeat_interval_seconds: int = 10
    # Non-leaders check for leadership every N seconds
    election_check_interval_seconds: int = 15

    def __post_init__(self) -> None:
        """Reject configurations that can never hold a lock reliably."""
        for field_name in (
            "lock_ttl_seconds",
            "heartbeat_interval_seconds",
            "election_check_interval_seconds",
        ):
            value = getattr(self, field_name)
            if value <= 0:
                raise ValueError(f"{field_name} must be positive, got {value!r}")

        # A heartbeat that is not shorter than the TTL guarantees the lock
        # expires between refreshes, so leadership would flap constantly.
        if self.heartbeat_interval_seconds >= self.lock_ttl_seconds:
            raise ValueError(
                "heartbeat_interval_seconds "
                f"({self.heartbeat_interval_seconds}) must be smaller than "
                f"lock_ttl_seconds ({self.lock_ttl_seconds})"
            )
class LeaderElectionService:
    """
    Redis-based leader election service.

    Ensures only one pod runs scheduled tasks at a time across all replicas.

    The leader is whichever instance's ID is stored under ``lock_key``. The
    key is written with ``SET NX EX`` and its TTL is extended by a heartbeat
    task while this instance is leader; otherwise an election task
    periodically retries acquisition.
    """

    def __init__(
        self,
        redis_client,
        service_name: str,
        config: Optional["LeaderElectionConfig"] = None
    ):
        """
        Initialize leader election service.

        Args:
            redis_client: Async Redis client instance
            service_name: Unique name for this service (used in Redis key)
            config: Optional configuration override
        """
        self.redis = redis_client
        self.service_name = service_name
        self.config = config or LeaderElectionConfig()
        self.lock_key = f"{self.config.lock_key_prefix}:{service_name}:lock"
        self.instance_id = self._generate_instance_id()
        self.is_leader = False
        self._heartbeat_task: Optional[asyncio.Task] = None
        self._election_task: Optional[asyncio.Task] = None
        self._running = False
        self._on_become_leader_callback: Optional[Callable[[], Awaitable[None]]] = None
        self._on_lose_leader_callback: Optional[Callable[[], Awaitable[None]]] = None

    def _generate_instance_id(self) -> str:
        """Generate unique instance identifier for this pod"""
        hostname = os.environ.get('HOSTNAME', socket.gethostname())
        pod_ip = os.environ.get('POD_IP', 'unknown')
        return f"{hostname}:{pod_ip}:{os.getpid()}"

    @staticmethod
    def _decode(value) -> Optional[str]:
        """Normalize a Redis reply (bytes or str) to str; empty/missing -> None."""
        if not value:
            return None
        return value.decode() if isinstance(value, bytes) else value

    async def _run_callback(
        self,
        callback: Optional[Callable[[], Awaitable[None]]],
        error_message: str
    ) -> None:
        """Await ``callback`` if set; log (never propagate) ordinary exceptions."""
        if not callback:
            return
        try:
            await callback()
        except Exception as e:
            logger.error(error_message,
                         service=self.service_name,
                         error=str(e))

    @staticmethod
    async def _cancel_task(task: Optional[asyncio.Task]) -> None:
        """Cancel a background task (if any) and wait for it to finish."""
        if task is None:
            return
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass

    async def start(
        self,
        on_become_leader: Optional[Callable[[], Awaitable[None]]] = None,
        on_lose_leader: Optional[Callable[[], Awaitable[None]]] = None
    ):
        """
        Start leader election process.

        Calling ``start()`` while already running is a no-op, so background
        tasks are never duplicated.

        Args:
            on_become_leader: Async callback when this instance becomes leader
            on_lose_leader: Async callback when this instance loses leadership
        """
        if self._running:
            logger.warning("Leader election already running",
                           service=self.service_name,
                           instance_id=self.instance_id)
            return

        self._on_become_leader_callback = on_become_leader
        self._on_lose_leader_callback = on_lose_leader
        self._running = True
        logger.info("Starting leader election",
                    service=self.service_name,
                    instance_id=self.instance_id,
                    lock_key=self.lock_key)

        # Try to become leader immediately
        await self._try_become_leader()

        # Exactly one of the two loops runs at a time; each hands over to the
        # other when leadership changes.
        if self.is_leader:
            self._heartbeat_task = asyncio.create_task(self._heartbeat_loop())
        else:
            self._election_task = asyncio.create_task(self._election_loop())

    async def stop(self):
        """Stop leader election and release leadership if held"""
        self._running = False

        # Cancel background tasks
        await self._cancel_task(self._heartbeat_task)
        self._heartbeat_task = None
        await self._cancel_task(self._election_task)
        self._election_task = None

        # Capture before releasing: _release_leadership() clears is_leader,
        # so reading it afterwards would always log False.
        was_leader = self.is_leader
        if was_leader:
            await self._release_leadership()

        logger.info("Leader election stopped",
                    service=self.service_name,
                    instance_id=self.instance_id,
                    was_leader=was_leader)

    async def _try_become_leader(self) -> bool:
        """
        Attempt to become the leader.

        The ``on_become_leader`` callback fires whenever this instance moves
        from "not leader" to "leader", including when it discovers that the
        existing lock already carries its own instance ID.

        Returns:
            True if this instance is now the leader
        """
        try:
            # Try to set the lock with NX (only if not exists) and EX (expiry)
            acquired = await self.redis.set(
                self.lock_key,
                self.instance_id,
                nx=True,  # Only set if not exists
                ex=self.config.lock_ttl_seconds
            )
            if acquired:
                self.is_leader = True
                logger.info("Became leader",
                            service=self.service_name,
                            instance_id=self.instance_id)
                await self._run_callback(self._on_become_leader_callback,
                                         "Error in on_become_leader callback")
                return True

            # Check if we're already the leader (reconnection scenario)
            current_leader = self._decode(await self.redis.get(self.lock_key))
            if current_leader == self.instance_id:
                # Extend the TTL right away: the first heartbeat is a full
                # interval away and the remaining TTL may be shorter than that.
                refreshed = await self.redis.expire(
                    self.lock_key, self.config.lock_ttl_seconds
                )
                if not refreshed:
                    # Key vanished between GET and EXPIRE; retry on next tick.
                    return False

                was_leader = self.is_leader
                self.is_leader = True
                logger.info("Confirmed as existing leader",
                            service=self.service_name,
                            instance_id=self.instance_id)
                # If we had stepped down locally (or just started), singleton
                # work is not running yet, so the callback must fire here too.
                if not was_leader:
                    await self._run_callback(self._on_become_leader_callback,
                                             "Error in on_become_leader callback")
                return True

            if current_leader is not None:
                logger.debug("Another instance is leader",
                             service=self.service_name,
                             current_leader=current_leader,
                             this_instance=self.instance_id)
            return False
        except Exception as e:
            logger.error("Failed to acquire leadership",
                         service=self.service_name,
                         instance_id=self.instance_id,
                         error=str(e))
            return False

    async def _release_leadership(self):
        """
        Release leadership lock.

        The local leader flag is always cleared (and ``on_lose_leader`` fired
        if we were leader), even when Redis is unreachable; in that case the
        lock simply expires through its TTL.
        """
        try:
            # Only delete if we're the current leader
            # NOTE(review): GET followed by DELETE is not atomic; a Lua
            # compare-and-delete would close the window where the lock expires
            # and is re-acquired by another instance in between.
            current_leader = self._decode(await self.redis.get(self.lock_key))
            if current_leader == self.instance_id:
                await self.redis.delete(self.lock_key)
                logger.info("Released leadership",
                            service=self.service_name,
                            instance_id=self.instance_id)
        except Exception as e:
            logger.error("Failed to release leadership",
                         service=self.service_name,
                         instance_id=self.instance_id,
                         error=str(e))

        was_leader = self.is_leader
        self.is_leader = False
        # Call callback only if we were the leader
        if was_leader:
            await self._run_callback(self._on_lose_leader_callback,
                                     "Error in on_lose_leader callback")

    async def _refresh_leadership(self) -> bool:
        """
        Refresh leadership lock TTL.

        Returns:
            True if leadership was maintained; False if the lock is missing,
            held by another instance, could not be extended, or Redis failed.
        """
        try:
            # Verify we're still the leader
            current_leader = self._decode(await self.redis.get(self.lock_key))
            if current_leader is None:
                logger.warning("Lost leadership (lock expired)",
                               service=self.service_name,
                               instance_id=self.instance_id)
                return False

            if current_leader != self.instance_id:
                logger.warning("Lost leadership (lock held by another instance)",
                               service=self.service_name,
                               instance_id=self.instance_id,
                               current_leader=current_leader)
                return False

            # Refresh the TTL. EXPIRE reports failure when the key no longer
            # exists, i.e. it expired between the GET above and this call.
            # NOTE(review): still not atomic with the ownership check; a Lua
            # compare-and-expire would remove the remaining window.
            refreshed = await self.redis.expire(
                self.lock_key, self.config.lock_ttl_seconds
            )
            if not refreshed:
                logger.warning("Lost leadership (lock expired)",
                               service=self.service_name,
                               instance_id=self.instance_id)
                return False
            return True
        except Exception as e:
            logger.error("Failed to refresh leadership",
                         service=self.service_name,
                         instance_id=self.instance_id,
                         error=str(e))
            return False

    async def _heartbeat_loop(self):
        """Background loop to maintain leadership"""
        while self._running and self.is_leader:
            try:
                await asyncio.sleep(self.config.heartbeat_interval_seconds)
                if not self._running:
                    break

                if await self._refresh_leadership():
                    continue

                # Leadership lost: step down, notify, hand over to election.
                self.is_leader = False
                await self._run_callback(self._on_lose_leader_callback,
                                         "Error in on_lose_leader callback")
                if self._running:
                    self._election_task = asyncio.create_task(self._election_loop())
                break
            except asyncio.CancelledError:
                break
            except Exception as e:
                logger.error("Error in heartbeat loop",
                             service=self.service_name,
                             instance_id=self.instance_id,
                             error=str(e))

    async def _election_loop(self):
        """Background loop to attempt leadership acquisition"""
        while self._running and not self.is_leader:
            try:
                await asyncio.sleep(self.config.election_check_interval_seconds)
                if not self._running:
                    break

                acquired = await self._try_become_leader()
                if acquired:
                    # Switch to heartbeat loop
                    self._heartbeat_task = asyncio.create_task(self._heartbeat_loop())
                    break
            except asyncio.CancelledError:
                break
            except Exception as e:
                logger.error("Error in election loop",
                             service=self.service_name,
                             instance_id=self.instance_id,
                             error=str(e))

    def get_status(self) -> dict:
        """Get current leader election status"""
        return {
            "service": self.service_name,
            "instance_id": self.instance_id,
            "is_leader": self.is_leader,
            "running": self._running,
            "lock_key": self.lock_key,
            "config": {
                "lock_ttl_seconds": self.config.lock_ttl_seconds,
                "heartbeat_interval_seconds": self.config.heartbeat_interval_seconds,
                "election_check_interval_seconds": self.config.election_check_interval_seconds
            }
        }

    async def get_current_leader(self) -> Optional[str]:
        """Get the current leader instance ID (if any)"""
        try:
            return self._decode(await self.redis.get(self.lock_key))
        except Exception as e:
            logger.error("Failed to get current leader",
                         service=self.service_name,
                         error=str(e))
            return None