#!/usr/bin/env python3
"""
WORKING Alert Test Script

This script successfully sends alerts that reach the frontend via SSE.
Use this for future testing of the real-time alert system.
"""

import asyncio
import json
import uuid
from datetime import datetime, timezone

import aio_pika

# RabbitMQ connection URL (includes the credentials this script has always used).
DEFAULT_AMQP_URL = 'amqp://bakery:forecast123@rabbitmq:5672/'
# Use your actual tenant ID (replace if different).
DEFAULT_TENANT_ID = 'c464fb3e-7af2-46e6-9e43-85318f34199a'
# Topic exchange the test items are published to.
ALERTS_EXCHANGE = 'alerts.exchange'
# Value of the 'service' field and last segment of every routing key.
SERVICE_NAME = 'test-manual'


def _utc_timestamp():
    """Return the current UTC time as an offset-less ISO-8601 string.

    ``datetime.utcnow()`` is deprecated since Python 3.12. Taking an aware
    UTC value and dropping its tzinfo produces the same string format the
    script has always emitted (no ``+00:00`` suffix).
    """
    return datetime.now(timezone.utc).replace(tzinfo=None).isoformat()


def _build_item(tenant_id, item_type, event_type, severity, title, message,
                actions, metadata):
    """Build one alert/recommendation payload with a fresh id and timestamp.

    Args:
        tenant_id: Tenant the item is addressed to.
        item_type: ``'alert'`` or ``'recommendation'``.
        event_type: Value stored under the payload's ``'type'`` key.
        severity: Must be lowercase: urgent, high, medium, low.
        title: Short headline of the item.
        message: Longer description of the item.
        actions: List of suggested action strings.
        metadata: Free-form dictionary attached to the item.

    Returns:
        A JSON-serialisable dictionary ready to be published.
    """
    return {
        'id': str(uuid.uuid4()),
        'tenant_id': tenant_id,
        'item_type': item_type,
        'type': event_type,
        'severity': severity,
        'service': SERVICE_NAME,
        'title': title,
        'message': message,
        'actions': actions,
        'metadata': metadata,
        'timestamp': _utc_timestamp(),
    }


def _routing_key(item):
    """Return the ``<item_type>.<severity>.<service>`` routing key for *item*."""
    return f"{item['item_type']}.{item['severity']}.{item['service']}"


def _test_item_specs():
    """Return ``(label, spec)`` pairs describing the items to send, in order.

    ``label`` is only used in the console confirmation; ``spec`` holds the
    keyword arguments for :func:`_build_item` (everything except the tenant).
    """
    return [
        ('urgent alert', {
            'item_type': 'alert',
            'event_type': 'test_alert',
            'severity': 'urgent',
            'title': '🚨 TEST URGENT ALERT',
            'message': 'This is a test urgent alert that should appear in your frontend immediately',
            'actions': ['Check frontend dashboard', 'Verify notification icon', 'Confirm real-time reception'],
            'metadata': {'test': True, 'manual': True, 'timestamp': _utc_timestamp()},
        }),
        ('high priority alert', {
            'item_type': 'alert',
            'event_type': 'test_alert_high',
            'severity': 'high',
            'title': '⚠️ TEST HIGH PRIORITY ALERT',
            'message': 'This is a high priority test alert for the bakery system',
            'actions': ['Review inventory', 'Contact supplier', 'Update stock levels'],
            'metadata': {'test': True, 'priority': 'high', 'category': 'inventory'},
        }),
        ('recommendation', {
            'item_type': 'recommendation',
            'event_type': 'optimization_suggestion',
            'severity': 'medium',
            'title': '💡 TEST OPTIMIZATION RECOMMENDATION',
            'message': 'This is a test recommendation to optimize your bakery operations',
            'actions': ['Review suggestion', 'Analyze impact', 'Consider implementation'],
            'metadata': {'test': True, 'type': 'optimization', 'potential_savings': '15%'},
        }),
        ('low priority alert', {
            'item_type': 'alert',
            'event_type': 'test_info',
            'severity': 'low',
            'title': 'ℹ️ TEST INFO ALERT',
            'message': 'This is a low priority informational alert for testing',
            'actions': ['Review when convenient'],
            'metadata': {'test': True, 'info': True},
        }),
    ]


async def send_test_alerts(*, amqp_url=DEFAULT_AMQP_URL,
                           tenant_id=DEFAULT_TENANT_ID, delay_seconds=2):
    """Send test alerts and recommendations to the system.

    Publishes an urgent alert, a high priority alert, a recommendation and a
    low priority alert to the durable topic exchange ``alerts.exchange``,
    pausing between consecutive messages.

    Args:
        amqp_url: RabbitMQ URL to connect to.
        tenant_id: Tenant ID written into every payload.
        delay_seconds: Pause between consecutive publishes, in seconds.

    Raises:
        Whatever ``aio_pika`` raises on connection or publish failure; the
        connection is closed before the exception propagates.
    """
    connection = await aio_pika.connect_robust(amqp_url)

    # The context manager closes the connection even when declaring the
    # exchange or publishing fails part-way through.
    async with connection:
        channel = await connection.channel()

        # Declare the alerts exchange
        exchange = await channel.declare_exchange(
            ALERTS_EXCHANGE,
            aio_pika.ExchangeType.TOPIC,
            durable=True,
        )

        print("🚀 Sending test alerts to the system...")

        for position, (label, spec) in enumerate(_test_item_specs()):
            if position:
                # Wait a moment between alerts (not before the first one).
                await asyncio.sleep(delay_seconds)

            # Built right before publishing so id and timestamp are fresh.
            item = _build_item(tenant_id, **spec)
            await exchange.publish(
                aio_pika.Message(json.dumps(item).encode()),
                routing_key=_routing_key(item),
            )
            print(f"✅ Sent {label}: {item['title']}")

    print("\n🎉 All test alerts sent successfully!")
    print("\nYou should see these alerts in your frontend:")
    print(" 1. In the notification icon (header) - live counter update")
    print(" 2. In the panel de control - compact expandable cards")
    print(" 3. As real-time toast notifications")
    print("\nNote: Urgent and high alerts will show as toast notifications")


if __name__ == "__main__":
    # Run the async function
    asyncio.run(send_test_alerts())