217 lines
6.8 KiB
Python
217 lines
6.8 KiB
Python
"""
|
|
Overdue PO Scheduler
|
|
|
|
Background scheduler that periodically checks for overdue purchase orders
|
|
and publishes alerts for them.
|
|
"""
|
|
|
|
import asyncio
|
|
from typing import Optional
|
|
from datetime import datetime, timezone
|
|
import structlog
|
|
|
|
from app.services.overdue_po_detector import OverduePODetector
|
|
from shared.messaging import RabbitMQClient
|
|
|
|
# Module-level structlog logger used for every log event emitted by the scheduler below.
logger = structlog.get_logger()
|
|
|
|
|
|
class OverduePOScheduler:
    """
    Overdue PO Scheduler

    Background task that periodically checks for overdue POs
    and publishes alerts.

    Lifecycle: ``await start()`` spawns an asyncio task running the
    scheduler loop; ``await stop()`` cancels that task and waits for it
    to finish. Each loop iteration runs one detection cycle and then
    sleeps for ``check_interval_seconds``.
    """

    # Severity buckets reported in the per-cycle summary log.
    _SEVERITY_LEVELS = ('critical', 'high', 'medium', 'low')

    # Bucket used when a PO summary has a missing or unrecognised severity.
    _DEFAULT_SEVERITY = 'medium'

    def __init__(
        self,
        rabbitmq_client: Optional[RabbitMQClient] = None,
        check_interval_seconds: int = 3600,  # 1 hour default
    ):
        """
        Initialize overdue PO scheduler.

        Args:
            rabbitmq_client: RabbitMQ client for publishing events. When it is
                None (or reports itself as not connected) detection still
                runs, but alert publishing is skipped.
            check_interval_seconds: Seconds between checks (default: 3600 = 1 hour)
        """
        self.detector = OverduePODetector()
        self.rabbitmq_client = rabbitmq_client
        self.check_interval_seconds = check_interval_seconds

        # Handle of the background loop task; set by start().
        self._task: Optional[asyncio.Task] = None
        # Loop flag: True between start() and stop().
        self._running = False

        logger.info(
            "Overdue PO Scheduler initialized",
            check_interval_seconds=check_interval_seconds
        )

    async def start(self):
        """Start the scheduler background task (no-op if already running)."""
        if self._running:
            logger.warning("Overdue PO Scheduler already running")
            return

        self._running = True
        self._task = asyncio.create_task(self._run_scheduler())

        logger.info("Overdue PO Scheduler started")

    async def stop(self):
        """Stop the scheduler background task (no-op if not running)."""
        if not self._running:
            return

        # Clear the flag first so the loop does not begin another cycle.
        self._running = False

        if self._task:
            self._task.cancel()
            try:
                await self._task
            except asyncio.CancelledError:
                # Expected: the task was cancelled on purpose just above.
                pass

        logger.info("Overdue PO Scheduler stopped")

    async def _run_scheduler(self):
        """Main scheduler loop: run a cycle, sleep, repeat while running."""
        logger.info("Overdue PO Scheduler loop started")

        while self._running:
            try:
                await self._process_cycle()
            except Exception as e:
                # Safety net so one failing cycle never terminates the loop.
                logger.error(
                    "Overdue PO scheduler cycle failed",
                    error=str(e),
                    exc_info=True
                )

            # Wait for next cycle
            try:
                await asyncio.sleep(self.check_interval_seconds)
            except asyncio.CancelledError:
                # Cancelled while idle between cycles: leave the loop cleanly.
                break

        logger.info("Overdue PO Scheduler loop ended")

    def _group_by_severity(self, overdue_pos):
        """
        Bucket overdue PO summaries by their 'severity' value.

        A summary whose severity is missing or not one of the known levels
        is placed in the 'medium' bucket (unknown values are also logged),
        so a single unexpected record cannot abort the whole cycle.

        Args:
            overdue_pos: Iterable of PO summaries supporting ``.get()``.

        Returns:
            Dict mapping each known severity level to a list of summaries.
        """
        by_severity = {level: [] for level in self._SEVERITY_LEVELS}

        for po in overdue_pos:
            severity = po.get('severity', self._DEFAULT_SEVERITY)
            # Tuple membership uses ==, so unhashable values are safe here.
            if severity not in self._SEVERITY_LEVELS:
                logger.warning(
                    "Unknown overdue PO severity, treating as medium",
                    po_number=po.get('po_number'),
                    severity=severity
                )
                severity = self._DEFAULT_SEVERITY
            by_severity[severity].append(po)

        return by_severity

    async def _process_cycle(self):
        """
        Process one scheduler cycle - detect and alert on overdue POs.

        Logs a per-severity summary of everything the detector returns and
        publishes an alert for each critical or high severity PO when a
        connected RabbitMQ client is available. Any exception is logged and
        swallowed here so the scheduler loop keeps running.
        """
        logger.info("Starting overdue PO detection cycle")

        try:
            # Detect all overdue POs across all tenants
            overdue_pos = await self.detector.detect_overdue_pos()

            if not overdue_pos:
                logger.info("No overdue POs detected in this cycle")
                return

            # Group by severity
            by_severity = self._group_by_severity(overdue_pos)

            # Log summary
            logger.warning(
                "Overdue POs detected",
                total=len(overdue_pos),
                critical=len(by_severity['critical']),
                high=len(by_severity['high']),
                medium=len(by_severity['medium']),
                low=len(by_severity['low'])
            )

            # Only critical and high severity POs produce alert events.
            critical_and_high = by_severity['critical'] + by_severity['high']

            if self.rabbitmq_client and self.rabbitmq_client.connected:
                published = 0
                for po in critical_and_high:
                    if await self._publish_overdue_alert(po):
                        published += 1

                # Report what was actually published, not what was attempted.
                logger.info(
                    "Published overdue alerts",
                    count=published,
                    failed=len(critical_and_high) - published
                )
            else:
                logger.warning(
                    "RabbitMQ not available, skipping alert publishing",
                    overdue_count=len(critical_and_high)
                )

        except Exception as e:
            logger.error(
                "Error in overdue PO detection cycle",
                error=str(e),
                exc_info=True
            )

    async def _publish_overdue_alert(self, po_summary: dict) -> bool:
        """
        Publish an overdue PO alert event.

        Failures (a missing key in the summary, an error raised by the
        client, or a falsy publish result) are logged, never raised.

        Args:
            po_summary: Overdue PO summary from detector

        Returns:
            True if the client reported a successful publish, else False.
        """
        try:
            # One timestamp for the whole event so 'timestamp' and
            # 'detected_at' always agree.
            now_iso = datetime.now(timezone.utc).isoformat()

            event_data_full = {
                # Event envelope
                'service_name': 'procurement',
                'event_type': 'po.overdue_detected',
                'timestamp': now_iso,
                # PO details copied from the detector summary
                'po_id': po_summary['po_id'],
                'tenant_id': po_summary['tenant_id'],
                'po_number': po_summary['po_number'],
                'supplier_id': po_summary['supplier_id'],
                'status': po_summary['status'],
                'total_amount': po_summary['total_amount'],
                'currency': po_summary['currency'],
                'estimated_delivery_date': po_summary['estimated_delivery_date'],
                'days_overdue': po_summary['days_overdue'],
                'severity': po_summary['severity'],
                'priority': po_summary['priority'],
                'detected_at': now_iso
            }

            # Publish to RabbitMQ
            success = await self.rabbitmq_client.publish_event(
                exchange_name='procurement.events',
                routing_key='po.overdue',
                event_data=event_data_full,
                persistent=True
            )

            if success:
                logger.info(
                    "Published overdue alert",
                    po_number=po_summary['po_number'],
                    days_overdue=po_summary['days_overdue'],
                    severity=po_summary['severity']
                )
                return True

            logger.error(
                "Failed to publish overdue alert",
                po_number=po_summary['po_number']
            )
            return False

        except Exception as e:
            logger.error(
                "Error publishing overdue alert",
                error=str(e),
                po_number=po_summary.get('po_number'),
                exc_info=True
            )
            return False
|