#!/usr/bin/env python3 """ Out of Stock Alert Test Sends a realistic out of stock alert to test the inventory monitoring system """ import aio_pika import asyncio import json import uuid from datetime import datetime async def send_out_of_stock_alert(): """Send a realistic out of stock alert""" # Connect to RabbitMQ connection = await aio_pika.connect_robust('amqp://bakery:forecast123@rabbitmq:5672/') channel = await connection.channel() # Declare the alerts exchange exchange = await channel.declare_exchange( 'alerts.exchange', aio_pika.ExchangeType.TOPIC, durable=True ) # Use actual tenant ID tenant_id = 'c464fb3e-7af2-46e6-9e43-85318f34199a' print("🚨 Sending out of stock alert...") # Out of Stock Alert out_of_stock_alert = { 'id': str(uuid.uuid4()), 'tenant_id': tenant_id, 'item_type': 'alert', 'type': 'out_of_stock', 'severity': 'urgent', 'service': 'inventory-service', 'title': '🚨 PRODUCTO AGOTADO: Harina de Trigo', 'message': 'El ingrediente "Harina de Trigo" está completamente agotado. Stock actual: 0 kg. Este ingrediente es crítico para la producción diaria y afecta 8 productos principales.', 'actions': [ 'Contactar proveedor inmediatamente', 'Revisar productos afectados', 'Activar stock de emergencia si disponible', 'Notificar al equipo de producción', 'Actualizar previsiones de ventas' ], 'metadata': { 'ingredient_name': 'Harina de Trigo', 'current_stock': 0, 'unit': 'kg', 'minimum_stock': 50, 'critical_ingredient': True, 'affected_products': [ 'Pan Francés', 'Croissants', 'Pan Integral', 'Baguettes', 'Pan de Molde', 'Empanadas', 'Pizza', 'Focaccia' ], 'supplier': 'Molinos San Andrés', 'last_order_date': '2024-01-15', 'estimated_restock_time': '24-48 horas', 'impact_level': 'HIGH' }, 'timestamp': datetime.utcnow().isoformat() } await exchange.publish( aio_pika.Message(json.dumps(out_of_stock_alert).encode()), routing_key='alert.urgent.inventory-service' ) print(f"✅ Out of stock alert sent: {out_of_stock_alert['title']}") print(f" Alert ID: {out_of_stock_alert['id']}") print(f" Severity: {out_of_stock_alert['severity'].upper()}") print(f" Service: {out_of_stock_alert['service']}") print(f" Affected products: {len(out_of_stock_alert['metadata']['affected_products'])}") await connection.close() print("\n🎯 Out of stock alert test completed!") print("Check your frontend for:") print(" • Urgent notification in header (red badge)") print(" • Real-time alert in dashboard panel") print(" • Toast notification (urgent priority)") print(" • Expandable card with ingredient details and actions") if __name__ == "__main__": asyncio.run(send_out_of_stock_alert())