Fix production deadlock
This commit is contained in:
@@ -6,6 +6,7 @@ Supports both alerts and recommendations through unified detection patterns
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
import random
|
||||
import uuid
|
||||
from typing import List, Dict, Any, Optional
|
||||
from uuid import UUID
|
||||
@@ -100,38 +101,64 @@ class BaseAlertService:
|
||||
|
||||
while True:
|
||||
try:
|
||||
instance_id = getattr(self.config, 'INSTANCE_ID', 'default')
|
||||
instance_id = getattr(self.config, 'INSTANCE_ID', str(uuid.uuid4()))
|
||||
was_leader = self.is_leader
|
||||
|
||||
# Add jitter to avoid thundering herd when multiple instances start
|
||||
if not was_leader:
|
||||
await asyncio.sleep(random.uniform(0.1, 0.5)) # Small random delay before attempting to acquire
|
||||
|
||||
# Try to acquire new leadership if not currently leader
|
||||
if not self.is_leader:
|
||||
# Use atomic Redis operation to acquire lock
|
||||
result = await self.redis.set(
|
||||
lock_key,
|
||||
instance_id,
|
||||
ex=lock_ttl,
|
||||
nx=True
|
||||
nx=True # Only set if key doesn't exist
|
||||
)
|
||||
self.is_leader = result is not None
|
||||
acquired = result is not None
|
||||
self.is_leader = acquired
|
||||
else:
|
||||
# Already leader - try to extend the lock
|
||||
current_value = await self.redis.get(lock_key)
|
||||
if current_value and current_value.decode() == instance_id:
|
||||
# Still our lock, extend it
|
||||
await self.redis.expire(lock_key, lock_ttl)
|
||||
self.is_leader = True
|
||||
# Still our lock, extend it using a Lua script for atomicity
|
||||
lua_script = """
|
||||
if redis.call("GET", KEYS[1]) == ARGV[1] then
|
||||
return redis.call("EXPIRE", KEYS[1], ARGV[2])
|
||||
else
|
||||
return 0
|
||||
end
|
||||
"""
|
||||
try:
|
||||
extend_result = await self.redis.eval(
|
||||
lua_script,
|
||||
keys=[lock_key],
|
||||
args=[instance_id, lock_ttl]
|
||||
)
|
||||
self.is_leader = extend_result == 1
|
||||
except:
|
||||
# If Lua script fails (Redis cluster), fall back to simple get/set
|
||||
self.is_leader = True # Keep current state if we can't verify
|
||||
else:
|
||||
# Lock expired or taken by someone else
|
||||
self.is_leader = False
|
||||
|
||||
# Handle leadership changes
|
||||
if self.is_leader and not was_leader:
|
||||
self.scheduler.start()
|
||||
logger.info("Acquired scheduler leadership", service=self.config.SERVICE_NAME)
|
||||
# Add a small delay to allow other instances to detect leadership change
|
||||
await asyncio.sleep(0.1)
|
||||
if self.is_leader: # Double-check we're still the leader
|
||||
self.scheduler.start()
|
||||
logger.info("Acquired scheduler leadership", service=self.config.SERVICE_NAME)
|
||||
elif not self.is_leader and was_leader:
|
||||
self.scheduler.shutdown()
|
||||
if self.scheduler.running:
|
||||
self.scheduler.shutdown()
|
||||
logger.info("Lost scheduler leadership", service=self.config.SERVICE_NAME)
|
||||
|
||||
await asyncio.sleep(lock_ttl // 2)
|
||||
# Add jitter to reduce contention between instances
|
||||
await asyncio.sleep(lock_ttl // 2 + random.uniform(0, 2))
|
||||
|
||||
except Exception as e:
|
||||
logger.error("Leadership error", service=self.config.SERVICE_NAME, error=str(e))
|
||||
@@ -189,9 +216,16 @@ class BaseAlertService:
|
||||
"""Maintain database connection for listeners"""
|
||||
try:
|
||||
while not conn.is_closed():
|
||||
await asyncio.sleep(30) # Check every 30 seconds
|
||||
# Use a timeout to avoid hanging indefinitely
|
||||
try:
|
||||
await conn.fetchval("SELECT 1")
|
||||
await asyncio.wait_for(
|
||||
conn.fetchval("SELECT 1"),
|
||||
timeout=5.0
|
||||
)
|
||||
await asyncio.sleep(30) # Check every 30 seconds
|
||||
except asyncio.TimeoutError:
|
||||
logger.warning("DB ping timed out, connection may be dead", service=self.config.SERVICE_NAME)
|
||||
break
|
||||
except Exception as e:
|
||||
logger.error("DB listener connection lost", service=self.config.SERVICE_NAME, error=str(e))
|
||||
break
|
||||
@@ -226,12 +260,22 @@ class BaseAlertService:
|
||||
# Determine routing key based on severity and type
|
||||
routing_key = get_routing_key(item_type, item['severity'], self.config.SERVICE_NAME)
|
||||
|
||||
# Publish to RabbitMQ
|
||||
success = await self.rabbitmq_client.publish_event(
|
||||
exchange_name=self.exchange,
|
||||
routing_key=routing_key,
|
||||
event_data=item
|
||||
)
|
||||
# Publish to RabbitMQ with timeout to prevent blocking
|
||||
try:
|
||||
success = await asyncio.wait_for(
|
||||
self.rabbitmq_client.publish_event(
|
||||
exchange_name=self.exchange,
|
||||
routing_key=routing_key,
|
||||
event_data=item
|
||||
),
|
||||
timeout=10.0 # 10 second timeout
|
||||
)
|
||||
except asyncio.TimeoutError:
|
||||
logger.error("RabbitMQ publish timed out",
|
||||
service=self.config.SERVICE_NAME,
|
||||
item_type=item_type,
|
||||
alert_type=item['type'])
|
||||
return False
|
||||
|
||||
if success:
|
||||
self._items_published += 1
|
||||
|
||||
@@ -246,6 +246,33 @@ class DatabaseManager:
|
||||
def __repr__(self) -> str:
|
||||
return f"DatabaseManager(service='{self.service_name}', type='{self._get_database_type()}')"
|
||||
|
||||
async def execute(self, query: str, *args, **kwargs):
|
||||
"""
|
||||
Execute a raw SQL query with proper session management
|
||||
Note: Use this method carefully to avoid transaction conflicts
|
||||
"""
|
||||
from sqlalchemy import text
|
||||
|
||||
# Use a new session context to avoid conflicts with existing sessions
|
||||
async with self.get_session() as session:
|
||||
try:
|
||||
# Convert query to SQLAlchemy text object if it's a string
|
||||
if isinstance(query, str):
|
||||
query = text(query)
|
||||
|
||||
result = await session.execute(query, *args, **kwargs)
|
||||
# For UPDATE/DELETE operations that need to be committed
|
||||
if query.text.strip().upper().startswith(('UPDATE', 'DELETE', 'INSERT')):
|
||||
await session.commit()
|
||||
|
||||
return result
|
||||
except Exception as e:
|
||||
# Only rollback if it was a modifying operation
|
||||
if isinstance(query, str) and query.strip().upper().startswith(('UPDATE', 'DELETE', 'INSERT')):
|
||||
await session.rollback()
|
||||
logger.error("Database execute failed", error=str(e))
|
||||
raise
|
||||
|
||||
|
||||
# ===== CONVENIENCE FUNCTIONS =====
|
||||
|
||||
|
||||
Reference in New Issue
Block a user