Add whatsapp feature
This commit is contained in:
@@ -118,14 +118,17 @@ class BaseAlertService:
|
||||
"""Leader election for scheduled jobs"""
|
||||
lock_key = f"scheduler_lock:{self.config.SERVICE_NAME}"
|
||||
lock_ttl = 60
|
||||
# Generate instance_id once for the lifetime of this leadership loop
|
||||
# IMPORTANT: Don't regenerate on each iteration or lock extension will always fail!
|
||||
instance_id = getattr(self.config, 'INSTANCE_ID', str(uuid.uuid4()))
|
||||
|
||||
logger.info("DEBUG: maintain_leadership starting",
|
||||
service=self.config.SERVICE_NAME,
|
||||
instance_id=instance_id,
|
||||
redis_client_type=str(type(self.redis)))
|
||||
|
||||
while True:
|
||||
try:
|
||||
instance_id = getattr(self.config, 'INSTANCE_ID', str(uuid.uuid4()))
|
||||
was_leader = self.is_leader
|
||||
|
||||
# Add jitter to avoid thundering herd when multiple instances start
|
||||
@@ -144,31 +147,37 @@ class BaseAlertService:
|
||||
acquired = result is not None
|
||||
self.is_leader = acquired
|
||||
else:
|
||||
# Already leader - try to extend the lock
|
||||
current_value = await self.redis.get(lock_key)
|
||||
# Note: decode_responses=True means Redis returns strings, not bytes
|
||||
if current_value and current_value == instance_id:
|
||||
# Still our lock, extend it using a Lua script for atomicity
|
||||
lua_script = """
|
||||
if redis.call("GET", KEYS[1]) == ARGV[1] then
|
||||
return redis.call("EXPIRE", KEYS[1], ARGV[2])
|
||||
else
|
||||
return 0
|
||||
end
|
||||
"""
|
||||
try:
|
||||
extend_result = await self.redis.eval(
|
||||
lua_script,
|
||||
keys=[lock_key],
|
||||
args=[instance_id, lock_ttl]
|
||||
)
|
||||
self.is_leader = extend_result == 1
|
||||
except:
|
||||
# If Lua script fails (Redis cluster), fall back to simple get/set
|
||||
self.is_leader = True # Keep current state if we can't verify
|
||||
else:
|
||||
# Lock expired or taken by someone else
|
||||
self.is_leader = False
|
||||
# Already leader - try to extend the lock atomically
|
||||
# Use SET with EX and GET to atomically refresh the lock
|
||||
try:
|
||||
# SET key value EX ttl GET returns the old value (atomic check-and-set)
|
||||
# This is atomic and works in both standalone and cluster mode
|
||||
old_value = await self.redis.set(
|
||||
lock_key,
|
||||
instance_id,
|
||||
ex=lock_ttl,
|
||||
get=True # Return old value (Python redis uses 'get' param for GET option)
|
||||
)
|
||||
# If old value matches our instance_id, we successfully extended
|
||||
self.is_leader = old_value == instance_id
|
||||
if self.is_leader:
|
||||
logger.debug("Lock extended successfully",
|
||||
service=self.config.SERVICE_NAME,
|
||||
instance_id=instance_id,
|
||||
ttl=lock_ttl)
|
||||
else:
|
||||
# Lock was taken by someone else or expired
|
||||
logger.info("Lost lock ownership during extension",
|
||||
service=self.config.SERVICE_NAME,
|
||||
old_owner=old_value,
|
||||
instance_id=instance_id)
|
||||
except Exception as e:
|
||||
# If extend fails, try to verify we still have the lock
|
||||
logger.warning("Failed to extend lock, verifying ownership",
|
||||
service=self.config.SERVICE_NAME,
|
||||
error=str(e))
|
||||
current_check = await self.redis.get(lock_key)
|
||||
self.is_leader = current_check == instance_id
|
||||
|
||||
# Handle leadership changes
|
||||
if self.is_leader and not was_leader:
|
||||
|
||||
Reference in New Issue
Block a user