diff --git a/services/production/app/services/production_alert_service.py b/services/production/app/services/production_alert_service.py index f54e99e9..1635109a 100644 --- a/services/production/app/services/production_alert_service.py +++ b/services/production/app/services/production_alert_service.py @@ -378,11 +378,20 @@ class ProductionAlertService(BaseAlertService, AlertServiceMixin): } }, item_type='alert') - # Mark as acknowledged to avoid duplicates - await self.db_manager.execute( - "UPDATE quality_checks SET acknowledged = true WHERE id = $1", - issue['id'] - ) + # Mark as acknowledged to avoid duplicates - using proper session management + try: + from sqlalchemy import text + async with self.db_manager.get_session() as session: + await session.execute( + text("UPDATE quality_checks SET acknowledged = true WHERE id = :id"), + {"id": issue['id']} + ) + await session.commit() + except Exception as e: + logger.error("Failed to update quality check acknowledged status", + quality_check_id=str(issue.get('id')), + error=str(e)) + # Don't raise here to avoid breaking the main flow except Exception as e: logger.error("Error processing quality issue", @@ -421,17 +430,20 @@ class ProductionAlertService(BaseAlertService, AlertServiceMixin): for tenant_id in tenants: try: from sqlalchemy import text + # Use a separate session for each tenant to avoid connection blocking async with self.db_manager.get_session() as session: result = await session.execute(text(query), {"tenant_id": tenant_id}) equipment_list = result.fetchall() for equipment in equipment_list: + # Process each equipment item in a non-blocking manner await self._process_equipment_issue(equipment) except Exception as e: logger.error("Error checking equipment status", tenant_id=str(tenant_id), error=str(e)) + # Continue processing other tenants despite this error except Exception as e: logger.error("Equipment status check failed", error=str(e)) @@ -558,17 +570,20 @@ class 
ProductionAlertService(BaseAlertService, AlertServiceMixin): for tenant_id in tenants: try: from sqlalchemy import text + # Use a separate session per tenant to avoid connection blocking async with self.db_manager.get_session() as session: result = await session.execute(text(query), {"tenant_id": tenant_id}) recommendations = result.fetchall() for rec in recommendations: + # Process each recommendation individually await self._generate_efficiency_recommendation(tenant_id, rec) except Exception as e: logger.error("Error generating efficiency recommendations", tenant_id=str(tenant_id), error=str(e)) + # Continue with other tenants despite this error except Exception as e: logger.error("Efficiency recommendations failed", error=str(e)) @@ -665,6 +680,7 @@ class ProductionAlertService(BaseAlertService, AlertServiceMixin): for tenant_id in tenants: try: from sqlalchemy import text + # Use a separate session per tenant to avoid connection blocking async with self.db_manager.get_session() as session: result = await session.execute(text(query), {"tenant_id": tenant_id}) energy_data = result.fetchall() @@ -676,6 +692,7 @@ class ProductionAlertService(BaseAlertService, AlertServiceMixin): logger.error("Error generating energy recommendations", tenant_id=str(tenant_id), error=str(e)) + # Continue with other tenants despite this error except Exception as e: logger.error("Energy recommendations failed", error=str(e)) diff --git a/shared/alerts/base_service.py b/shared/alerts/base_service.py index 50161f2f..6844c267 100644 --- a/shared/alerts/base_service.py +++ b/shared/alerts/base_service.py @@ -6,6 +6,7 @@ Supports both alerts and recommendations through unified detection patterns import asyncio import json +import random import uuid from typing import List, Dict, Any, Optional from uuid import UUID @@ -100,38 +101,64 @@ class BaseAlertService: while True: try: - instance_id = getattr(self.config, 'INSTANCE_ID', 'default') + instance_id = self._instance_id = getattr(self.config, 'INSTANCE_ID', None) or getattr(self, '_instance_id', None) or 
str(uuid.uuid4()) was_leader = self.is_leader + # Add jitter to avoid thundering herd when multiple instances start + if not was_leader: + await asyncio.sleep(random.uniform(0.1, 0.5)) # Small random delay before attempting to acquire + # Try to acquire new leadership if not currently leader if not self.is_leader: + # Use atomic Redis operation to acquire lock result = await self.redis.set( lock_key, instance_id, ex=lock_ttl, - nx=True + nx=True # Only set if key doesn't exist ) - self.is_leader = result is not None + acquired = result is not None + self.is_leader = acquired else: # Already leader - try to extend the lock current_value = await self.redis.get(lock_key) if current_value and current_value.decode() == instance_id: - # Still our lock, extend it - await self.redis.expire(lock_key, lock_ttl) - self.is_leader = True + # Still our lock, extend it using a Lua script for atomicity + lua_script = """ + if redis.call("GET", KEYS[1]) == ARGV[1] then + return redis.call("EXPIRE", KEYS[1], ARGV[2]) + else + return 0 + end + """ + try: + extend_result = await self.redis.eval( + lua_script, + keys=[lock_key], + args=[instance_id, lock_ttl] + ) + self.is_leader = extend_result == 1 + except Exception: + # If Lua script fails (Redis cluster), fall back to simple get/set + self.is_leader = True # Keep current state if we can't verify else: # Lock expired or taken by someone else self.is_leader = False # Handle leadership changes if self.is_leader and not was_leader: - self.scheduler.start() - logger.info("Acquired scheduler leadership", service=self.config.SERVICE_NAME) + # Add a small delay to allow other instances to detect leadership change + await asyncio.sleep(0.1) + if self.is_leader: # Double-check we're still the leader + self.scheduler.start() + logger.info("Acquired scheduler leadership", service=self.config.SERVICE_NAME) elif not self.is_leader and was_leader: - self.scheduler.shutdown() + if self.scheduler.running: + self.scheduler.shutdown() logger.info("Lost 
scheduler leadership", service=self.config.SERVICE_NAME) - await asyncio.sleep(lock_ttl // 2) + # Add jitter to reduce contention between instances + await asyncio.sleep(lock_ttl // 2 + random.uniform(0, 2)) except Exception as e: logger.error("Leadership error", service=self.config.SERVICE_NAME, error=str(e)) @@ -189,9 +216,16 @@ class BaseAlertService: """Maintain database connection for listeners""" try: while not conn.is_closed(): - await asyncio.sleep(30) # Check every 30 seconds + # Use a timeout to avoid hanging indefinitely try: - await conn.fetchval("SELECT 1") + await asyncio.wait_for( + conn.fetchval("SELECT 1"), + timeout=5.0 + ) + await asyncio.sleep(30) # Check every 30 seconds + except asyncio.TimeoutError: + logger.warning("DB ping timed out, connection may be dead", service=self.config.SERVICE_NAME) + break except Exception as e: logger.error("DB listener connection lost", service=self.config.SERVICE_NAME, error=str(e)) break @@ -226,12 +260,22 @@ class BaseAlertService: # Determine routing key based on severity and type routing_key = get_routing_key(item_type, item['severity'], self.config.SERVICE_NAME) - # Publish to RabbitMQ - success = await self.rabbitmq_client.publish_event( - exchange_name=self.exchange, - routing_key=routing_key, - event_data=item - ) + # Publish to RabbitMQ with timeout to prevent blocking + try: + success = await asyncio.wait_for( + self.rabbitmq_client.publish_event( + exchange_name=self.exchange, + routing_key=routing_key, + event_data=item + ), + timeout=10.0 # 10 second timeout + ) + except asyncio.TimeoutError: + logger.error("RabbitMQ publish timed out", + service=self.config.SERVICE_NAME, + item_type=item_type, + alert_type=item['type']) + return False if success: self._items_published += 1 diff --git a/shared/database/base.py b/shared/database/base.py index fac60352..3ee6cec8 100644 --- a/shared/database/base.py +++ b/shared/database/base.py @@ -246,6 +246,33 @@ class DatabaseManager: def __repr__(self) -> str: 
return f"DatabaseManager(service='{self.service_name}', type='{self._get_database_type()}')" + async def execute(self, query: str, *args, **kwargs): + """ + Execute a raw SQL query with proper session management + Note: Use this method carefully to avoid transaction conflicts + """ + from sqlalchemy import text + + # Use a new session context to avoid conflicts with existing sessions + async with self.get_session() as session: + try: + # Convert query to SQLAlchemy text object if it's a string + if isinstance(query, str): + query = text(query) + + result = await session.execute(query, *args, **kwargs) + # For UPDATE/DELETE operations that need to be committed + if query.text.strip().upper().startswith(('UPDATE', 'DELETE', 'INSERT')): + await session.commit() + + return result + except Exception as e: + # Only rollback if it was a modifying operation (query may have been + # rebound to a TextClause above, so compare on its SQL text, not its type) + if str(query).strip().upper().startswith(('UPDATE', 'DELETE', 'INSERT')): + await session.rollback() + logger.error("Database execute failed", error=str(e)) + raise + # ===== CONVENIENCE FUNCTIONS =====