137 lines
4.7 KiB
Python
137 lines
4.7 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
WORKING Alert Test Script
|
||
This script successfully sends alerts that reach the frontend via SSE.
|
||
Use this for future testing of the real-time alert system.
|
||
"""
|
||
|
||
import aio_pika
|
||
import asyncio
|
||
import json
|
||
import uuid
|
||
from datetime import datetime
|
||
|
||
async def send_test_alerts():
|
||
"""Send test alerts and recommendations to the system"""
|
||
|
||
# Connect to RabbitMQ using the correct credentials
|
||
connection = await aio_pika.connect_robust('amqp://bakery:forecast123@rabbitmq:5672/')
|
||
channel = await connection.channel()
|
||
|
||
# Declare the alerts exchange
|
||
exchange = await channel.declare_exchange(
|
||
'alerts.exchange',
|
||
aio_pika.ExchangeType.TOPIC,
|
||
durable=True
|
||
)
|
||
|
||
# Use your actual tenant ID (replace if different)
|
||
tenant_id = 'c464fb3e-7af2-46e6-9e43-85318f34199a'
|
||
|
||
print("🚀 Sending test alerts to the system...")
|
||
|
||
# Test Alert - Urgent
|
||
urgent_alert = {
|
||
'id': str(uuid.uuid4()),
|
||
'tenant_id': tenant_id,
|
||
'item_type': 'alert',
|
||
'type': 'test_alert',
|
||
'severity': 'urgent', # Must be lowercase: urgent, high, medium, low
|
||
'service': 'test-manual',
|
||
'title': '🚨 TEST URGENT ALERT',
|
||
'message': 'This is a test urgent alert that should appear in your frontend immediately',
|
||
'actions': ['Check frontend dashboard', 'Verify notification icon', 'Confirm real-time reception'],
|
||
'metadata': {'test': True, 'manual': True, 'timestamp': datetime.utcnow().isoformat()},
|
||
'timestamp': datetime.utcnow().isoformat()
|
||
}
|
||
|
||
await exchange.publish(
|
||
aio_pika.Message(json.dumps(urgent_alert).encode()),
|
||
routing_key='alert.urgent.test-manual'
|
||
)
|
||
print(f"✅ Sent urgent alert: {urgent_alert['title']}")
|
||
|
||
# Wait a moment between alerts
|
||
await asyncio.sleep(2)
|
||
|
||
# Test Alert - High Priority
|
||
high_alert = {
|
||
'id': str(uuid.uuid4()),
|
||
'tenant_id': tenant_id,
|
||
'item_type': 'alert',
|
||
'type': 'test_alert_high',
|
||
'severity': 'high',
|
||
'service': 'test-manual',
|
||
'title': '⚠️ TEST HIGH PRIORITY ALERT',
|
||
'message': 'This is a high priority test alert for the bakery system',
|
||
'actions': ['Review inventory', 'Contact supplier', 'Update stock levels'],
|
||
'metadata': {'test': True, 'priority': 'high', 'category': 'inventory'},
|
||
'timestamp': datetime.utcnow().isoformat()
|
||
}
|
||
|
||
await exchange.publish(
|
||
aio_pika.Message(json.dumps(high_alert).encode()),
|
||
routing_key='alert.high.test-manual'
|
||
)
|
||
print(f"✅ Sent high priority alert: {high_alert['title']}")
|
||
|
||
# Wait a moment
|
||
await asyncio.sleep(2)
|
||
|
||
# Test Recommendation
|
||
recommendation = {
|
||
'id': str(uuid.uuid4()),
|
||
'tenant_id': tenant_id,
|
||
'item_type': 'recommendation',
|
||
'type': 'optimization_suggestion',
|
||
'severity': 'medium',
|
||
'service': 'test-manual',
|
||
'title': '💡 TEST OPTIMIZATION RECOMMENDATION',
|
||
'message': 'This is a test recommendation to optimize your bakery operations',
|
||
'actions': ['Review suggestion', 'Analyze impact', 'Consider implementation'],
|
||
'metadata': {'test': True, 'type': 'optimization', 'potential_savings': '15%'},
|
||
'timestamp': datetime.utcnow().isoformat()
|
||
}
|
||
|
||
await exchange.publish(
|
||
aio_pika.Message(json.dumps(recommendation).encode()),
|
||
routing_key='recommendation.medium.test-manual'
|
||
)
|
||
print(f"✅ Sent recommendation: {recommendation['title']}")
|
||
|
||
# Wait a moment
|
||
await asyncio.sleep(2)
|
||
|
||
# Test Low Priority Alert
|
||
low_alert = {
|
||
'id': str(uuid.uuid4()),
|
||
'tenant_id': tenant_id,
|
||
'item_type': 'alert',
|
||
'type': 'test_info',
|
||
'severity': 'low',
|
||
'service': 'test-manual',
|
||
'title': 'ℹ️ TEST INFO ALERT',
|
||
'message': 'This is a low priority informational alert for testing',
|
||
'actions': ['Review when convenient'],
|
||
'metadata': {'test': True, 'info': True},
|
||
'timestamp': datetime.utcnow().isoformat()
|
||
}
|
||
|
||
await exchange.publish(
|
||
aio_pika.Message(json.dumps(low_alert).encode()),
|
||
routing_key='alert.low.test-manual'
|
||
)
|
||
print(f"✅ Sent low priority alert: {low_alert['title']}")
|
||
|
||
await connection.close()
|
||
|
||
print("\n🎉 All test alerts sent successfully!")
|
||
print("\nYou should see these alerts in your frontend:")
|
||
print(" 1. In the notification icon (header) - live counter update")
|
||
print(" 2. In the panel de control - compact expandable cards")
|
||
print(" 3. As real-time toast notifications")
|
||
print("\nNote: Urgent and high alerts will show as toast notifications")
|
||
|
||
if __name__ == "__main__":
|
||
# Run the async function
|
||
asyncio.run(send_test_alerts()) |