Files
bakery-ia/tests/test_alert_working.py
Urtzi Alfaro d1c83dce74 Clean code
2025-09-29 19:18:45 +02:00

137 lines
4.7 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
WORKING Alert Test Script
This script successfully sends alerts that reach the frontend via SSE.
Use this for future testing of the real-time alert system.
"""
import aio_pika
import asyncio
import json
import uuid
from datetime import datetime
# Defaults preserved from the original script. The broker URL embeds
# credentials; NOTE(review): consider sourcing it from configuration instead.
_DEFAULT_AMQP_URL = 'amqp://bakery:forecast123@rabbitmq:5672/'
_DEFAULT_TENANT_ID = 'c464fb3e-7af2-46e6-9e43-85318f34199a'
_TEST_SERVICE = 'test-manual'
_PAUSE_SECONDS = 2  # delay between consecutive test messages


def _build_payload(tenant_id, item_type, alert_type, severity, title,
                   message, actions, metadata):
    """Assemble one alert/recommendation payload with a fresh id and timestamp."""
    return {
        'id': str(uuid.uuid4()),
        'tenant_id': tenant_id,
        'item_type': item_type,
        'type': alert_type,
        'severity': severity,  # Must be lowercase: urgent, high, medium, low
        'service': _TEST_SERVICE,
        'title': title,
        'message': message,
        'actions': actions,
        'metadata': metadata,
        # NOTE(review): datetime.utcnow() is deprecated since Python 3.12; it is
        # kept so the emitted string stays naive-UTC ISO format as before.
        'timestamp': datetime.utcnow().isoformat(),
    }


async def _publish_item(exchange, label, payload):
    """Publish *payload* as JSON on *exchange* and print a confirmation line."""
    # Every routing key in this script has the form
    # "<item_type>.<severity>.<service>", so it is derived from the payload.
    routing_key = f"{payload['item_type']}.{payload['severity']}.{payload['service']}"
    await exchange.publish(
        aio_pika.Message(json.dumps(payload).encode()),
        routing_key=routing_key,
    )
    print(f"✅ Sent {label}: {payload['title']}")


async def send_test_alerts(tenant_id: str = _DEFAULT_TENANT_ID,
                           amqp_url: str = _DEFAULT_AMQP_URL) -> None:
    """Send test alerts and recommendations to the system.

    Connects to RabbitMQ, declares the durable topic exchange
    ``alerts.exchange`` and publishes four JSON messages in order: an urgent
    alert, a high-priority alert, a medium recommendation and a low-priority
    alert, pausing two seconds between consecutive messages.

    Args:
        tenant_id: Tenant the test items are addressed to. Defaults to the
            tenant ID that was hard-coded in the original script.
        amqp_url: AMQP connection URL. Defaults to the original broker URL.

    Raises:
        Whatever ``aio_pika`` raises on connection, declaration or publish
        failures; the connection is closed before the error propagates.
    """
    connection = await aio_pika.connect_robust(amqp_url)

    # The context manager closes the connection even when a later step fails;
    # the original only closed it after every publish had succeeded.
    async with connection:
        channel = await connection.channel()

        # Declare the alerts exchange
        exchange = await channel.declare_exchange(
            'alerts.exchange',
            aio_pika.ExchangeType.TOPIC,
            durable=True,
        )

        print("🚀 Sending test alerts to the system...")

        # Test Alert - Urgent
        await _publish_item(exchange, 'urgent alert', _build_payload(
            tenant_id, 'alert', 'test_alert', 'urgent',
            '🚨 TEST URGENT ALERT',
            'This is a test urgent alert that should appear in your frontend immediately',
            ['Check frontend dashboard', 'Verify notification icon', 'Confirm real-time reception'],
            {'test': True, 'manual': True, 'timestamp': datetime.utcnow().isoformat()},
        ))
        await asyncio.sleep(_PAUSE_SECONDS)

        # Test Alert - High Priority
        await _publish_item(exchange, 'high priority alert', _build_payload(
            tenant_id, 'alert', 'test_alert_high', 'high',
            '⚠️ TEST HIGH PRIORITY ALERT',
            'This is a high priority test alert for the bakery system',
            ['Review inventory', 'Contact supplier', 'Update stock levels'],
            {'test': True, 'priority': 'high', 'category': 'inventory'},
        ))
        await asyncio.sleep(_PAUSE_SECONDS)

        # Test Recommendation
        await _publish_item(exchange, 'recommendation', _build_payload(
            tenant_id, 'recommendation', 'optimization_suggestion', 'medium',
            '💡 TEST OPTIMIZATION RECOMMENDATION',
            'This is a test recommendation to optimize your bakery operations',
            ['Review suggestion', 'Analyze impact', 'Consider implementation'],
            {'test': True, 'type': 'optimization', 'potential_savings': '15%'},
        ))
        await asyncio.sleep(_PAUSE_SECONDS)

        # Test Low Priority Alert
        # NOTE(review): this title starts with a bare space where the other
        # titles carry an emoji; a glyph may have been lost - confirm.
        await _publish_item(exchange, 'low priority alert', _build_payload(
            tenant_id, 'alert', 'test_info', 'low',
            ' TEST INFO ALERT',
            'This is a low priority informational alert for testing',
            ['Review when convenient'],
            {'test': True, 'info': True},
        ))

    print("\n🎉 All test alerts sent successfully!")
    print("\nYou should see these alerts in your frontend:")
    print(" 1. In the notification icon (header) - live counter update")
    print(" 2. In the panel de control - compact expandable cards")
    print(" 3. As real-time toast notifications")
    print("\nNote: Urgent and high alerts will show as toast notifications")
if __name__ == "__main__":
    # Script entry point: create the coroutine, then hand it to asyncio.run(),
    # which drives it to completion on a fresh event loop.
    alert_run = send_test_alerts()
    asyncio.run(alert_run)