Files
bakery-ia/services/inventory/app/services/inventory_scheduler.py
2026-01-19 11:55:17 +01:00

1193 lines
48 KiB
Python

"""
Inventory Scheduler Service
Background task that periodically checks for inventory alert conditions
and triggers appropriate alerts.
Uses Redis-based leader election to ensure only one pod runs scheduled tasks
when running with multiple replicas.
"""
import asyncio
from typing import Dict, Any, List, Optional
from uuid import UUID
from datetime import datetime, timedelta
import structlog
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import text
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.interval import IntervalTrigger
from app.repositories.ingredient_repository import IngredientRepository
from app.repositories.stock_repository import StockRepository
from app.repositories.food_safety_repository import FoodSafetyRepository
from app.services.inventory_alert_service import InventoryAlertService
logger = structlog.get_logger()
class InventoryScheduler:
"""
Inventory scheduler service that checks for alert conditions.
Uses Redis-based leader election to ensure only one pod runs
scheduled jobs in a multi-replica deployment.
"""
def __init__(self, alert_service: InventoryAlertService, database_manager: Any, redis_url: str = None):
self.alert_service = alert_service
self.database_manager = database_manager
self.scheduler = None
self.check_interval = 300 # 5 minutes
self.job_id = 'inventory_scheduler'
# Leader election
self._redis_url = redis_url
self._leader_election = None
self._redis_client = None
self._scheduler_started = False
async def start(self):
"""Start the inventory scheduler with leader election"""
if self._redis_url:
await self._start_with_leader_election()
else:
# Fallback to standalone mode (for local development or single-pod deployments)
logger.warning("Redis URL not provided, starting inventory scheduler in standalone mode")
await self._start_standalone()
async def _start_with_leader_election(self):
"""Start with Redis-based leader election for horizontal scaling"""
import redis.asyncio as redis
from shared.leader_election import LeaderElectionService
try:
# Create Redis connection
self._redis_client = redis.from_url(self._redis_url, decode_responses=False)
await self._redis_client.ping()
# Create scheduler (but don't start it yet)
self.scheduler = AsyncIOScheduler()
# Create leader election
self._leader_election = LeaderElectionService(
self._redis_client,
service_name="inventory-scheduler"
)
# Start leader election with callbacks
await self._leader_election.start(
on_become_leader=self._on_become_leader,
on_lose_leader=self._on_lose_leader
)
logger.info("Inventory scheduler started with leader election",
is_leader=self._leader_election.is_leader,
instance_id=self._leader_election.instance_id)
except Exception as e:
logger.error("Failed to start with leader election, falling back to standalone",
error=str(e))
await self._start_standalone()
async def _on_become_leader(self):
"""Called when this instance becomes the leader"""
logger.info("Inventory scheduler became leader, starting scheduled jobs")
await self._start_scheduler()
async def _on_lose_leader(self):
"""Called when this instance loses leadership"""
logger.warning("Inventory scheduler lost leadership, stopping scheduled jobs")
await self._stop_scheduler()
async def _start_scheduler(self):
"""Start the APScheduler with inventory check jobs"""
if self._scheduler_started:
logger.warning("Inventory scheduler already started")
return
try:
# Add the periodic job
trigger = IntervalTrigger(seconds=self.check_interval)
self.scheduler.add_job(
self._run_scheduler_task,
trigger=trigger,
id=self.job_id,
name="Inventory Alert Checks",
max_instances=1 # Prevent overlapping executions
)
# Start scheduler
if not self.scheduler.running:
self.scheduler.start()
self._scheduler_started = True
logger.info("Inventory scheduler jobs started",
interval_seconds=self.check_interval,
job_count=len(self.scheduler.get_jobs()))
except Exception as e:
logger.error("Failed to start inventory scheduler", error=str(e))
async def _stop_scheduler(self):
"""Stop the APScheduler"""
if not self._scheduler_started:
return
try:
if self.scheduler and self.scheduler.running:
self.scheduler.shutdown(wait=False)
self._scheduler_started = False
logger.info("Inventory scheduler jobs stopped")
except Exception as e:
logger.error("Failed to stop inventory scheduler", error=str(e))
async def _start_standalone(self):
"""Start scheduler without leader election (fallback mode)"""
logger.warning("Starting inventory scheduler in standalone mode (no leader election)")
self.scheduler = AsyncIOScheduler()
# Add the periodic job
trigger = IntervalTrigger(seconds=self.check_interval)
self.scheduler.add_job(
self._run_scheduler_task,
trigger=trigger,
id=self.job_id,
name="Inventory Alert Checks",
max_instances=1
)
if not self.scheduler.running:
self.scheduler.start()
self._scheduler_started = True
logger.info("Inventory scheduler started (standalone mode)",
interval_seconds=self.check_interval)
async def stop(self):
"""Stop the inventory scheduler and leader election"""
# Stop leader election
if self._leader_election:
await self._leader_election.stop()
# Stop scheduler
await self._stop_scheduler()
# Close Redis
if self._redis_client:
await self._redis_client.close()
logger.info("Inventory scheduler stopped")
@property
def is_leader(self) -> bool:
"""Check if this instance is the leader"""
return self._leader_election.is_leader if self._leader_election else True
def get_leader_status(self) -> dict:
"""Get leader election status"""
if self._leader_election:
return self._leader_election.get_status()
return {"is_leader": True, "mode": "standalone"}
async def _run_scheduler_task(self):
"""Run scheduled inventory alert checks"""
start_time = datetime.now()
logger.info("Running scheduled inventory alert checks")
try:
# Run all alert checks
alerts_generated = await self.check_all_conditions()
duration = (datetime.now() - start_time).total_seconds()
logger.info(
"Completed scheduled inventory alert checks",
alerts_generated=alerts_generated,
duration_seconds=round(duration, 2)
)
except Exception as e:
logger.error(
"Error in inventory scheduler task",
error=str(e),
exc_info=True
)
async def check_all_conditions(self) -> int:
"""
Check all inventory alert conditions and trigger alerts for all tenants.
Returns:
int: Total number of alerts generated
"""
if not self.database_manager:
logger.warning("Database manager not available for inventory checks")
return 0
total_alerts = 0
try:
# Updated approach: run all checks using the new repository methods
# Get session to use for all checks
async with self.database_manager.get_session() as session:
# Check critical stock shortages (using direct SQL approach)
stock_alerts = await self._check_critical_stock_shortages(session)
total_alerts += stock_alerts
# Check expiring ingredients (using direct SQL approach)
expiry_alerts = await self._check_expiring_ingredients(session)
total_alerts += expiry_alerts
# Check overstock situations (using direct SQL approach)
overstock_alerts = await self._check_overstock_situations(session)
total_alerts += overstock_alerts
logger.info(
"Inventory alert checks completed for all tenants",
total_alerts=total_alerts
)
except Exception as e:
logger.error(
"Error during inventory alert checks for all tenants",
error=str(e),
exc_info=True
)
return total_alerts
async def _check_critical_stock_shortages(self, session) -> int:
"""
Check for critical stock shortages and trigger alerts.
Args:
session: Database session
Returns:
int: Number of stock shortage alerts generated
"""
try:
# Get critical stock shortages from repository
ingredient_repo = IngredientRepository(session)
stock_shortages = await ingredient_repo.get_critical_stock_shortages()
logger.info("Found critical stock shortages", count=len(stock_shortages))
alerts_generated = 0
for shortage in stock_shortages:
try:
# Handle asyncpg UUID objects properly
ingredient_id_val = shortage["ingredient_id"]
tenant_id_val = shortage["tenant_id"]
# Convert asyncpg UUID to string first, then to UUID
if hasattr(ingredient_id_val, 'hex'):
ingredient_id = UUID(hex=ingredient_id_val.hex)
else:
ingredient_id = UUID(str(ingredient_id_val))
if hasattr(tenant_id_val, 'hex'):
tenant_id = UUID(hex=tenant_id_val.hex)
else:
tenant_id = UUID(str(tenant_id_val))
current_quantity = float(shortage["current_quantity"])
required_quantity = float(shortage["required_quantity"])
shortage_amount = float(shortage["shortage_amount"])
# Emit critical stock shortage alert
await self.alert_service.emit_critical_stock_shortage(
tenant_id=tenant_id,
ingredient_id=ingredient_id,
ingredient_name=shortage.get("ingredient_name", "Unknown Ingredient"),
current_stock=current_quantity,
required_stock=required_quantity,
shortage_amount=shortage_amount,
minimum_stock=required_quantity
)
alerts_generated += 1
except Exception as e:
# Ensure ingredient_id is converted to string for logging to prevent UUID issues
ingredient_id_val = shortage.get("ingredient_id", "unknown")
if hasattr(ingredient_id_val, '__str__') and not isinstance(ingredient_id_val, str):
ingredient_id_val = str(ingredient_id_val)
logger.error(
"Error emitting critical stock shortage alert",
ingredient_id=ingredient_id_val,
error=str(e)
)
continue
return alerts_generated
except Exception as e:
logger.error("Error checking critical stock shortages", error=str(e))
return 0
async def _check_expiring_ingredients(self, session) -> int:
"""
Check for ingredients that are about to expire and trigger alerts using direct SQL.
Args:
session: Database session
Returns:
int: Number of expiry alerts generated
"""
try:
# Use the stock repository to get expiring products
stock_repo = StockRepository(session)
# We'll need to get expiring products across all tenants
# For this, we'll use direct SQL since the method is tenant-specific
from sqlalchemy import text
query = text("""
SELECT
i.id as id,
i.name as name,
i.tenant_id as tenant_id,
s.id as stock_id,
s.batch_number,
s.expiration_date,
s.current_quantity as quantity,
EXTRACT(DAY FROM (s.expiration_date - CURRENT_DATE)) as days_until_expiry
FROM stock s
JOIN ingredients i ON s.ingredient_id = i.id
WHERE s.is_available = true
AND s.expiration_date <= CURRENT_DATE + INTERVAL '7 days' -- Next 7 days
AND s.expiration_date >= CURRENT_DATE -- Not already expired
ORDER BY s.expiration_date ASC, s.current_quantity DESC
""")
result = await session.execute(query)
rows = result.fetchall()
expiring_ingredients = []
for row in rows:
expiring_ingredients.append({
'id': row.id,
'name': row.name,
'tenant_id': row.tenant_id,
'stock_id': row.stock_id,
'quantity': float(row.quantity) if row.quantity else 0,
'days_until_expiry': int(row.days_until_expiry) if row.days_until_expiry else 0,
'expiry_date': row.expiration_date.isoformat() if row.expiration_date else None
})
logger.info(
"Found expiring ingredients",
count=len(expiring_ingredients)
)
alerts_generated = 0
for ingredient in expiring_ingredients:
try:
ingredient_id = UUID(str(ingredient["id"]))
tenant_id = UUID(str(ingredient["tenant_id"]))
stock_id = UUID(str(ingredient["stock_id"]))
days_until_expiry = int(ingredient.get("days_until_expiry", 0))
quantity = float(ingredient.get("quantity", 0))
# Emit ingredient expiry alert (using emit_urgent_expiry)
await self.alert_service.emit_urgent_expiry(
tenant_id=tenant_id,
ingredient_id=ingredient_id,
ingredient_name=ingredient.get("name", "Unknown Ingredient"),
stock_id=stock_id,
days_to_expiry=days_until_expiry,
quantity=quantity
)
alerts_generated += 1
except Exception as e:
logger.error(
"Error emitting ingredient expiry alert",
ingredient_id=ingredient.get("id", "unknown"),
error=str(e)
)
continue
return alerts_generated
except Exception as e:
logger.error("Error checking expiring ingredients", error=str(e))
return 0
async def _check_overstock_situations(self, session) -> int:
"""
Check for overstock situations and trigger alerts using direct SQL.
Args:
session: Database session
Returns:
int: Number of overstock alerts generated
"""
try:
# Get overstock situations using direct SQL
from sqlalchemy import text
query = text("""
WITH stock_analysis AS (
SELECT
i.id, i.name, i.tenant_id,
COALESCE(SUM(s.current_quantity), 0) as current_quantity,
i.max_stock_level as maximum_stock,
CASE
WHEN i.max_stock_level IS NOT NULL AND COALESCE(SUM(s.current_quantity), 0) > i.max_stock_level THEN 'overstock'
ELSE 'normal'
END as status
FROM ingredients i
LEFT JOIN stock s ON s.ingredient_id = i.id AND s.is_available = true
WHERE i.is_active = true
GROUP BY i.id, i.name, i.tenant_id, i.max_stock_level
)
SELECT
id, name, tenant_id, current_quantity, maximum_stock
FROM stock_analysis
WHERE status = 'overstock'
ORDER BY current_quantity DESC
""")
result = await session.execute(query)
rows = result.fetchall()
overstock_items = []
for row in rows:
overstock_items.append({
'id': row.id,
'name': row.name,
'tenant_id': row.tenant_id,
'current_quantity': float(row.current_quantity) if row.current_quantity else 0,
'optimal_quantity': float(row.maximum_stock) if row.maximum_stock else float(row.current_quantity) * 0.8,
'excess_quantity': float(row.current_quantity - row.maximum_stock) if row.current_quantity and row.maximum_stock else 0
})
logger.info(
"Found overstock situations",
count=len(overstock_items)
)
alerts_generated = 0
for item in overstock_items:
try:
ingredient_id = UUID(str(item["id"]))
tenant_id = UUID(str(item["tenant_id"]))
current_quantity = float(item["current_quantity"])
optimal_quantity = float(item["optimal_quantity"])
excess_quantity = float(item["excess_quantity"])
# Emit overstock alert (using emit_overstock_warning)
await self.alert_service.emit_overstock_warning(
tenant_id=tenant_id,
ingredient_id=ingredient_id,
ingredient_name=item.get("name", "Unknown Ingredient"),
current_stock=current_quantity,
maximum_stock=optimal_quantity,
waste_risk_kg=excess_quantity
)
alerts_generated += 1
except Exception as e:
logger.error(
"Error emitting overstock alert",
ingredient_id=item.get("id", "unknown"),
error=str(e)
)
continue
return alerts_generated
except Exception as e:
logger.error("Error checking overstock situations", error=str(e))
return 0
async def trigger_manual_check(self, tenant_id: Optional[UUID] = None) -> Dict[str, Any]:
"""
Manually trigger inventory alert checks for a specific tenant or all tenants.
Args:
tenant_id: Optional tenant ID to check. If None, checks all tenants.
Returns:
Dict with alert generation results
"""
logger.info(
"Manually triggering inventory alert checks",
tenant_id=str(tenant_id) if tenant_id else "all_tenants"
)
try:
if tenant_id:
# Run tenant-specific alert checks
alerts_generated = await self.check_all_conditions_for_tenant(tenant_id)
else:
# Run all alert checks across all tenants
alerts_generated = await self.check_all_conditions()
return {
"success": True,
"tenant_id": str(tenant_id) if tenant_id else None,
"alerts_generated": alerts_generated,
"timestamp": datetime.now().isoformat(),
"message": "Inventory alert checks completed successfully"
}
except Exception as e:
logger.error(
"Error during manual inventory alert check",
error=str(e),
exc_info=True
)
return {
"success": False,
"tenant_id": str(tenant_id) if tenant_id else None,
"alerts_generated": 0,
"timestamp": datetime.now().isoformat(),
"error": str(e)
}
async def check_all_conditions_for_tenant(self, tenant_id: UUID) -> int:
"""
Check all inventory alert conditions for a specific tenant and trigger alerts.
Args:
tenant_id: Tenant ID to check conditions for
Returns:
int: Total number of alerts generated
"""
if not self.database_manager:
logger.warning("Database manager not available for inventory checks")
return 0
total_alerts = 0
try:
# Check critical stock shortages for specific tenant
stock_alerts = await self._check_critical_stock_shortages_for_tenant(tenant_id)
total_alerts += stock_alerts
# Check expiring ingredients for specific tenant
expiry_alerts = await self._check_expiring_ingredients_for_tenant(tenant_id)
total_alerts += expiry_alerts
# Check overstock situations for specific tenant
overstock_alerts = await self._check_overstock_situations_for_tenant(tenant_id)
total_alerts += overstock_alerts
logger.info(
"Tenant-specific inventory alert checks completed",
tenant_id=str(tenant_id),
total_alerts=total_alerts,
critical_stock_shortages=stock_alerts,
expiring_ingredients=expiry_alerts,
overstock_situations=overstock_alerts
)
except Exception as e:
logger.error(
"Error during tenant-specific inventory alert checks",
tenant_id=str(tenant_id),
error=str(e),
exc_info=True
)
return total_alerts
async def _check_critical_stock_shortages_for_tenant(self, tenant_id: UUID) -> int:
"""
Check for critical stock shortages for a specific tenant and trigger alerts.
Args:
tenant_id: Tenant ID to check for
Returns:
int: Number of stock shortage alerts generated
"""
try:
# Get stock issues for the specific tenant and filter for critical status
async with self.database_manager.get_session() as session:
ingredient_repo = IngredientRepository(session)
stock_issues = await ingredient_repo.get_stock_issues(tenant_id)
critical_shortages = [issue for issue in stock_issues if issue.get('status') == 'critical']
logger.info(f"Found {len(critical_shortages)} critical stock shortages for tenant",
count=len(critical_shortages), tenant_id=str(tenant_id))
alerts_generated = 0
for shortage in critical_shortages:
try:
# Handle asyncpg UUID objects properly
ingredient_id_val = shortage["id"]
if hasattr(ingredient_id_val, 'hex'):
ingredient_id = UUID(hex=ingredient_id_val.hex)
else:
ingredient_id = UUID(str(ingredient_id_val))
# Extract values with defaults
current_quantity = float(shortage.get("current_stock", 0))
minimum_stock = float(shortage.get("minimum_stock", 0))
shortage_amount = float(shortage.get("shortage_amount", 0))
# Emit critical stock shortage alert
await self.alert_service.emit_critical_stock_shortage(
tenant_id=tenant_id,
ingredient_id=ingredient_id,
ingredient_name=shortage.get("name", "Unknown Ingredient"),
current_stock=current_quantity,
required_stock=minimum_stock,
shortage_amount=shortage_amount,
minimum_stock=minimum_stock
)
alerts_generated += 1
except Exception as e:
# Ensure ingredient_id is converted to string for logging to prevent UUID issues
ingredient_id_val = shortage.get("id", "unknown")
if hasattr(ingredient_id_val, '__str__') and not isinstance(ingredient_id_val, str):
ingredient_id_val = str(ingredient_id_val)
logger.error(
"Error emitting critical stock shortage alert",
tenant_id=str(tenant_id),
ingredient_id=ingredient_id_val,
error=str(e)
)
continue
return alerts_generated
except Exception as e:
logger.error("Error checking critical stock shortages for tenant", tenant_id=str(tenant_id), error=str(e))
return 0
async def _check_expiring_ingredients_for_tenant(self, tenant_id: UUID) -> int:
"""
Check for ingredients that are about to expire for a specific tenant and trigger alerts.
Args:
tenant_id: Tenant ID to check for
Returns:
int: Number of expiry alerts generated
"""
try:
expiring_ingredients = []
# Use stock repository to get expiring products for this tenant
try:
from app.repositories.stock_repository import StockRepository
# We'll need to create a session to access with the tenant-specific data
async with self.database_manager.get_session() as temp_session:
stock_repo = StockRepository(temp_session)
expiring_products = await stock_repo.get_expiring_products(tenant_id, days_threshold=7)
# Convert to expected format
for product in expiring_products:
expiring_ingredients.append({
'id': str(product.get('ingredient_id')),
'name': product.get('ingredient_name'),
'tenant_id': str(tenant_id),
'stock_id': str(product.get('stock_id')),
'quantity': product.get('current_quantity', 0),
'days_until_expiry': product.get('days_until_expiry', 0),
'expiry_date': product.get('expiration_date')
})
except Exception as repo_error:
logger.warning("Error using stock repository for expiring ingredients", error=str(repo_error))
# If repository access fails, return empty list
logger.info(
"Found expiring ingredients for tenant",
count=len(expiring_ingredients),
tenant_id=str(tenant_id)
)
alerts_generated = 0
for ingredient in expiring_ingredients:
try:
ingredient_id = UUID(ingredient["id"])
stock_id = UUID(ingredient["stock_id"])
days_until_expiry = int(ingredient.get("days_until_expiry", 0))
quantity = float(ingredient.get("quantity", 0))
# Emit ingredient expiry alert (using emit_urgent_expiry)
await self.alert_service.emit_urgent_expiry(
tenant_id=tenant_id,
ingredient_id=ingredient_id,
ingredient_name=ingredient.get("name", "Unknown Ingredient"),
stock_id=stock_id,
days_to_expiry=days_until_expiry,
quantity=quantity
)
alerts_generated += 1
except Exception as e:
logger.error(
"Error emitting ingredient expiry alert",
tenant_id=str(tenant_id),
ingredient_id=ingredient.get("id", "unknown"),
error=str(e)
)
continue
return alerts_generated
except Exception as e:
logger.error("Error checking expiring ingredients for tenant", tenant_id=str(tenant_id), error=str(e))
return 0
async def _check_overstock_situations_for_tenant(self, tenant_id: UUID) -> int:
"""
Check for overstock situations for a specific tenant and trigger alerts.
Args:
tenant_id: Tenant ID to check for
Returns:
int: Number of overstock alerts generated
"""
try:
# Use ingredient repository to get stock issues for this tenant and filter for overstock
overstock_items = []
try:
from app.repositories.ingredient_repository import IngredientRepository
async with self.database_manager.get_session() as temp_session:
ingredient_repo = IngredientRepository(temp_session)
stock_issues = await ingredient_repo.get_stock_issues(tenant_id)
# Filter for overstock situations
for issue in stock_issues:
if issue.get('status') == 'overstock':
overstock_items.append({
'id': str(issue.get('id')),
'name': issue.get('name'),
'tenant_id': str(tenant_id),
'current_quantity': issue.get('current_stock', 0),
'optimal_quantity': issue.get('maximum_stock', issue.get('current_stock', 0) * 0.8), # estimate
'excess_quantity': issue.get('shortage_amount', abs(issue.get('current_stock', 0) - issue.get('maximum_stock', issue.get('current_stock', 0) * 0.8))),
'excess_percentage': 0 # calculate if possible
})
except Exception as repo_error:
logger.warning("Error using ingredient repository for overstock situations", error=str(repo_error))
logger.info(
"Found overstock situations for tenant",
count=len(overstock_items),
tenant_id=str(tenant_id)
)
alerts_generated = 0
for item in overstock_items:
try:
ingredient_id = UUID(item["id"])
current_quantity = float(item["current_quantity"])
optimal_quantity = float(item.get("optimal_quantity", current_quantity * 0.8))
excess_quantity = float(item.get("excess_quantity", current_quantity - optimal_quantity))
# Emit overstock alert (using emit_overstock_warning)
await self.alert_service.emit_overstock_warning(
tenant_id=tenant_id,
ingredient_id=ingredient_id,
ingredient_name=item.get("name", "Unknown Ingredient"),
current_stock=current_quantity,
maximum_stock=optimal_quantity,
waste_risk_kg=excess_quantity
)
alerts_generated += 1
except Exception as e:
logger.error(
"Error emitting overstock alert",
tenant_id=str(tenant_id),
ingredient_id=item.get("id", "unknown"),
error=str(e)
)
continue
return alerts_generated
except Exception as e:
logger.error("Error checking overstock situations for tenant", tenant_id=str(tenant_id), error=str(e))
return 0
async def _check_critical_stock_shortages_all_tenants(self, session) -> int:
"""
Check for critical stock shortages across all tenants and trigger alerts.
Args:
session: Database session
Returns:
int: Number of stock shortage alerts generated
"""
try:
# Get ALL stock issues and filter for critical status - this gets data for ALL tenants
all_stock_issues = await self._get_all_stock_issues(session) # Custom method to get all issues
critical_shortages = [issue for issue in all_stock_issues if issue.get('status') == 'critical']
logger.info(f"Found {len(critical_shortages)} critical stock shortages across all tenants",
count=len(critical_shortages))
alerts_generated = 0
for shortage in critical_shortages:
try:
# Handle asyncpg UUID objects properly
ingredient_id_val = shortage["id"]
tenant_id_val = shortage["tenant_id"]
if hasattr(ingredient_id_val, 'hex'):
ingredient_id = UUID(hex=ingredient_id_val.hex)
else:
ingredient_id = UUID(str(ingredient_id_val))
if hasattr(tenant_id_val, 'hex'):
tenant_id = UUID(hex=tenant_id_val.hex)
else:
tenant_id = UUID(str(tenant_id_val))
# Extract values with defaults
current_quantity = float(shortage.get("current_stock", 0))
minimum_stock = float(shortage.get("minimum_stock", 0))
shortage_amount = float(shortage.get("shortage_amount", 0))
# Emit critical stock shortage alert
await self.alert_service.emit_critical_stock_shortage(
tenant_id=tenant_id,
ingredient_id=ingredient_id,
ingredient_name=shortage.get("name", "Unknown Ingredient"),
current_stock=current_quantity,
required_stock=minimum_stock,
shortage_amount=shortage_amount,
minimum_stock=minimum_stock
)
alerts_generated += 1
except Exception as e:
# Ensure ingredient_id and tenant_id are converted to strings for logging to prevent UUID issues
ingredient_id_val = shortage.get("id", "unknown")
if hasattr(ingredient_id_val, '__str__') and not isinstance(ingredient_id_val, str):
ingredient_id_val = str(ingredient_id_val)
tenant_id_val = shortage.get("tenant_id", "unknown")
if hasattr(tenant_id_val, '__str__') and not isinstance(tenant_id_val, str):
tenant_id_val = str(tenant_id_val)
logger.error(
"Error emitting critical stock shortage alert",
ingredient_id=ingredient_id_val,
tenant_id=tenant_id_val,
error=str(e)
)
continue
return alerts_generated
except Exception as e:
logger.error("Error checking critical stock shortages across all tenants", error=str(e))
return 0
async def _check_expiring_ingredients_all_tenants_direct(self, session) -> int:
"""
Check for ingredients that are about to expire across all tenants and trigger alerts.
Args:
session: Database session
Returns:
int: Number of expiry alerts generated
"""
try:
# Get ALL expiring ingredients across all tenants using direct SQL
all_expiring_ingredients = await self._get_all_expiring_ingredients_direct(session)
logger.info(
"Found expiring ingredients across all tenants",
count=len(all_expiring_ingredients)
)
alerts_generated = 0
for ingredient in all_expiring_ingredients:
try:
ingredient_id = UUID(str(ingredient["id"]))
tenant_id = UUID(str(ingredient["tenant_id"]))
stock_id = UUID(str(ingredient["stock_id"]))
days_until_expiry = int(ingredient.get("days_until_expiry", 0))
quantity = float(ingredient.get("quantity", 0))
# Emit ingredient expiry alert (using emit_urgent_expiry)
await self.alert_service.emit_urgent_expiry(
tenant_id=tenant_id,
ingredient_id=ingredient_id,
ingredient_name=ingredient.get("name", "Unknown Ingredient"),
stock_id=stock_id,
days_to_expiry=days_until_expiry,
quantity=quantity
)
alerts_generated += 1
except Exception as e:
logger.error(
"Error emitting ingredient expiry alert",
ingredient_id=ingredient.get("id", "unknown"),
tenant_id=ingredient.get("tenant_id", "unknown"),
error=str(e)
)
continue
return alerts_generated
except Exception as e:
logger.error("Error checking expiring ingredients across all tenants", error=str(e))
return 0
async def _check_overstock_situations_all_tenants_direct(self, session) -> int:
"""
Check for overstock situations across all tenants and trigger alerts.
Args:
session: Database session
Returns:
int: Number of overstock alerts generated
"""
try:
# Get ALL overstock situations across all tenants using direct SQL
all_overstock_items = await self._get_all_overstock_situations_direct(session)
logger.info(
"Found overstock situations across all tenants",
count=len(all_overstock_items)
)
alerts_generated = 0
for item in all_overstock_items:
try:
ingredient_id = UUID(str(item["id"]))
tenant_id = UUID(str(item["tenant_id"]))
current_quantity = float(item["current_quantity"])
optimal_quantity = float(item.get("optimal_quantity", current_quantity * 0.8))
excess_quantity = float(item.get("excess_quantity", current_quantity - optimal_quantity))
# Emit overstock alert (using emit_overstock_warning)
await self.alert_service.emit_overstock_warning(
tenant_id=tenant_id,
ingredient_id=ingredient_id,
ingredient_name=item.get("name", "Unknown Ingredient"),
current_stock=current_quantity,
maximum_stock=optimal_quantity,
waste_risk_kg=excess_quantity
)
alerts_generated += 1
except Exception as e:
logger.error(
"Error emitting overstock alert",
ingredient_id=item.get("id", "unknown"),
tenant_id=item.get("tenant_id", "unknown"),
error=str(e)
)
continue
return alerts_generated
except Exception as e:
logger.error("Error checking overstock situations across all tenants", error=str(e))
return 0
async def _get_all_stock_issues(self, session) -> list:
"""
Get all stock issues across all tenants (not just one tenant).
This is a workaround for missing inventory repository method.
"""
# Since there's no method to get all issues across all tenants directly,
# we'll need to query the database directly
from sqlalchemy import text
try:
query = text("""
WITH stock_analysis AS (
SELECT
i.id, i.name, i.tenant_id,
COALESCE(SUM(s.current_quantity), 0) as current_stock,
i.low_stock_threshold as minimum_stock,
i.max_stock_level as maximum_stock,
i.reorder_point,
0 as tomorrow_needed,
0 as avg_daily_usage,
7 as lead_time_days,
CASE
WHEN COALESCE(SUM(s.current_quantity), 0) < i.low_stock_threshold THEN 'critical'
WHEN COALESCE(SUM(s.current_quantity), 0) < i.low_stock_threshold * 1.2 THEN 'low'
WHEN i.max_stock_level IS NOT NULL AND COALESCE(SUM(s.current_quantity), 0) > i.max_stock_level THEN 'overstock'
ELSE 'normal'
END as status,
GREATEST(0, i.low_stock_threshold - COALESCE(SUM(s.current_quantity), 0)) as shortage_amount
FROM ingredients i
LEFT JOIN stock s ON s.ingredient_id = i.id AND s.is_available = true
WHERE i.is_active = true
GROUP BY i.id, i.name, i.tenant_id, i.low_stock_threshold, i.max_stock_level, i.reorder_point
)
SELECT * FROM stock_analysis WHERE status != 'normal'
ORDER BY
CASE status
WHEN 'critical' THEN 1
WHEN 'low' THEN 2
WHEN 'overstock' THEN 3
END,
shortage_amount DESC
""")
result = await session.execute(query)
rows = result.fetchall()
stock_issues = []
for row in rows:
stock_issues.append({
'id': row.id,
'name': row.name,
'tenant_id': row.tenant_id,
'current_stock': float(row.current_stock) if row.current_stock else 0,
'minimum_stock': float(row.minimum_stock) if row.minimum_stock else 0,
'maximum_stock': float(row.maximum_stock) if row.maximum_stock else None,
'status': row.status,
'shortage_amount': float(row.shortage_amount) if row.shortage_amount else 0
})
return stock_issues
except Exception as e:
logger.error("Error getting all stock issues", error=str(e))
return []
async def _get_all_expiring_ingredients_direct(self, session) -> list:
"""
Get all expiring ingredients across all tenants (not just one tenant).
This is a workaround for missing inventory repository method.
"""
from sqlalchemy import text
try:
query = text("""
SELECT
i.id as id,
i.name as name,
i.tenant_id as tenant_id,
s.id as stock_id,
s.batch_number,
s.expiration_date,
s.current_quantity as quantity,
i.unit_of_measure,
s.unit_cost,
(s.current_quantity * s.unit_cost) as total_value,
CASE
WHEN s.expiration_date < CURRENT_DATE THEN 'expired'
WHEN s.expiration_date <= CURRENT_DATE + INTERVAL '1 day' THEN 'expires_today'
WHEN s.expiration_date <= CURRENT_DATE + INTERVAL '3 days' THEN 'expires_soon'
ELSE 'warning'
END as urgency,
EXTRACT(DAY FROM (s.expiration_date - CURRENT_DATE)) as days_until_expiry
FROM stock s
JOIN ingredients i ON s.ingredient_id = i.id
WHERE s.is_available = true
AND s.expiration_date <= CURRENT_DATE + INTERVAL '7 days' -- Next 7 days
ORDER BY s.expiration_date ASC, total_value DESC
""")
result = await session.execute(query)
rows = result.fetchall()
expiring_ingredients = []
for row in rows:
expiring_ingredients.append({
'id': row.id,
'name': row.name,
'tenant_id': row.tenant_id,
'stock_id': row.stock_id,
'quantity': float(row.quantity) if row.quantity else 0,
'days_until_expiry': int(row.days_until_expiry) if row.days_until_expiry else 0,
'expiry_date': row.expiration_date.isoformat() if row.expiration_date else None
})
return expiring_ingredients
except Exception as e:
logger.error("Error getting all expiring ingredients", error=str(e))
return []
async def _get_all_overstock_situations_direct(self, session) -> list:
"""
Get all overstock situations across all tenants (not just one tenant).
This is a workaround for missing inventory repository method.
"""
from sqlalchemy import text
try:
query = text("""
WITH stock_analysis AS (
SELECT
i.id, i.name, i.tenant_id,
COALESCE(SUM(s.current_quantity), 0) as current_stock,
i.max_stock_level as maximum_stock,
CASE
WHEN i.max_stock_level IS NOT NULL AND COALESCE(SUM(s.current_quantity), 0) > i.max_stock_level THEN 'overstock'
ELSE 'normal'
END as status
FROM ingredients i
LEFT JOIN stock s ON s.ingredient_id = i.id AND s.is_available = true
WHERE i.is_active = true
GROUP BY i.id, i.name, i.tenant_id, i.max_stock_level
)
SELECT
id, name, tenant_id, current_stock, maximum_stock
FROM stock_analysis
WHERE status = 'overstock'
ORDER BY current_stock DESC
""")
result = await session.execute(query)
rows = result.fetchall()
overstock_items = []
for row in rows:
overstock_items.append({
'id': row.id,
'name': row.name,
'tenant_id': row.tenant_id,
'current_stock': float(row.current_stock) if row.current_stock else 0,
'maximum_stock': float(row.maximum_stock) if row.maximum_stock else None
})
# Convert to the expected format for alerts
formatted_items = []
for item in overstock_items:
formatted_items.append({
'id': item['id'],
'name': item['name'],
'tenant_id': item['tenant_id'],
'current_quantity': item['current_stock'],
'optimal_quantity': item['maximum_stock'] or (item['current_stock'] * 0.8),
'excess_quantity': item['current_stock'] - (item['maximum_stock'] or item['current_stock'] * 0.8)
})
return formatted_items
except Exception as e:
logger.error("Error getting all overstock situations", error=str(e))
return []