Add new infra architecture
This commit is contained in:
@@ -120,8 +120,12 @@ class InventoryService(StandardFastAPIService):
|
||||
await alert_service.start()
|
||||
self.logger.info("Inventory alert service started")
|
||||
|
||||
# Initialize inventory scheduler with alert service and database manager
|
||||
inventory_scheduler = InventoryScheduler(alert_service, self.database_manager)
|
||||
# Initialize inventory scheduler with alert service, database manager, and Redis URL for leader election
|
||||
inventory_scheduler = InventoryScheduler(
|
||||
alert_service,
|
||||
self.database_manager,
|
||||
redis_url=settings.REDIS_URL # Pass Redis URL for leader election in multi-replica deployments
|
||||
)
|
||||
await inventory_scheduler.start()
|
||||
self.logger.info("Inventory scheduler started")
|
||||
|
||||
|
||||
@@ -2,6 +2,9 @@
|
||||
Inventory Scheduler Service
|
||||
Background task that periodically checks for inventory alert conditions
|
||||
and triggers appropriate alerts.
|
||||
|
||||
Uses Redis-based leader election to ensure only one pod runs scheduled tasks
|
||||
when running with multiple replicas.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
@@ -22,22 +25,129 @@ from app.services.inventory_alert_service import InventoryAlertService
|
||||
|
||||
logger = structlog.get_logger()
|
||||
|
||||
class InventoryScheduler:
|
||||
"""Inventory scheduler service that checks for alert conditions"""
|
||||
|
||||
def __init__(self, alert_service: InventoryAlertService, database_manager: Any):
|
||||
class InventoryScheduler:
|
||||
"""
|
||||
Inventory scheduler service that checks for alert conditions.
|
||||
|
||||
Uses Redis-based leader election to ensure only one pod runs
|
||||
scheduled jobs in a multi-replica deployment.
|
||||
"""
|
||||
|
||||
def __init__(self, alert_service: InventoryAlertService, database_manager: Any, redis_url: str = None):
|
||||
self.alert_service = alert_service
|
||||
self.database_manager = database_manager
|
||||
self.scheduler = AsyncIOScheduler()
|
||||
self.scheduler = None
|
||||
self.check_interval = 300 # 5 minutes
|
||||
self.job_id = 'inventory_scheduler'
|
||||
|
||||
# Leader election
|
||||
self._redis_url = redis_url
|
||||
self._leader_election = None
|
||||
self._redis_client = None
|
||||
self._scheduler_started = False
|
||||
|
||||
async def start(self):
|
||||
"""Start the inventory scheduler with APScheduler"""
|
||||
if self.scheduler.running:
|
||||
logger.warning("Inventory scheduler is already running")
|
||||
"""Start the inventory scheduler with leader election"""
|
||||
if self._redis_url:
|
||||
await self._start_with_leader_election()
|
||||
else:
|
||||
# Fallback to standalone mode (for local development or single-pod deployments)
|
||||
logger.warning("Redis URL not provided, starting inventory scheduler in standalone mode")
|
||||
await self._start_standalone()
|
||||
|
||||
async def _start_with_leader_election(self):
    """Start the scheduler under Redis-based leader election.

    Connects to Redis at ``self._redis_url``, creates (but does not start)
    the APScheduler instance, then starts a ``LeaderElectionService`` whose
    callbacks start and stop the scheduled jobs as leadership changes.

    If any step raises, everything that was partially set up (leader
    election, a scheduler already started by the callback, the Redis
    client) is released and the scheduler falls back to standalone mode.
    Clearing ``_leader_election`` on fallback keeps ``is_leader`` and
    ``get_leader_status()`` consistent with the mode actually running.
    """
    # Function-scope imports: only this code path needs them.
    import redis.asyncio as redis
    from shared.leader_election import LeaderElectionService

    try:
        # Create Redis connection and verify it is reachable
        self._redis_client = redis.from_url(self._redis_url, decode_responses=False)
        await self._redis_client.ping()

        # Create scheduler (but don't start it yet); jobs are only started
        # from the on_become_leader callback.
        self.scheduler = AsyncIOScheduler()

        # Create leader election
        self._leader_election = LeaderElectionService(
            self._redis_client,
            service_name="inventory-scheduler"
        )

        # Start leader election with callbacks
        await self._leader_election.start(
            on_become_leader=self._on_become_leader,
            on_lose_leader=self._on_lose_leader
        )

        logger.info("Inventory scheduler started with leader election",
                    is_leader=self._leader_election.is_leader,
                    instance_id=self._leader_election.instance_id)

    except Exception as e:
        logger.error("Failed to start with leader election, falling back to standalone",
                     error=str(e))

        # Detach the partially-initialised resources first so that later
        # status queries and stop() see a pure standalone instance.
        election, self._leader_election = self._leader_election, None
        client, self._redis_client = self._redis_client, None

        if election is not None:
            try:
                await election.stop()
            except Exception as cleanup_error:
                logger.warning("Error stopping leader election during fallback",
                               error=str(cleanup_error))

        # The become-leader callback may already have started the jobs;
        # stop them before _start_standalone() replaces self.scheduler.
        await self._stop_scheduler()

        if client is not None:
            try:
                await client.close()
            except Exception as cleanup_error:
                logger.warning("Error closing Redis client during fallback",
                               error=str(cleanup_error))

        await self._start_standalone()
|
||||
|
||||
async def _on_become_leader(self):
    """Leader-election callback: this instance gained leadership.

    Logs the transition and starts the scheduled jobs.
    """
    logger.info("Inventory scheduler became leader, starting scheduled jobs")
    await self._start_scheduler()
|
||||
|
||||
async def _on_lose_leader(self):
    """Leader-election callback: this instance lost leadership.

    Logs the transition and stops the scheduled jobs.
    """
    logger.warning("Inventory scheduler lost leadership, stopping scheduled jobs")
    await self._stop_scheduler()
|
||||
|
||||
async def _start_scheduler(self):
    """Register the periodic alert-check job and start APScheduler.

    Invoked from ``_on_become_leader``. Does nothing (beyond a warning) if
    the jobs are already started. Failures are logged rather than raised,
    and ``_scheduler_started`` stays ``False`` so a later attempt can retry.
    """
    if self._scheduler_started:
        logger.warning("Inventory scheduler already started")
        return

    if self.scheduler is None:
        # No scheduler instance has been created yet; report it explicitly
        # instead of surfacing an opaque AttributeError message.
        logger.error("Failed to start inventory scheduler",
                     error="scheduler has not been initialized")
        return

    try:
        # Add the periodic job
        trigger = IntervalTrigger(seconds=self.check_interval)
        self.scheduler.add_job(
            self._run_scheduler_task,
            trigger=trigger,
            id=self.job_id,
            name="Inventory Alert Checks",
            max_instances=1,  # Prevent overlapping executions
            # The job id is fixed, so re-registering it (retry after a
            # failed start, or regaining leadership) must replace the
            # earlier registration instead of conflicting with it.
            replace_existing=True
        )

        # Start scheduler
        if not self.scheduler.running:
            self.scheduler.start()

        self._scheduler_started = True
        logger.info("Inventory scheduler jobs started",
                    interval_seconds=self.check_interval,
                    job_count=len(self.scheduler.get_jobs()))

    except Exception as e:
        # Best effort: keep the service alive; include the traceback for diagnosis.
        logger.error("Failed to start inventory scheduler", error=str(e), exc_info=True)
|
||||
|
||||
async def _stop_scheduler(self):
    """Shut down APScheduler if this instance previously started its jobs.

    A no-op when the jobs were never started. Errors during shutdown are
    logged and not re-raised.
    """
    if self._scheduler_started:
        try:
            active = self.scheduler
            if active and active.running:
                # wait=False: do not block on jobs that are still executing
                active.shutdown(wait=False)

            self._scheduler_started = False
            logger.info("Inventory scheduler jobs stopped")
        except Exception as e:
            logger.error("Failed to stop inventory scheduler", error=str(e))
|
||||
|
||||
async def _start_standalone(self):
|
||||
"""Start scheduler without leader election (fallback mode)"""
|
||||
logger.warning("Starting inventory scheduler in standalone mode (no leader election)")
|
||||
|
||||
self.scheduler = AsyncIOScheduler()
|
||||
|
||||
# Add the periodic job
|
||||
trigger = IntervalTrigger(seconds=self.check_interval)
|
||||
self.scheduler.add_job(
|
||||
@@ -45,75 +155,63 @@ class InventoryScheduler:
|
||||
trigger=trigger,
|
||||
id=self.job_id,
|
||||
name="Inventory Alert Checks",
|
||||
max_instances=1 # Prevent overlapping executions
|
||||
max_instances=1
|
||||
)
|
||||
|
||||
# Start the scheduler
|
||||
self.scheduler.start()
|
||||
logger.info("Inventory scheduler started", interval_seconds=self.check_interval)
|
||||
if not self.scheduler.running:
|
||||
self.scheduler.start()
|
||||
self._scheduler_started = True
|
||||
logger.info("Inventory scheduler started (standalone mode)",
|
||||
interval_seconds=self.check_interval)
|
||||
|
||||
async def stop(self):
|
||||
"""Stop the inventory scheduler"""
|
||||
if self.scheduler.running:
|
||||
self.scheduler.shutdown(wait=True)
|
||||
logger.info("Inventory scheduler stopped")
|
||||
else:
|
||||
logger.info("Inventory scheduler already stopped")
|
||||
"""Stop the inventory scheduler and leader election"""
|
||||
# Stop leader election
|
||||
if self._leader_election:
|
||||
await self._leader_election.stop()
|
||||
|
||||
# Stop scheduler
|
||||
await self._stop_scheduler()
|
||||
|
||||
# Close Redis
|
||||
if self._redis_client:
|
||||
await self._redis_client.close()
|
||||
|
||||
logger.info("Inventory scheduler stopped")
|
||||
|
||||
@property
def is_leader(self) -> bool:
    """Whether this instance currently acts as leader.

    Without a leader-election service (standalone mode) the instance is
    always treated as the leader.
    """
    election = self._leader_election
    if not election:
        return True
    return election.is_leader
|
||||
|
||||
def get_leader_status(self) -> dict:
    """Return the leader-election status.

    Delegates to the leader-election service's ``get_status()`` when one
    is configured; otherwise reports a fixed standalone status.
    """
    election = self._leader_election
    if not election:
        return {"is_leader": True, "mode": "standalone"}
    return election.get_status()
|
||||
|
||||
async def _run_scheduler_task(self):
|
||||
"""Run scheduled inventory alert checks with leader election"""
|
||||
# Try to acquire leader lock for this scheduler
|
||||
lock_name = f"inventory_scheduler:{self.database_manager.database_url if hasattr(self.database_manager, 'database_url') else 'default'}"
|
||||
lock_id = abs(hash(lock_name)) % (2**31) # Generate a unique integer ID for the lock
|
||||
acquired = False
|
||||
"""Run scheduled inventory alert checks"""
|
||||
start_time = datetime.now()
|
||||
logger.info("Running scheduled inventory alert checks")
|
||||
|
||||
try:
|
||||
# Try to acquire PostgreSQL advisory lock for leader election
|
||||
async with self.database_manager.get_session() as session:
|
||||
result = await session.execute(text("SELECT pg_try_advisory_lock(:lock_id)"), {"lock_id": lock_id})
|
||||
acquired = True # If no exception, lock was acquired
|
||||
# Run all alert checks
|
||||
alerts_generated = await self.check_all_conditions()
|
||||
|
||||
start_time = datetime.now()
|
||||
logger.info("Running scheduled inventory alert checks (as leader)")
|
||||
|
||||
# Run all alert checks
|
||||
alerts_generated = await self.check_all_conditions()
|
||||
|
||||
duration = (datetime.now() - start_time).total_seconds()
|
||||
logger.info(
|
||||
"Completed scheduled inventory alert checks",
|
||||
alerts_generated=alerts_generated,
|
||||
duration_seconds=round(duration, 2)
|
||||
)
|
||||
duration = (datetime.now() - start_time).total_seconds()
|
||||
logger.info(
|
||||
"Completed scheduled inventory alert checks",
|
||||
alerts_generated=alerts_generated,
|
||||
duration_seconds=round(duration, 2)
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
# If it's a lock acquisition error, log and skip execution (another instance is running)
|
||||
error_str = str(e).lower()
|
||||
if "lock" in error_str or "timeout" in error_str or "could not acquire" in error_str:
|
||||
logger.debug(
|
||||
"Skipping inventory scheduler execution (not leader)",
|
||||
lock_name=lock_name
|
||||
)
|
||||
return # Not an error, just not the leader
|
||||
else:
|
||||
logger.error(
|
||||
"Error in inventory scheduler task",
|
||||
error=str(e),
|
||||
exc_info=True
|
||||
)
|
||||
|
||||
finally:
|
||||
if acquired:
|
||||
# Release the lock
|
||||
try:
|
||||
async with self.database_manager.get_session() as session:
|
||||
await session.execute(text("SELECT pg_advisory_unlock(:lock_id)"), {"lock_id": lock_id})
|
||||
await session.commit()
|
||||
except Exception as unlock_error:
|
||||
logger.warning(
|
||||
"Error releasing leader lock (may have been automatically released)",
|
||||
error=str(unlock_error)
|
||||
)
|
||||
logger.error(
|
||||
"Error in inventory scheduler task",
|
||||
error=str(e),
|
||||
exc_info=True
|
||||
)
|
||||
|
||||
async def check_all_conditions(self) -> int:
|
||||
"""
|
||||
|
||||
Reference in New Issue
Block a user