Clean code

This commit is contained in:
Urtzi Alfaro
2025-09-29 19:18:45 +02:00
parent 2712a60a2a
commit d1c83dce74
6 changed files with 0 additions and 308 deletions

137
tests/test_alert_working.py Normal file
View File

@@ -0,0 +1,137 @@
#!/usr/bin/env python3
"""
WORKING Alert Test Script
This script successfully sends alerts that reach the frontend via SSE.
Use this for future testing of the real-time alert system.
"""
import aio_pika
import asyncio
import json
import uuid
from datetime import datetime
async def send_test_alerts():
"""Send test alerts and recommendations to the system"""
# Connect to RabbitMQ using the correct credentials
connection = await aio_pika.connect_robust('amqp://bakery:forecast123@rabbitmq:5672/')
channel = await connection.channel()
# Declare the alerts exchange
exchange = await channel.declare_exchange(
'alerts.exchange',
aio_pika.ExchangeType.TOPIC,
durable=True
)
# Use your actual tenant ID (replace if different)
tenant_id = 'c464fb3e-7af2-46e6-9e43-85318f34199a'
print("🚀 Sending test alerts to the system...")
# Test Alert - Urgent
urgent_alert = {
'id': str(uuid.uuid4()),
'tenant_id': tenant_id,
'item_type': 'alert',
'type': 'test_alert',
'severity': 'urgent', # Must be lowercase: urgent, high, medium, low
'service': 'test-manual',
'title': '🚨 TEST URGENT ALERT',
'message': 'This is a test urgent alert that should appear in your frontend immediately',
'actions': ['Check frontend dashboard', 'Verify notification icon', 'Confirm real-time reception'],
'metadata': {'test': True, 'manual': True, 'timestamp': datetime.utcnow().isoformat()},
'timestamp': datetime.utcnow().isoformat()
}
await exchange.publish(
aio_pika.Message(json.dumps(urgent_alert).encode()),
routing_key='alert.urgent.test-manual'
)
print(f"✅ Sent urgent alert: {urgent_alert['title']}")
# Wait a moment between alerts
await asyncio.sleep(2)
# Test Alert - High Priority
high_alert = {
'id': str(uuid.uuid4()),
'tenant_id': tenant_id,
'item_type': 'alert',
'type': 'test_alert_high',
'severity': 'high',
'service': 'test-manual',
'title': '⚠️ TEST HIGH PRIORITY ALERT',
'message': 'This is a high priority test alert for the bakery system',
'actions': ['Review inventory', 'Contact supplier', 'Update stock levels'],
'metadata': {'test': True, 'priority': 'high', 'category': 'inventory'},
'timestamp': datetime.utcnow().isoformat()
}
await exchange.publish(
aio_pika.Message(json.dumps(high_alert).encode()),
routing_key='alert.high.test-manual'
)
print(f"✅ Sent high priority alert: {high_alert['title']}")
# Wait a moment
await asyncio.sleep(2)
# Test Recommendation
recommendation = {
'id': str(uuid.uuid4()),
'tenant_id': tenant_id,
'item_type': 'recommendation',
'type': 'optimization_suggestion',
'severity': 'medium',
'service': 'test-manual',
'title': '💡 TEST OPTIMIZATION RECOMMENDATION',
'message': 'This is a test recommendation to optimize your bakery operations',
'actions': ['Review suggestion', 'Analyze impact', 'Consider implementation'],
'metadata': {'test': True, 'type': 'optimization', 'potential_savings': '15%'},
'timestamp': datetime.utcnow().isoformat()
}
await exchange.publish(
aio_pika.Message(json.dumps(recommendation).encode()),
routing_key='recommendation.medium.test-manual'
)
print(f"✅ Sent recommendation: {recommendation['title']}")
# Wait a moment
await asyncio.sleep(2)
# Test Low Priority Alert
low_alert = {
'id': str(uuid.uuid4()),
'tenant_id': tenant_id,
'item_type': 'alert',
'type': 'test_info',
'severity': 'low',
'service': 'test-manual',
'title': ' TEST INFO ALERT',
'message': 'This is a low priority informational alert for testing',
'actions': ['Review when convenient'],
'metadata': {'test': True, 'info': True},
'timestamp': datetime.utcnow().isoformat()
}
await exchange.publish(
aio_pika.Message(json.dumps(low_alert).encode()),
routing_key='alert.low.test-manual'
)
print(f"✅ Sent low priority alert: {low_alert['title']}")
await connection.close()
print("\n🎉 All test alerts sent successfully!")
print("\nYou should see these alerts in your frontend:")
print(" 1. In the notification icon (header) - live counter update")
print(" 2. In the panel de control - compact expandable cards")
print(" 3. As real-time toast notifications")
print("\nNote: Urgent and high alerts will show as toast notifications")
if __name__ == "__main__":
# Run the async function
asyncio.run(send_test_alerts())