#!/usr/bin/env python3
"""
Verification script to confirm the deduplication fix is working.

This runs inside the inventory service container to test the actual
implementation: it publishes a handful of alerts through the real
``BaseAlertService.publish_item`` (with the database and RabbitMQ replaced
by mocks, but a real Redis connection) and checks that exact duplicates
are blocked while distinct alerts go through.

The process exits with status 0 when every expectation holds and 1
otherwise, so the script can be used from shell pipelines / CI.
"""

import asyncio
import json
import sys
from datetime import datetime
from uuid import UUID

import redis.asyncio as aioredis


# Mock the required components
class MockConfig:
    """Minimal configuration object handed to the alert service under test."""

    SERVICE_NAME = "test-inventory-service"
    # NOTE(review): as written, "redis_pass123" sits in the *username*
    # position of the URL (there is no ':' before it). If it is meant to be
    # the password, the URL should probably read
    # "redis://:redis_pass123@172.20.0.10:6379/0" -- confirm against the
    # deployment before changing.
    REDIS_URL = "redis://redis_pass123@172.20.0.10:6379/0"
    DATABASE_URL = "mock://test"
    RABBITMQ_URL = "mock://test"


class MockDatabaseManager:
    """Stand-in database manager whose session is a no-op async context."""

    def get_session(self):
        """Return this object, which doubles as the async context manager."""
        return self

    async def __aenter__(self):
        return self

    async def __aexit__(self, *args):
        pass


class MockRabbitMQClient:
    """Stand-in RabbitMQ client that only logs what it would publish."""

    def __init__(self, *args):
        self.connected = True

    async def connect(self):
        pass

    async def disconnect(self):
        pass

    async def publish_event(self, *args, **kwargs):
        """Pretend to publish an event and report success."""
        print("📤 Mock publish: Would send alert to RabbitMQ")
        return True


async def test_deduplication_in_container():
    """Test the actual deduplication logic using the fixed implementation.

    Publishes three unique alerts and two exact duplicates through
    ``BaseAlertService.publish_item`` and prints a summary, followed by the
    ``item_sent:*`` keys currently present in Redis.

    Returns:
        bool: ``True`` when all three unique alerts were published and both
        duplicates were blocked, ``False`` otherwise.
    """
    print("🧪 Testing Alert Deduplication Fix")
    print("=" * 50)

    # Import the actual BaseAlertService with our fix. The import is done
    # here (not at module top) because '/app' must be on sys.path first.
    sys.path.append('/app')
    from shared.alerts.base_service import BaseAlertService

    class TestInventoryAlertService(BaseAlertService):
        """Alert service wired to mocks, with a real Redis connection."""

        def __init__(self):
            # The base-class __init__ is not called; only the attributes
            # assigned below are set up by this test harness.
            self.config = MockConfig()
            self.db_manager = MockDatabaseManager()
            self.rabbitmq_client = MockRabbitMQClient()
            self.redis = None
            self._items_published = 0
            self._checks_performed = 0
            self._errors_count = 0

        def setup_scheduled_checks(self):
            pass

        async def start(self):
            # Connect to Redis for deduplication testing
            self.redis = await aioredis.from_url(self.config.REDIS_URL)
            print("✅ Connected to Redis for testing")

        async def stop(self):
            if self.redis:
                await self.redis.aclose()

    # Create test service
    service = TestInventoryAlertService()
    await service.start()

    try:
        tenant_id = UUID('c464fb3e-7af2-46e6-9e43-85318f34199a')

        print("\n1️⃣ Testing Overstock Alert Deduplication")
        print("-" * 40)

        # First overstock alert
        overstock_alert = {
            'type': 'overstock_warning',
            'severity': 'medium',
            'title': '📦 Exceso de Stock: Test Croissant',
            'message': 'Stock actual 150.0kg excede máximo 100.0kg.',
            'actions': ['Revisar caducidades'],
            'metadata': {
                'ingredient_id': 'test-croissant-123',
                'current_stock': 150.0,
                'maximum_stock': 100.0,
            },
        }

        # Send first alert - should succeed
        result1 = await service.publish_item(tenant_id, overstock_alert.copy(), 'alert')
        print(f"First overstock alert: {'✅ Published' if result1 else '❌ Blocked'}")

        # Send duplicate alert - should be blocked
        result2 = await service.publish_item(tenant_id, overstock_alert.copy(), 'alert')
        print(f"Duplicate overstock alert: {'❌ Published (ERROR!)' if result2 else '✅ Blocked (SUCCESS!)'}")

        print("\n2️⃣ Testing Different Ingredient - Should Pass")
        print("-" * 40)

        # Different ingredient - should succeed. The shallow copy is safe
        # because 'metadata' is rebound to a new dict, not mutated in place.
        overstock_alert2 = overstock_alert.copy()
        overstock_alert2['title'] = '📦 Exceso de Stock: Test Harina'
        overstock_alert2['metadata'] = {
            'ingredient_id': 'test-harina-456',  # Different ingredient
            'current_stock': 200.0,
            'maximum_stock': 150.0,
        }

        result3 = await service.publish_item(tenant_id, overstock_alert2, 'alert')
        print(f"Different ingredient alert: {'✅ Published' if result3 else '❌ Blocked (ERROR!)'}")

        print("\n3️⃣ Testing Expired Products Deduplication")
        print("-" * 40)

        # Expired products alert
        expired_alert = {
            'type': 'expired_products',
            'severity': 'urgent',
            'title': '🗑️ Productos Caducados Test',
            'message': '3 productos han caducado.',
            'actions': ['Retirar inmediatamente'],
            'metadata': {
                'expired_items': [
                    {'id': 'expired-1', 'name': 'Leche', 'stock_id': 'stock-1'},
                    {'id': 'expired-2', 'name': 'Huevos', 'stock_id': 'stock-2'},
                ],
            },
        }

        # Send first expired products alert - should succeed
        result4 = await service.publish_item(tenant_id, expired_alert.copy(), 'alert')
        print(f"First expired products alert: {'✅ Published' if result4 else '❌ Blocked'}")

        # Send duplicate expired products alert - should be blocked
        result5 = await service.publish_item(tenant_id, expired_alert.copy(), 'alert')
        print(f"Duplicate expired products alert: {'❌ Published (ERROR!)' if result5 else '✅ Blocked (SUCCESS!)'}")

        print("\n📊 Test Results Summary")
        print("=" * 50)

        # Coerce to bool before counting: publish_item's return value is
        # only used for truthiness above, and summing raw results would
        # raise on None or miscount other truthy values.
        unique_published = sum(bool(r) for r in (result1, result3, result4))  # Should be 3
        duplicates_blocked = sum(not r for r in (result2, result5))  # Should be 2

        print(f"✅ Unique alerts published: {unique_published}/3")
        print(f"🚫 Duplicate alerts blocked: {duplicates_blocked}/2")

        success = unique_published == 3 and duplicates_blocked == 2
        if success:
            print("\n🎉 SUCCESS: Deduplication fix is working correctly!")
            print("   • All unique alerts were published")
            print("   • All duplicate alerts were blocked")
            print("   • The duplicate alert issue should be resolved")
        else:
            print("\n❌ ISSUE: Deduplication is not working as expected")

        # Show Redis keys for verification. SCAN is used instead of KEYS so
        # the listing does not block the Redis server while it walks the
        # keyspace.
        # NOTE(review): these keys are not deleted here; presumably a re-run
        # before their TTL expires will see the "first" alerts blocked --
        # confirm against BaseAlertService's key scheme.
        print("\n🔍 Deduplication Keys in Redis:")
        async for key in service.redis.scan_iter(match="item_sent:*"):
            ttl = await service.redis.ttl(key)
            decoded_key = key.decode() if isinstance(key, bytes) else key
            print(f"   • {decoded_key} (TTL: {ttl}s)")

        return success

    finally:
        # Always close the Redis connection, even if a publish call raised.
        await service.stop()
        print("\n✅ Test completed and cleaned up")


if __name__ == "__main__":
    # Propagate the outcome as the process exit status (0 = all checks passed).
    sys.exit(0 if asyncio.run(test_deduplication_in_container()) else 1)