773 lines
37 KiB
Python
773 lines
37 KiB
Python
# services/inventory/app/services/inventory_alert_service.py
|
|
"""
|
|
Inventory-specific alert and recommendation detection service
|
|
Implements hybrid detection patterns for critical stock issues and optimization opportunities
|
|
"""
|
|
|
|
import asyncio
|
|
import json
|
|
from typing import List, Dict, Any, Optional
|
|
from uuid import UUID
|
|
from datetime import datetime, timedelta
|
|
import structlog
|
|
from apscheduler.triggers.cron import CronTrigger
|
|
from sqlalchemy import text
|
|
|
|
from shared.alerts.base_service import BaseAlertService, AlertServiceMixin
|
|
from shared.alerts.templates import format_item_message
|
|
|
|
logger = structlog.get_logger()
|
|
|
|
class InventoryAlertService(BaseAlertService, AlertServiceMixin):
|
|
"""Inventory service alert and recommendation detection"""
|
|
|
|
    def setup_scheduled_checks(self):
        """Register all periodic inventory checks on the APScheduler instance.

        Four jobs are added: stock levels, expiry, temperature (alerts) and
        two recommendation generators. Each job uses an explicit cron minute
        list instead of `*/N` so that the jobs fire at *offset* minutes and
        never start in the same scheduler tick — the header comment below
        explains why (concurrent runs were causing DB deadlocks).
        `max_instances=1` additionally prevents a slow run from overlapping
        with the next trigger of the same job.
        """

        # SPACED SCHEDULING TO PREVENT CONCURRENT EXECUTION AND DEADLOCKS

        # Critical stock checks - every 5 minutes (alerts) - fires at :00, :05, :10, ...
        self.scheduler.add_job(
            self.check_stock_levels,
            CronTrigger(minute='0,5,10,15,20,25,30,35,40,45,50,55'),  # Explicit minutes
            id='stock_levels',
            misfire_grace_time=30,  # skip the run if delayed more than 30s
            max_instances=1
        )

        # Expiry checks - food safety critical (alerts).
        # Odd minutes, excluding multiples of 5, so it never coincides with
        # the stock or temperature jobs (gaps are mostly 2, occasionally 4 min).
        self.scheduler.add_job(
            self.check_expiring_products,
            CronTrigger(minute='1,3,7,9,11,13,17,19,21,23,27,29,31,33,37,39,41,43,47,49,51,53,57,59'),  # Avoid conflicts
            id='expiry_check',
            misfire_grace_time=30,
            max_instances=1
        )

        # Temperature checks (alerts) - every 10 minutes, offset by 2 from the
        # stock job so the two never start together.
        self.scheduler.add_job(
            self.check_temperature_breaches,
            CronTrigger(minute='2,12,22,32,42,52'),  # Every 10 minutes, offset by 2
            id='temperature_check',
            misfire_grace_time=30,
            max_instances=1
        )

        # Inventory optimization (recommendations) - twice per hour at :15/:45,
        # offset away from all alert jobs.
        self.scheduler.add_job(
            self.generate_inventory_recommendations,
            CronTrigger(minute='15,45'),  # Offset to avoid conflicts
            id='inventory_recs',
            misfire_grace_time=120,  # recommendations tolerate more delay
            max_instances=1
        )

        # Waste reduction analysis (recommendations) - hourly at :30.
        self.scheduler.add_job(
            self.generate_waste_reduction_recommendations,
            CronTrigger(minute='30'),  # Offset to avoid conflicts
            id='waste_reduction_recs',
            misfire_grace_time=300,
            max_instances=1
        )

        logger.info("Inventory alert schedules configured",
                    service=self.config.SERVICE_NAME)
|
    async def check_stock_levels(self):
        """Batch check all stock levels for critical shortages (alerts).

        For every active tenant, runs one aggregate query that classifies each
        active ingredient as 'critical' (below low_stock_threshold), 'low'
        (within 20% above the threshold), or 'overstock' (above max_stock_level
        when set), then hands each non-'normal' row to _process_stock_issue.

        Per-tenant failures are logged and swallowed so one tenant cannot
        block the rest of the batch. Errors in the outer scope increment
        self._errors_count.
        """
        try:
            self._checks_performed += 1

            # NOTE(review): tomorrow_needed, avg_daily_usage and lead_time_days
            # are hard-coded placeholders (0 / 0 / 7) — presumably pending a
            # demand-forecast join; downstream code treats them as real values.
            query = """
                WITH stock_analysis AS (
                    SELECT
                        i.id, i.name, i.tenant_id,
                        COALESCE(SUM(s.current_quantity), 0) as current_stock,
                        i.low_stock_threshold as minimum_stock,
                        i.max_stock_level as maximum_stock,
                        i.reorder_point,
                        0 as tomorrow_needed,
                        0 as avg_daily_usage,
                        7 as lead_time_days,
                        CASE
                            WHEN COALESCE(SUM(s.current_quantity), 0) < i.low_stock_threshold THEN 'critical'
                            WHEN COALESCE(SUM(s.current_quantity), 0) < i.low_stock_threshold * 1.2 THEN 'low'
                            WHEN i.max_stock_level IS NOT NULL AND COALESCE(SUM(s.current_quantity), 0) > i.max_stock_level THEN 'overstock'
                            ELSE 'normal'
                        END as status,
                        GREATEST(0, i.low_stock_threshold - COALESCE(SUM(s.current_quantity), 0)) as shortage_amount
                    FROM ingredients i
                    LEFT JOIN stock s ON s.ingredient_id = i.id AND s.is_available = true
                    WHERE i.tenant_id = :tenant_id AND i.is_active = true
                    GROUP BY i.id, i.name, i.tenant_id, i.low_stock_threshold, i.max_stock_level, i.reorder_point
                )
                SELECT * FROM stock_analysis WHERE status != 'normal'
                ORDER BY
                    CASE status
                        WHEN 'critical' THEN 1
                        WHEN 'low' THEN 2
                        WHEN 'overstock' THEN 3
                    END,
                    shortage_amount DESC
            """

            tenants = await self.get_active_tenants()

            for tenant_id in tenants:
                try:
                    # Timeout bounds the whole per-tenant round trip so a hung
                    # connection cannot stall the scheduler job.
                    async with asyncio.timeout(30):  # 30 second timeout
                        async with self.db_manager.get_background_session() as session:
                            result = await session.execute(text(query), {"tenant_id": tenant_id})
                            issues = result.fetchall()

                            for issue in issues:
                                # Row._mapping exists on SQLAlchemy >= 1.4; keep
                                # the dict(row) fallback for older versions.
                                issue_dict = dict(issue._mapping) if hasattr(issue, '_mapping') else dict(issue)
                                await self._process_stock_issue(tenant_id, issue_dict)

                except Exception as e:
                    # Isolate failures per tenant; continue with the next one.
                    logger.error("Error checking stock for tenant",
                                 tenant_id=str(tenant_id),
                                 error=str(e))

            logger.debug("Stock level check completed",
                         tenants_checked=len(tenants))

        except Exception as e:
            logger.error("Stock level check failed", error=str(e))
            self._errors_count += 1
|
    async def _process_stock_issue(self, tenant_id: UUID, issue: Dict[str, Any]):
        """Publish one alert for a single non-'normal' stock row.

        `issue` is a dict produced by check_stock_levels' query; its 'status'
        selects the branch: 'critical' → urgent alert, 'low' → high-priority
        alert (severity adjusted for business hours), 'overstock' → medium.
        Any other status falls through silently. Errors are logged, never
        re-raised, so one bad row cannot abort the caller's batch.
        """
        try:
            if issue['status'] == 'critical':
                # Critical stock shortage - immediate alert
                template_data = self.format_spanish_message(
                    'critical_stock_shortage',
                    ingredient_name=issue["name"],
                    # tomorrow_needed is currently always 0 in the source query,
                    # so this effectively falls back to minimum_stock.
                    current_stock=issue["current_stock"],
                    required_stock=issue["tomorrow_needed"] or issue["minimum_stock"],
                    shortage_amount=issue["shortage_amount"]
                )

                await self.publish_item(tenant_id, {
                    'type': 'critical_stock_shortage',
                    'severity': 'urgent',
                    'title': template_data['title'],
                    'message': template_data['message'],
                    'actions': template_data['actions'],
                    'metadata': {
                        'ingredient_id': str(issue['id']),
                        'current_stock': float(issue['current_stock']),
                        'minimum_stock': float(issue['minimum_stock']),
                        'shortage_amount': float(issue['shortage_amount']),
                        'tomorrow_needed': float(issue['tomorrow_needed'] or 0),
                        'lead_time_days': issue['lead_time_days']
                    }
                }, item_type='alert')

            elif issue['status'] == 'low':
                # Low stock - high priority alert
                # NOTE(review): template_data is computed but not used below —
                # the published item uses hard-coded title/message instead.
                # Left in place in case format_spanish_message has side effects;
                # confirm and remove if it is pure.
                template_data = self.format_spanish_message(
                    'critical_stock_shortage',
                    ingredient_name=issue["name"],
                    current_stock=issue["current_stock"],
                    required_stock=issue["minimum_stock"]
                )

                # Downgrade severity outside business hours.
                severity = self.get_business_hours_severity('high')

                await self.publish_item(tenant_id, {
                    'type': 'low_stock_warning',
                    'severity': severity,
                    'title': f'⚠️ Stock Bajo: {issue["name"]}',
                    'message': f'Stock actual {issue["current_stock"]}kg, mínimo {issue["minimum_stock"]}kg. Considerar pedido pronto.',
                    'actions': ['Revisar consumo', 'Programar pedido', 'Contactar proveedor'],
                    'metadata': {
                        'ingredient_id': str(issue['id']),
                        'current_stock': float(issue['current_stock']),
                        'minimum_stock': float(issue['minimum_stock'])
                    }
                }, item_type='alert')

            elif issue['status'] == 'overstock':
                # Overstock - medium priority alert
                severity = self.get_business_hours_severity('medium')

                await self.publish_item(tenant_id, {
                    'type': 'overstock_warning',
                    'severity': severity,
                    'title': f'📦 Exceso de Stock: {issue["name"]}',
                    'message': f'Stock actual {issue["current_stock"]}kg excede máximo {issue["maximum_stock"]}kg. Revisar para evitar caducidad.',
                    'actions': ['Revisar caducidades', 'Aumentar producción', 'Ofertas especiales', 'Ajustar pedidos'],
                    'metadata': {
                        'ingredient_id': str(issue['id']),
                        'current_stock': float(issue['current_stock']),
                        # maximum_stock is non-NULL here: the 'overstock' status
                        # is only assigned when max_stock_level IS NOT NULL.
                        'maximum_stock': float(issue['maximum_stock'])
                    }
                }, item_type='alert')

        except Exception as e:
            logger.error("Error processing stock issue",
                         ingredient_id=str(issue.get('id')),
                         error=str(e))
|
async def check_expiring_products(self):
|
|
"""Check for products approaching expiry (alerts)"""
|
|
try:
|
|
self._checks_performed += 1
|
|
|
|
query = """
|
|
SELECT
|
|
i.id, i.name, i.tenant_id,
|
|
s.id as stock_id, s.expiration_date, s.current_quantity,
|
|
EXTRACT(days FROM (s.expiration_date - CURRENT_DATE)) as days_to_expiry
|
|
FROM ingredients i
|
|
JOIN stock s ON s.ingredient_id = i.id
|
|
WHERE s.expiration_date <= CURRENT_DATE + INTERVAL '7 days'
|
|
AND s.current_quantity > 0
|
|
AND s.is_available = true
|
|
AND s.expiration_date IS NOT NULL
|
|
ORDER BY s.expiration_date ASC
|
|
"""
|
|
|
|
# Add timeout to prevent hanging connections
|
|
async with asyncio.timeout(30): # 30 second timeout
|
|
async with self.db_manager.get_background_session() as session:
|
|
result = await session.execute(text(query))
|
|
expiring_items = result.fetchall()
|
|
|
|
# Group by tenant
|
|
by_tenant = {}
|
|
for item in expiring_items:
|
|
# Convert SQLAlchemy Row to dictionary for easier access
|
|
item_dict = dict(item._mapping) if hasattr(item, '_mapping') else dict(item)
|
|
tenant_id = item_dict['tenant_id']
|
|
if tenant_id not in by_tenant:
|
|
by_tenant[tenant_id] = []
|
|
by_tenant[tenant_id].append(item_dict)
|
|
|
|
for tenant_id, items in by_tenant.items():
|
|
await self._process_expiring_items(tenant_id, items)
|
|
|
|
except Exception as e:
|
|
logger.error("Expiry check failed", error=str(e))
|
|
self._errors_count += 1
|
|
|
|
async def _process_expiring_items(self, tenant_id: UUID, items: List[Dict[str, Any]]):
|
|
"""Process expiring items for a tenant"""
|
|
try:
|
|
# Group by urgency
|
|
expired = [i for i in items if i['days_to_expiry'] <= 0]
|
|
urgent = [i for i in items if 0 < i['days_to_expiry'] <= 2]
|
|
warning = [i for i in items if 2 < i['days_to_expiry'] <= 7]
|
|
|
|
# Process expired products (urgent alerts)
|
|
if expired:
|
|
product_count = len(expired)
|
|
product_names = [i['name'] for i in expired[:3]] # First 3 names
|
|
if len(expired) > 3:
|
|
product_names.append(f"y {len(expired) - 3} más")
|
|
|
|
template_data = self.format_spanish_message(
|
|
'expired_products',
|
|
product_count=product_count,
|
|
product_names=", ".join(product_names)
|
|
)
|
|
|
|
await self.publish_item(tenant_id, {
|
|
'type': 'expired_products',
|
|
'severity': 'urgent',
|
|
'title': template_data['title'],
|
|
'message': template_data['message'],
|
|
'actions': template_data['actions'],
|
|
'metadata': {
|
|
'expired_items': [
|
|
{
|
|
'id': str(item['id']),
|
|
'name': item['name'],
|
|
'stock_id': str(item['stock_id']),
|
|
'quantity': float(item['current_quantity']),
|
|
'days_expired': abs(item['days_to_expiry'])
|
|
} for item in expired
|
|
]
|
|
}
|
|
}, item_type='alert')
|
|
|
|
# Process urgent expiry (high alerts)
|
|
if urgent:
|
|
for item in urgent:
|
|
await self.publish_item(tenant_id, {
|
|
'type': 'urgent_expiry',
|
|
'severity': 'high',
|
|
'title': f'⏰ Caducidad Urgente: {item["name"]}',
|
|
'message': f'{item["name"]} caduca en {item["days_to_expiry"]} día(s). Usar prioritariamente.',
|
|
'actions': ['Usar inmediatamente', 'Promoción especial', 'Revisar recetas', 'Documentar'],
|
|
'metadata': {
|
|
'ingredient_id': str(item['id']),
|
|
'stock_id': str(item['stock_id']),
|
|
'days_to_expiry': item['days_to_expiry'],
|
|
'quantity': float(item['current_quantity'])
|
|
}
|
|
}, item_type='alert')
|
|
|
|
except Exception as e:
|
|
logger.error("Error processing expiring items",
|
|
tenant_id=str(tenant_id),
|
|
error=str(e))
|
|
|
|
async def check_temperature_breaches(self):
|
|
"""Check for temperature breaches (alerts)"""
|
|
try:
|
|
self._checks_performed += 1
|
|
|
|
query = """
|
|
SELECT
|
|
t.id, t.equipment_id as sensor_id, t.storage_location as location,
|
|
t.temperature_celsius as temperature,
|
|
t.target_temperature_max as max_threshold, t.tenant_id,
|
|
COALESCE(t.deviation_minutes, 0) as breach_duration_minutes
|
|
FROM temperature_logs t
|
|
WHERE t.temperature_celsius > COALESCE(t.target_temperature_max, 25)
|
|
AND NOT t.is_within_range
|
|
AND COALESCE(t.deviation_minutes, 0) >= 30 -- Only after 30 minutes
|
|
AND (t.recorded_at < NOW() - INTERVAL '15 minutes' OR t.alert_triggered = false) -- Avoid spam
|
|
ORDER BY t.temperature_celsius DESC, t.deviation_minutes DESC
|
|
"""
|
|
|
|
# Add timeout to prevent hanging connections
|
|
async with asyncio.timeout(30): # 30 second timeout
|
|
async with self.db_manager.get_background_session() as session:
|
|
result = await session.execute(text(query))
|
|
breaches = result.fetchall()
|
|
|
|
for breach in breaches:
|
|
# Convert SQLAlchemy Row to dictionary for easier access
|
|
breach_dict = dict(breach._mapping) if hasattr(breach, '_mapping') else dict(breach)
|
|
await self._process_temperature_breach(breach_dict)
|
|
|
|
except Exception as e:
|
|
logger.error("Temperature check failed", error=str(e))
|
|
self._errors_count += 1
|
|
|
|
async def _process_temperature_breach(self, breach: Dict[str, Any]):
|
|
"""Process temperature breach"""
|
|
try:
|
|
# Determine severity based on duration and temperature
|
|
duration_minutes = breach['breach_duration_minutes']
|
|
temp_excess = breach['temperature'] - breach['max_threshold']
|
|
|
|
if duration_minutes > 120 or temp_excess > 10:
|
|
severity = 'urgent'
|
|
elif duration_minutes > 60 or temp_excess > 5:
|
|
severity = 'high'
|
|
else:
|
|
severity = 'medium'
|
|
|
|
template_data = self.format_spanish_message(
|
|
'temperature_breach',
|
|
location=breach['location'],
|
|
temperature=breach['temperature'],
|
|
duration=duration_minutes
|
|
)
|
|
|
|
await self.publish_item(breach['tenant_id'], {
|
|
'type': 'temperature_breach',
|
|
'severity': severity,
|
|
'title': template_data['title'],
|
|
'message': template_data['message'],
|
|
'actions': template_data['actions'],
|
|
'metadata': {
|
|
'sensor_id': breach['sensor_id'],
|
|
'location': breach['location'],
|
|
'temperature': float(breach['temperature']),
|
|
'max_threshold': float(breach['max_threshold']),
|
|
'duration_minutes': duration_minutes,
|
|
'temperature_excess': temp_excess
|
|
}
|
|
}, item_type='alert')
|
|
|
|
# Update alert triggered flag to avoid spam
|
|
# Add timeout to prevent hanging connections
|
|
async with asyncio.timeout(10): # 10 second timeout for simple update
|
|
async with self.db_manager.get_background_session() as session:
|
|
await session.execute(
|
|
text("UPDATE temperature_logs SET alert_triggered = true WHERE id = :id"),
|
|
{"id": breach['id']}
|
|
)
|
|
|
|
except Exception as e:
|
|
logger.error("Error processing temperature breach",
|
|
sensor_id=breach.get('sensor_id'),
|
|
error=str(e))
|
|
|
|
async def generate_inventory_recommendations(self):
|
|
"""Generate optimization recommendations based on usage patterns"""
|
|
try:
|
|
self._checks_performed += 1
|
|
|
|
# Analyze stock levels vs usage patterns
|
|
query = """
|
|
WITH usage_analysis AS (
|
|
SELECT
|
|
i.id, i.name, i.tenant_id,
|
|
i.low_stock_threshold as minimum_stock,
|
|
i.max_stock_level as maximum_stock,
|
|
COALESCE(SUM(s.current_quantity), 0) as current_stock,
|
|
AVG(sm.quantity) FILTER (WHERE sm.movement_type = 'production_use'
|
|
AND sm.created_at > CURRENT_DATE - INTERVAL '30 days') as avg_daily_usage,
|
|
COUNT(sm.id) FILTER (WHERE sm.movement_type = 'production_use'
|
|
AND sm.created_at > CURRENT_DATE - INTERVAL '30 days') as usage_days,
|
|
MAX(sm.created_at) FILTER (WHERE sm.movement_type = 'production_use') as last_used
|
|
FROM ingredients i
|
|
LEFT JOIN stock s ON s.ingredient_id = i.id AND s.is_available = true
|
|
LEFT JOIN stock_movements sm ON sm.ingredient_id = i.id
|
|
WHERE i.is_active = true AND i.tenant_id = :tenant_id
|
|
GROUP BY i.id, i.name, i.tenant_id, i.low_stock_threshold, i.max_stock_level
|
|
HAVING COUNT(sm.id) FILTER (WHERE sm.movement_type = 'production_use'
|
|
AND sm.created_at > CURRENT_DATE - INTERVAL '30 days') >= 3
|
|
),
|
|
recommendations AS (
|
|
SELECT *,
|
|
CASE
|
|
WHEN avg_daily_usage * 7 > maximum_stock AND maximum_stock IS NOT NULL THEN 'increase_max'
|
|
WHEN avg_daily_usage * 3 < minimum_stock THEN 'decrease_min'
|
|
WHEN current_stock / NULLIF(avg_daily_usage, 0) > 14 THEN 'reduce_stock'
|
|
WHEN avg_daily_usage > 0 AND minimum_stock / avg_daily_usage < 3 THEN 'increase_min'
|
|
ELSE null
|
|
END as recommendation_type
|
|
FROM usage_analysis
|
|
WHERE avg_daily_usage > 0
|
|
)
|
|
SELECT * FROM recommendations WHERE recommendation_type IS NOT NULL
|
|
ORDER BY avg_daily_usage DESC
|
|
"""
|
|
|
|
tenants = await self.get_active_tenants()
|
|
|
|
for tenant_id in tenants:
|
|
try:
|
|
from sqlalchemy import text
|
|
# Add timeout to prevent hanging connections
|
|
async with asyncio.timeout(30): # 30 second timeout
|
|
async with self.db_manager.get_background_session() as session:
|
|
result = await session.execute(text(query), {"tenant_id": tenant_id})
|
|
recommendations = result.fetchall()
|
|
|
|
for rec in recommendations:
|
|
# Convert SQLAlchemy Row to dictionary for easier access
|
|
rec_dict = dict(rec._mapping) if hasattr(rec, '_mapping') else dict(rec)
|
|
await self._generate_stock_recommendation(tenant_id, rec_dict)
|
|
|
|
except Exception as e:
|
|
logger.error("Error generating recommendations for tenant",
|
|
tenant_id=str(tenant_id),
|
|
error=str(e))
|
|
|
|
except Exception as e:
|
|
logger.error("Inventory recommendations failed", error=str(e))
|
|
self._errors_count += 1
|
|
|
|
    async def _generate_stock_recommendation(self, tenant_id: UUID, rec: Dict[str, Any]):
        """Publish one stock-level recommendation from a usage-analysis row.

        Respects should_send_recommendation() throttling/dedup first.
        Handles 'increase_max' (suggest max = 10 days of average usage) and
        'decrease_min' (suggest min = 3 days of average usage).

        NOTE(review): the source query also emits 'reduce_stock' and
        'increase_min' recommendation types, which this method silently
        ignores — confirm whether that is intentional.
        """
        try:
            if not self.should_send_recommendation(tenant_id, rec['recommendation_type']):
                return

            rec_type = rec['recommendation_type']

            if rec_type == 'increase_max':
                suggested_max = rec['avg_daily_usage'] * 10  # 10 days supply
                template_data = self.format_spanish_message(
                    'inventory_optimization',
                    ingredient_name=rec['name'],
                    period=30,
                    # maximum_stock is non-NULL here: the query only assigns
                    # 'increase_max' when max_stock_level IS NOT NULL.
                    suggested_increase=suggested_max - rec['maximum_stock']
                )

                await self.publish_item(tenant_id, {
                    'type': 'inventory_optimization',
                    'severity': 'medium',
                    'title': template_data['title'],
                    'message': template_data['message'],
                    'actions': template_data['actions'],
                    'metadata': {
                        'ingredient_id': str(rec['id']),
                        'current_max': float(rec['maximum_stock']),
                        'suggested_max': float(suggested_max),
                        'avg_daily_usage': float(rec['avg_daily_usage']),
                        'recommendation_type': rec_type
                    }
                }, item_type='recommendation')

            elif rec_type == 'decrease_min':
                suggested_min = rec['avg_daily_usage'] * 3  # 3 days safety stock

                await self.publish_item(tenant_id, {
                    'type': 'inventory_optimization',
                    'severity': 'low',
                    'title': f'📉 Optimización de Stock Mínimo: {rec["name"]}',
                    'message': f'Uso promedio sugiere reducir stock mínimo de {rec["minimum_stock"]}kg a {suggested_min:.1f}kg.',
                    'actions': ['Revisar niveles mínimos', 'Analizar tendencias', 'Ajustar configuración'],
                    'metadata': {
                        'ingredient_id': str(rec['id']),
                        'current_min': float(rec['minimum_stock']),
                        'suggested_min': float(suggested_min),
                        'avg_daily_usage': float(rec['avg_daily_usage']),
                        'recommendation_type': rec_type
                    }
                }, item_type='recommendation')

        except Exception as e:
            logger.error("Error generating stock recommendation",
                         ingredient_id=str(rec.get('id')),
                         error=str(e))
|
async def generate_waste_reduction_recommendations(self):
|
|
"""Generate waste reduction recommendations"""
|
|
try:
|
|
# Analyze waste patterns from stock movements
|
|
query = """
|
|
SELECT
|
|
i.id, i.name, i.tenant_id,
|
|
SUM(sm.quantity) as total_waste_30d,
|
|
COUNT(sm.id) as waste_incidents,
|
|
AVG(sm.quantity) as avg_waste_per_incident,
|
|
COALESCE(sm.reason_code, 'unknown') as waste_reason
|
|
FROM ingredients i
|
|
JOIN stock_movements sm ON sm.ingredient_id = i.id
|
|
WHERE sm.movement_type = 'waste'
|
|
AND sm.created_at > CURRENT_DATE - INTERVAL '30 days'
|
|
AND i.tenant_id = :tenant_id
|
|
GROUP BY i.id, i.name, i.tenant_id, sm.reason_code
|
|
HAVING SUM(sm.quantity) > 5 -- More than 5kg wasted
|
|
ORDER BY total_waste_30d DESC
|
|
"""
|
|
|
|
tenants = await self.get_active_tenants()
|
|
|
|
for tenant_id in tenants:
|
|
try:
|
|
from sqlalchemy import text
|
|
# Add timeout to prevent hanging connections
|
|
async with asyncio.timeout(30): # 30 second timeout
|
|
async with self.db_manager.get_background_session() as session:
|
|
result = await session.execute(text(query), {"tenant_id": tenant_id})
|
|
waste_data = result.fetchall()
|
|
|
|
for waste in waste_data:
|
|
# Convert SQLAlchemy Row to dictionary for easier access
|
|
waste_dict = dict(waste._mapping) if hasattr(waste, '_mapping') else dict(waste)
|
|
await self._generate_waste_recommendation(tenant_id, waste_dict)
|
|
|
|
except Exception as e:
|
|
logger.error("Error generating waste recommendations",
|
|
tenant_id=str(tenant_id),
|
|
error=str(e))
|
|
|
|
except Exception as e:
|
|
logger.error("Waste reduction recommendations failed", error=str(e))
|
|
self._errors_count += 1
|
|
|
|
async def _generate_waste_recommendation(self, tenant_id: UUID, waste: Dict[str, Any]):
|
|
"""Generate waste reduction recommendation"""
|
|
try:
|
|
waste_percentage = (waste['total_waste_30d'] / (waste['total_waste_30d'] + 100)) * 100 # Simplified calculation
|
|
|
|
template_data = self.format_spanish_message(
|
|
'waste_reduction',
|
|
product=waste['name'],
|
|
waste_reduction_percent=waste_percentage
|
|
)
|
|
|
|
await self.publish_item(tenant_id, {
|
|
'type': 'waste_reduction',
|
|
'severity': 'low',
|
|
'title': template_data['title'],
|
|
'message': template_data['message'],
|
|
'actions': template_data['actions'],
|
|
'metadata': {
|
|
'ingredient_id': str(waste['id']),
|
|
'total_waste_30d': float(waste['total_waste_30d']),
|
|
'waste_incidents': waste['waste_incidents'],
|
|
'waste_reason': waste['waste_reason'],
|
|
'estimated_reduction_percent': waste_percentage
|
|
}
|
|
}, item_type='recommendation')
|
|
|
|
except Exception as e:
|
|
logger.error("Error generating waste recommendation",
|
|
ingredient_id=str(waste.get('id')),
|
|
error=str(e))
|
|
|
|
async def register_db_listeners(self, conn):
|
|
"""Register inventory-specific database listeners"""
|
|
try:
|
|
await conn.add_listener('stock_alerts', self.handle_stock_db_alert)
|
|
await conn.add_listener('temperature_alerts', self.handle_temperature_db_alert)
|
|
|
|
logger.info("Database listeners registered",
|
|
service=self.config.SERVICE_NAME)
|
|
except Exception as e:
|
|
logger.error("Failed to register database listeners",
|
|
service=self.config.SERVICE_NAME,
|
|
error=str(e))
|
|
|
|
async def handle_stock_db_alert(self, connection, pid, channel, payload):
|
|
"""Handle stock alert from database trigger"""
|
|
try:
|
|
data = json.loads(payload)
|
|
tenant_id = UUID(data['tenant_id'])
|
|
|
|
template_data = self.format_spanish_message(
|
|
'critical_stock_shortage',
|
|
ingredient_name=data['name'],
|
|
current_stock=data['current_stock'],
|
|
required_stock=data['minimum_stock']
|
|
)
|
|
|
|
await self.publish_item(tenant_id, {
|
|
'type': 'critical_stock_shortage',
|
|
'severity': 'urgent',
|
|
'title': template_data['title'],
|
|
'message': template_data['message'],
|
|
'actions': template_data['actions'],
|
|
'metadata': {
|
|
'ingredient_id': data['ingredient_id'],
|
|
'current_stock': data['current_stock'],
|
|
'minimum_stock': data['minimum_stock'],
|
|
'trigger_source': 'database'
|
|
}
|
|
}, item_type='alert')
|
|
|
|
except Exception as e:
|
|
logger.error("Error handling stock DB alert", error=str(e))
|
|
|
|
async def handle_temperature_db_alert(self, connection, pid, channel, payload):
|
|
"""Handle temperature alert from database trigger"""
|
|
try:
|
|
data = json.loads(payload)
|
|
tenant_id = UUID(data['tenant_id'])
|
|
|
|
template_data = self.format_spanish_message(
|
|
'temperature_breach',
|
|
location=data['location'],
|
|
temperature=data['temperature'],
|
|
duration=data['duration']
|
|
)
|
|
|
|
await self.publish_item(tenant_id, {
|
|
'type': 'temperature_breach',
|
|
'severity': 'high',
|
|
'title': template_data['title'],
|
|
'message': template_data['message'],
|
|
'actions': template_data['actions'],
|
|
'metadata': {
|
|
'sensor_id': data['sensor_id'],
|
|
'location': data['location'],
|
|
'temperature': data['temperature'],
|
|
'duration': data['duration'],
|
|
'trigger_source': 'database'
|
|
}
|
|
}, item_type='alert')
|
|
|
|
except Exception as e:
|
|
logger.error("Error handling temperature DB alert", error=str(e))
|
|
|
|
async def start_event_listener(self):
|
|
"""Listen for inventory-affecting events"""
|
|
try:
|
|
# Subscribe to order events that might affect inventory
|
|
await self.rabbitmq_client.consume_events(
|
|
"bakery_events",
|
|
f"inventory.orders.{self.config.SERVICE_NAME}",
|
|
"orders.placed",
|
|
self.handle_order_placed
|
|
)
|
|
|
|
logger.info("Event listeners started",
|
|
service=self.config.SERVICE_NAME)
|
|
except Exception as e:
|
|
logger.error("Failed to start event listeners",
|
|
service=self.config.SERVICE_NAME,
|
|
error=str(e))
|
|
|
|
async def handle_order_placed(self, message):
|
|
"""Check if order critically affects stock"""
|
|
try:
|
|
order = json.loads(message.body)
|
|
tenant_id = UUID(order['tenant_id'])
|
|
|
|
for item in order.get('items', []):
|
|
# Check stock impact
|
|
stock_info = await self.get_stock_after_order(item['ingredient_id'], item['quantity'])
|
|
|
|
if stock_info and stock_info['remaining'] < stock_info['minimum_stock']:
|
|
await self.publish_item(tenant_id, {
|
|
'type': 'stock_depleted_by_order',
|
|
'severity': 'high',
|
|
'title': f'⚠️ Pedido Agota Stock: {stock_info["name"]}',
|
|
'message': f'Pedido #{order["id"]} dejará stock en {stock_info["remaining"]}kg (mínimo {stock_info["minimum_stock"]}kg)',
|
|
'actions': ['Revisar pedido', 'Contactar proveedor', 'Ajustar producción', 'Usar stock reserva'],
|
|
'metadata': {
|
|
'order_id': order['id'],
|
|
'ingredient_id': item['ingredient_id'],
|
|
'order_quantity': item['quantity'],
|
|
'remaining_stock': stock_info['remaining'],
|
|
'minimum_stock': stock_info['minimum_stock']
|
|
}
|
|
}, item_type='alert')
|
|
|
|
except Exception as e:
|
|
logger.error("Error handling order placed event", error=str(e))
|
|
|
|
async def get_active_tenants(self) -> List[UUID]:
|
|
"""Get list of active tenant IDs from ingredients table (inventory service specific)"""
|
|
try:
|
|
query = text("SELECT DISTINCT tenant_id FROM ingredients WHERE is_active = true")
|
|
# Add timeout to prevent hanging connections
|
|
async with asyncio.timeout(10): # 10 second timeout
|
|
async with self.db_manager.get_background_session() as session:
|
|
result = await session.execute(query)
|
|
# Handle PostgreSQL UUID objects properly
|
|
tenant_ids = []
|
|
for row in result.fetchall():
|
|
tenant_id = row.tenant_id
|
|
# Convert to UUID if it's not already
|
|
if isinstance(tenant_id, UUID):
|
|
tenant_ids.append(tenant_id)
|
|
else:
|
|
tenant_ids.append(UUID(str(tenant_id)))
|
|
return tenant_ids
|
|
except Exception as e:
|
|
logger.error("Error fetching active tenants from ingredients", error=str(e))
|
|
return []
|
|
|
|
async def get_stock_after_order(self, ingredient_id: str, order_quantity: float) -> Optional[Dict[str, Any]]:
|
|
"""Get stock information after hypothetical order"""
|
|
try:
|
|
query = """
|
|
SELECT i.id, i.name,
|
|
COALESCE(SUM(s.current_quantity), 0) as current_stock,
|
|
i.low_stock_threshold as minimum_stock,
|
|
(COALESCE(SUM(s.current_quantity), 0) - :order_quantity) as remaining
|
|
FROM ingredients i
|
|
LEFT JOIN stock s ON s.ingredient_id = i.id AND s.is_available = true
|
|
WHERE i.id = :ingredient_id
|
|
GROUP BY i.id, i.name, i.low_stock_threshold
|
|
"""
|
|
|
|
# Add timeout to prevent hanging connections
|
|
async with asyncio.timeout(10): # 10 second timeout
|
|
async with self.db_manager.get_background_session() as session:
|
|
result = await session.execute(text(query), {"ingredient_id": ingredient_id, "order_quantity": order_quantity})
|
|
row = result.fetchone()
|
|
return dict(row) if row else None
|
|
|
|
except Exception as e:
|
|
logger.error("Error getting stock after order",
|
|
ingredient_id=ingredient_id,
|
|
error=str(e))
|
|
return None |