Files
bakery-ia/test_out_of_stock_alert.py
2025-09-21 17:35:36 +02:00

94 lines
3.2 KiB
Python

#!/usr/bin/env python3
"""
Out of Stock Alert Test
Sends a realistic out of stock alert to test the inventory monitoring system
"""
import aio_pika
import asyncio
import json
import uuid
from datetime import datetime
def _build_out_of_stock_alert(tenant_id):
    """Return the sample out-of-stock alert payload for ``tenant_id``.

    ``id`` is a fresh random UUID and ``timestamp`` is the current UTC time
    as a naive ISO-8601 string (no offset suffix). Every other field is fixed
    sample data describing a depleted "Harina de Trigo" ingredient.
    """
    # Local import: only ``datetime`` itself is imported at file level.
    from datetime import timezone

    return {
        'id': str(uuid.uuid4()),
        'tenant_id': tenant_id,
        'item_type': 'alert',
        'type': 'out_of_stock',
        'severity': 'urgent',
        'service': 'inventory-service',
        'title': '🚨 PRODUCTO AGOTADO: Harina de Trigo',
        'message': 'El ingrediente "Harina de Trigo" está completamente agotado. Stock actual: 0 kg. Este ingrediente es crítico para la producción diaria y afecta 8 productos principales.',
        'actions': [
            'Contactar proveedor inmediatamente',
            'Revisar productos afectados',
            'Activar stock de emergencia si disponible',
            'Notificar al equipo de producción',
            'Actualizar previsiones de ventas'
        ],
        'metadata': {
            'ingredient_name': 'Harina de Trigo',
            'current_stock': 0,
            'unit': 'kg',
            'minimum_stock': 50,
            'critical_ingredient': True,
            'affected_products': [
                'Pan Francés',
                'Croissants',
                'Pan Integral',
                'Baguettes',
                'Pan de Molde',
                'Empanadas',
                'Pizza',
                'Focaccia'
            ],
            'supplier': 'Molinos San Andrés',
            'last_order_date': '2024-01-15',
            'estimated_restock_time': '24-48 horas',
            'impact_level': 'HIGH'
        },
        # Produces the same naive-UTC text as ``datetime.utcnow().isoformat()``
        # without calling ``utcnow()``, which is deprecated since Python 3.12.
        'timestamp': datetime.now(timezone.utc).replace(tzinfo=None).isoformat()
    }


async def send_out_of_stock_alert(
    *,
    amqp_url='amqp://bakery:forecast123@rabbitmq:5672/',
    tenant_id='c464fb3e-7af2-46e6-9e43-85318f34199a',
):
    """Publish one sample out-of-stock alert to the alerts exchange.

    Connects to RabbitMQ, declares the durable topic exchange
    ``alerts.exchange``, publishes the alert as encoded JSON with routing key
    ``alert.urgent.inventory-service``, prints a summary, and closes the
    connection. The connection is closed even when declaring the exchange or
    publishing raises, so a failed run does not leak it.

    Args:
        amqp_url: AMQP URL of the broker. Defaults to the URL this script
            originally hard-coded.
        tenant_id: Tenant the alert is addressed to. Defaults to the tenant
            ID this script originally hard-coded.

    Raises:
        Whatever ``aio_pika`` raises while connecting, declaring the
        exchange, or publishing; nothing is caught here.
    """
    # Connect to RabbitMQ
    connection = await aio_pika.connect_robust(amqp_url)
    try:
        channel = await connection.channel()

        # Declare the alerts exchange
        exchange = await channel.declare_exchange(
            'alerts.exchange',
            aio_pika.ExchangeType.TOPIC,
            durable=True
        )

        print("🚨 Sending out of stock alert...")

        out_of_stock_alert = _build_out_of_stock_alert(tenant_id)

        await exchange.publish(
            aio_pika.Message(json.dumps(out_of_stock_alert).encode()),
            routing_key='alert.urgent.inventory-service'
        )

        print(f"✅ Out of stock alert sent: {out_of_stock_alert['title']}")
        print(f" Alert ID: {out_of_stock_alert['id']}")
        print(f" Severity: {out_of_stock_alert['severity'].upper()}")
        print(f" Service: {out_of_stock_alert['service']}")
        print(f" Affected products: {len(out_of_stock_alert['metadata']['affected_products'])}")
    finally:
        # Release the broker connection on both the success and error paths.
        await connection.close()

    # Reached only after a successful publish and a clean close.
    print("\n🎯 Out of stock alert test completed!")
    print("Check your frontend for:")
    print(" • Urgent notification in header (red badge)")
    print(" • Real-time alert in dashboard panel")
    print(" • Toast notification (urgent priority)")
    print(" • Expandable card with ingredient details and actions")
if __name__ == "__main__":
    # Script entry point: build the alert coroutine and drive it to
    # completion on a fresh event loop.
    alert_job = send_out_of_stock_alert()
    asyncio.run(alert_job)