From c56c558618eb230004b56b980377d361480946ea Mon Sep 17 00:00:00 2001 From: Bakery Admin Date: Sat, 24 Jan 2026 19:28:29 +0100 Subject: [PATCH] Fix redis ssl issues 2 --- .../app/services/inventory_scheduler.py | 24 ++-- services/inventory/test_dedup.py | 12 +- services/orchestrator/app/main.py | 21 ++-- services/pos/app/scheduler.py | 22 ++-- .../app/services/delivery_tracking_service.py | 20 ++-- .../app/services/production_scheduler.py | 19 ++- .../app/consumers/alert_event_consumer.py | 21 ++-- services/tenant/app/api/usage_forecast.py | 16 +-- shared/leader_election/mixin.py | 24 ++-- shared/redis_utils/__init__.py | 6 +- shared/redis_utils/client.py | 111 +++++++++++++++--- 11 files changed, 174 insertions(+), 122 deletions(-) diff --git a/services/inventory/app/services/inventory_scheduler.py b/services/inventory/app/services/inventory_scheduler.py index 0445bc62..e5588681 100644 --- a/services/inventory/app/services/inventory_scheduler.py +++ b/services/inventory/app/services/inventory_scheduler.py @@ -44,6 +44,7 @@ class InventoryScheduler: # Leader election self._redis_url = redis_url self._leader_election = None + self._redis_manager = None self._redis_client = None self._scheduler_started = False @@ -58,20 +59,15 @@ class InventoryScheduler: async def _start_with_leader_election(self): """Start with Redis-based leader election for horizontal scaling""" - import ssl - import redis.asyncio as redis from shared.leader_election import LeaderElectionService + from shared.redis_utils import RedisConnectionManager try: - # Create Redis connection with proper SSL handling for self-signed certificates - connection_kwargs = {"decode_responses": False} - - # Handle SSL/TLS for rediss:// URLs (self-signed certificates) - if self._redis_url and self._redis_url.startswith("rediss://"): - connection_kwargs["ssl_cert_reqs"] = ssl.CERT_NONE - - self._redis_client = redis.from_url(self._redis_url, **connection_kwargs) - await self._redis_client.ping() + # Create Redis connection using shared manager (handles SSL, pooling, health checks) + self._redis_manager = await RedisConnectionManager.create( + self._redis_url, decode_responses=False + ) + self._redis_client = self._redis_manager.get_client() # Create scheduler (but don't start it yet) self.scheduler = AsyncIOScheduler() @@ -180,9 +176,9 @@ class InventoryScheduler: # Stop scheduler await self._stop_scheduler() - # Close Redis - if self._redis_client: - await self._redis_client.close() + # Close Redis manager (handles client and pool cleanup) + if hasattr(self, '_redis_manager') and self._redis_manager: + await self._redis_manager.close() logger.info("Inventory scheduler stopped") diff --git a/services/inventory/test_dedup.py b/services/inventory/test_dedup.py index 3bd1c39f..0bcd4b71 100644 --- a/services/inventory/test_dedup.py +++ b/services/inventory/test_dedup.py @@ -62,16 +62,14 @@ async def test_deduplication_in_container(): async def start(self): # Connect to Redis for deduplication testing - import ssl - connection_kwargs = {} - if self.config.REDIS_URL and self.config.REDIS_URL.startswith("rediss://"): - connection_kwargs["ssl_cert_reqs"] = ssl.CERT_NONE - self.redis = await aioredis.from_url(self.config.REDIS_URL, **connection_kwargs) + from shared.redis_utils import RedisConnectionManager + self._redis_manager = await RedisConnectionManager.create(self.config.REDIS_URL) + self.redis = self._redis_manager.get_client() print(f"✅ Connected to Redis for testing") async def stop(self): - if self.redis: - await self.redis.aclose() + if hasattr(self, '_redis_manager') and self._redis_manager: + await self._redis_manager.close() # Create test service service = TestInventoryAlertService() diff --git a/services/orchestrator/app/main.py b/services/orchestrator/app/main.py index abb09fe0..7745e66f 100644 --- a/services/orchestrator/app/main.py +++ b/services/orchestrator/app/main.py @@ -100,24 +100,18 @@ class OrchestratorService(StandardFastAPIService): Without leader election, each pod would run the same scheduled jobs, causing duplicate forecasts, production schedules, and database contention. """ - import ssl from shared.leader_election import LeaderElectionService - import redis.asyncio as redis + from shared.redis_utils import RedisConnectionManager try: # Create Redis connection for leader election redis_url = f"redis://:{settings.REDIS_PASSWORD}@{settings.REDIS_HOST}:{settings.REDIS_PORT}/{settings.REDIS_DB}" - use_tls = settings.REDIS_TLS_ENABLED.lower() == "true" - if use_tls: + if settings.REDIS_TLS_ENABLED.lower() == "true": redis_url = redis_url.replace("redis://", "rediss://") - # Handle SSL/TLS for self-signed certificates - connection_kwargs = {"decode_responses": False} - if use_tls: - connection_kwargs["ssl_cert_reqs"] = ssl.CERT_NONE - - redis_client = redis.from_url(redis_url, **connection_kwargs) - await redis_client.ping() + # Create Redis connection using shared manager (handles SSL, pooling, health checks) + self.redis_manager = await RedisConnectionManager.create(redis_url, decode_responses=False) + redis_client = self.redis_manager.get_client() # Use shared leader election service self.leader_election = LeaderElectionService( @@ -180,6 +174,11 @@ class OrchestratorService(StandardFastAPIService): await self.scheduler_service.stop() self.logger.info("Orchestrator scheduler service stopped") + # Close Redis manager (handles client and pool cleanup) + if hasattr(self, 'redis_manager') and self.redis_manager: + await self.redis_manager.close() + self.logger.info("Redis connection closed") + def get_service_features(self): """Return orchestrator-specific features""" diff --git a/services/pos/app/scheduler.py b/services/pos/app/scheduler.py index f795172d..9ffb8f49 100644 --- a/services/pos/app/scheduler.py +++ b/services/pos/app/scheduler.py @@ -52,6 +52,7 @@ class POSScheduler: # Leader election self._redis_url = redis_url self._leader_election = None + self._redis_manager = None self._redis_client = None self._scheduler_started = False @@ -66,18 +67,15 @@ class POSScheduler: async def _start_with_leader_election(self): """Start with Redis-based leader election for horizontal scaling""" - import ssl - import redis.asyncio as redis from shared.leader_election import LeaderElectionService + from shared.redis_utils import RedisConnectionManager try: - # Create Redis connection with proper SSL handling for self-signed certificates - connection_kwargs = {"decode_responses": False} - if self._redis_url and self._redis_url.startswith("rediss://"): - connection_kwargs["ssl_cert_reqs"] = ssl.CERT_NONE - - self._redis_client = redis.from_url(self._redis_url, **connection_kwargs) - await self._redis_client.ping() + # Create Redis connection using shared manager (handles SSL, pooling, health checks) + self._redis_manager = await RedisConnectionManager.create( + self._redis_url, decode_responses=False + ) + self._redis_client = self._redis_manager.get_client() # Create scheduler (but don't start it yet) self.scheduler = AsyncIOScheduler() @@ -201,9 +199,9 @@ class POSScheduler: # Stop scheduler await self._stop_scheduler() - # Close Redis - if self._redis_client: - await self._redis_client.close() + # Close Redis manager (handles client and pool cleanup) + if self._redis_manager: + await self._redis_manager.close() logger.info("POS scheduler stopped") diff --git a/services/procurement/app/services/delivery_tracking_service.py b/services/procurement/app/services/delivery_tracking_service.py index 34fc7d5d..d745da81 100644 --- a/services/procurement/app/services/delivery_tracking_service.py +++ b/services/procurement/app/services/delivery_tracking_service.py @@ -39,6 +39,7 @@ class DeliveryTrackingService: self.database_manager = database_manager self.scheduler = AsyncIOScheduler() self._leader_election = None + self._redis_manager = None self._redis_client = None self._scheduler_started = False self.instance_id = str(uuid4())[:8] # Short instance ID for logging @@ -56,9 +57,8 @@ class DeliveryTrackingService: async def _setup_leader_election(self): """Setup Redis-based leader election for horizontal scaling""" - import ssl from shared.leader_election import LeaderElectionService - import redis.asyncio as redis + from shared.redis_utils import RedisConnectionManager # Build Redis URL from config redis_url = getattr(self.config, 'REDIS_URL', None) @@ -69,13 +69,9 @@ class DeliveryTrackingService: redis_db = getattr(self.config, 'REDIS_DB', 0) redis_url = f"redis://:{redis_password}@{redis_host}:{redis_port}/{redis_db}" - # Handle SSL/TLS for self-signed certificates - connection_kwargs = {"decode_responses": False} - if redis_url and redis_url.startswith("rediss://"): - connection_kwargs["ssl_cert_reqs"] = ssl.CERT_NONE - - self._redis_client = redis.from_url(redis_url, **connection_kwargs) - await self._redis_client.ping() + # Create Redis connection using shared manager (handles SSL, pooling, health checks) + self._redis_manager = await RedisConnectionManager.create(redis_url, decode_responses=False) + self._redis_client = self._redis_manager.get_client() # Create leader election service self._leader_election = LeaderElectionService( @@ -151,9 +147,9 @@ class DeliveryTrackingService: # Stop scheduler await self._stop_scheduler() - # Close Redis - if self._redis_client: - await self._redis_client.close() + # Close Redis manager (handles client and pool cleanup) + if self._redis_manager: + await self._redis_manager.close() @property def is_leader(self) -> bool: diff --git a/services/production/app/services/production_scheduler.py b/services/production/app/services/production_scheduler.py index d30b2702..dbc48820 100644 --- a/services/production/app/services/production_scheduler.py +++ b/services/production/app/services/production_scheduler.py @@ -40,6 +40,7 @@ class ProductionScheduler: # Leader election self._leader_election = None + self._redis_manager = None self._redis_client = None self._scheduler_started = False @@ -65,17 +66,12 @@ class ProductionScheduler: async def _setup_leader_election(self): """Setup Redis-based leader election""" - import ssl from shared.leader_election import LeaderElectionService - import redis.asyncio as redis + from shared.redis_utils import RedisConnectionManager - # Handle SSL/TLS for self-signed certificates - connection_kwargs = {"decode_responses": False} - if self.redis_url and self.redis_url.startswith("rediss://"): - connection_kwargs["ssl_cert_reqs"] = ssl.CERT_NONE - - self._redis_client = redis.from_url(self.redis_url, **connection_kwargs) - await self._redis_client.ping() + # Create Redis connection using shared manager (handles SSL, pooling, health checks) + self._redis_manager = await RedisConnectionManager.create(self.redis_url, decode_responses=False) + self._redis_client = self._redis_manager.get_client() self._leader_election = LeaderElectionService( self._redis_client, @@ -137,8 +133,9 @@ class ProductionScheduler: await self._stop_scheduler() - if self._redis_client: - await self._redis_client.close() + # Close Redis manager (handles client and pool cleanup) + if self._redis_manager: + await self._redis_manager.close() @property def is_leader(self) -> bool: diff --git a/services/suppliers/app/consumers/alert_event_consumer.py b/services/suppliers/app/consumers/alert_event_consumer.py index 6ca90eed..04c8921c 100644 --- a/services/suppliers/app/consumers/alert_event_consumer.py +++ b/services/suppliers/app/consumers/alert_event_consumer.py @@ -365,21 +365,14 @@ class AlertEventConsumer: # Redis-based rate limiting implementation try: - import ssl - import redis.asyncio as redis from datetime import datetime, timedelta from app.core.config import Settings + from shared.redis_utils import RedisConnectionManager - # Connect to Redis using proper configuration with TLS and auth + # Connect to Redis using shared manager (handles SSL, pooling, health checks) settings = Settings() - redis_url = settings.REDIS_URL - - # Handle SSL/TLS for self-signed certificates - connection_kwargs = {"decode_responses": True} - if redis_url and redis_url.startswith("rediss://"): - connection_kwargs["ssl_cert_reqs"] = ssl.CERT_NONE - - redis_client = await redis.from_url(redis_url, **connection_kwargs) + redis_manager = await RedisConnectionManager.create(settings.REDIS_URL, decode_responses=True) + redis_client = redis_manager.get_client() # Rate limit keys hour_key = f"alert_rate_limit:{tenant_id}:{alert_type}:hour:{datetime.utcnow().strftime('%Y%m%d%H')}" @@ -404,7 +397,7 @@ class AlertEventConsumer: count=hour_count, limit=max_per_hour ) - await redis_client.close() + await redis_manager.close() return False if day_count >= max_per_day: @@ -415,7 +408,7 @@ class AlertEventConsumer: count=day_count, limit=max_per_day ) - await redis_client.close() + await redis_manager.close() return False # Increment counters @@ -426,7 +419,7 @@ class AlertEventConsumer: pipe.expire(day_key, 86400) # 24 hour TTL await pipe.execute() - await redis_client.close() + await redis_manager.close() logger.debug( "Rate limit check passed", diff --git a/services/tenant/app/api/usage_forecast.py b/services/tenant/app/api/usage_forecast.py index b0f216b7..709ff78e 100644 --- a/services/tenant/app/api/usage_forecast.py +++ b/services/tenant/app/api/usage_forecast.py @@ -49,17 +49,13 @@ class UsageForecastResponse(BaseModel): async def get_redis_client() -> redis.Redis: """Get Redis client for usage tracking""" - import ssl + from shared.redis_utils import RedisConnectionManager - # Handle SSL/TLS for self-signed certificates - connection_kwargs = { - "encoding": "utf-8", - "decode_responses": True - } - if settings.REDIS_URL and settings.REDIS_URL.startswith("rediss://"): - connection_kwargs["ssl_cert_reqs"] = ssl.CERT_NONE - - return redis.from_url(settings.REDIS_URL, **connection_kwargs) + # Create Redis connection using shared manager (handles SSL, pooling, health checks) + manager = await RedisConnectionManager.create( + settings.REDIS_URL, decode_responses=True + ) + return manager.get_client() async def get_usage_history( diff --git a/shared/leader_election/mixin.py b/shared/leader_election/mixin.py index 639a0665..7813b18f 100644 --- a/shared/leader_election/mixin.py +++ b/shared/leader_election/mixin.py @@ -53,6 +53,7 @@ class SchedulerLeaderMixin: self._redis_url = redis_url self._service_name = service_name self._leader_election = None + self._redis_manager = None self._redis_client = None self.scheduler = None self._scheduler_started = False @@ -63,21 +64,16 @@ class SchedulerLeaderMixin: Only the leader will start the scheduler. """ - import ssl from apscheduler.schedulers.asyncio import AsyncIOScheduler from shared.leader_election.service import LeaderElectionService - import redis.asyncio as redis + from shared.redis_utils import RedisConnectionManager try: - # Create Redis connection with proper SSL handling for self-signed certificates - connection_kwargs = {"decode_responses": False} - - # Handle SSL/TLS for rediss:// URLs (self-signed certificates) - if self._redis_url and self._redis_url.startswith("rediss://"): - connection_kwargs["ssl_cert_reqs"] = ssl.CERT_NONE - - self._redis_client = redis.from_url(self._redis_url, **connection_kwargs) - await self._redis_client.ping() + # Create Redis connection using shared manager (handles SSL, pooling, health checks) + self._redis_manager = await RedisConnectionManager.create( + self._redis_url, decode_responses=False + ) + self._redis_client = self._redis_manager.get_client() # Create scheduler (but don't start it yet) self.scheduler = AsyncIOScheduler() @@ -197,9 +193,9 @@ class SchedulerLeaderMixin: # Stop scheduler await self._stop_scheduler() - # Close Redis - if self._redis_client: - await self._redis_client.close() + # Close Redis manager (handles client and pool cleanup) + if self._redis_manager: + await self._redis_manager.close() logger.info("Scheduler service stopped", service=self._service_name) diff --git a/shared/redis_utils/__init__.py b/shared/redis_utils/__init__.py index c10dda1f..2b447ab1 100755 --- a/shared/redis_utils/__init__.py +++ b/shared/redis_utils/__init__.py @@ -13,17 +13,19 @@ from shared.redis_utils.client import ( set_with_ttl, get_value, increment_counter, - get_keys_pattern + get_keys_pattern, + get_ssl_kwargs_for_url, ) __all__ = [ - # Connection management + # Connection management (RedisConnectionManager.create() is recommended) "RedisConnectionManager", "get_redis_manager", "initialize_redis", "get_redis_client", "close_redis", "redis_context", + "get_ssl_kwargs_for_url", # Convenience functions "set_with_ttl", diff --git a/shared/redis_utils/client.py b/shared/redis_utils/client.py index 6feb09e9..1ec744c8 100755 --- a/shared/redis_utils/client.py +++ b/shared/redis_utils/client.py @@ -3,25 +3,111 @@ Redis client initialization and connection management Provides standardized Redis connection for all services """ +import ssl import redis.asyncio as redis -from typing import Optional +from typing import Optional, Dict, Any import structlog from contextlib import asynccontextmanager logger = structlog.get_logger() +def get_ssl_kwargs_for_url(redis_url: str) -> Dict[str, Any]: + """ + Get SSL kwargs for redis.from_url() based on the URL scheme. + + Handles self-signed certificates by disabling certificate verification + when using rediss:// (TLS-enabled) URLs. + + Args: + redis_url: Redis connection URL (redis:// or rediss://) + + Returns: + Dict with SSL configuration kwargs + """ + if redis_url and redis_url.startswith("rediss://"): + return { + "ssl_cert_reqs": ssl.CERT_NONE, # Disable certificate verification + "ssl_ca_certs": None, # Don't require CA certificates + "ssl_certfile": None, # Don't require client cert + "ssl_keyfile": None, # Don't require client key + } + return {} + + class RedisConnectionManager: """ - Manages Redis connections with connection pooling and error handling - Thread-safe singleton pattern for sharing connections across service + Manages Redis connections with connection pooling and error handling. + Thread-safe singleton pattern for sharing connections across service. + + Usage: + # Option 1: Using class method (recommended for new code) + manager = await RedisConnectionManager.create(redis_url) + client = manager.get_client() + + # Option 2: Using instance method + manager = RedisConnectionManager() + await manager.initialize(redis_url) + client = manager.get_client() + + # Don't forget to close when done + await manager.close() """ def __init__(self): self._client: Optional[redis.Redis] = None self._pool: Optional[redis.ConnectionPool] = None + self._redis_url: Optional[str] = None self.logger = logger + @classmethod + async def create( + cls, + redis_url: str, + db: int = 0, + max_connections: int = 50, + decode_responses: bool = False, + retry_on_timeout: bool = True, + socket_keepalive: bool = True, + health_check_interval: int = 30 + ) -> "RedisConnectionManager": + """ + Factory method to create and initialize a RedisConnectionManager. + + This is the recommended way to create Redis connections across all services. + Handles SSL/TLS configuration automatically for self-signed certificates. + + Args: + redis_url: Redis connection URL (redis:// or rediss://) + db: Database number (0-15) + max_connections: Maximum connections in pool + decode_responses: Automatically decode responses to strings + retry_on_timeout: Retry on timeout errors + socket_keepalive: Enable TCP keepalive + health_check_interval: Health check interval in seconds + + Returns: + Initialized RedisConnectionManager + + Example: + from shared.redis_utils import RedisConnectionManager + + manager = await RedisConnectionManager.create(settings.REDIS_URL) + client = manager.get_client() + await client.ping() + """ + instance = cls() + await instance.initialize( + redis_url=redis_url, + db=db, + max_connections=max_connections, + decode_responses=decode_responses, + retry_on_timeout=retry_on_timeout, + socket_keepalive=socket_keepalive, + health_check_interval=health_check_interval, + ) + return instance + async def initialize( self, redis_url: str, @@ -45,8 +131,9 @@ class RedisConnectionManager: health_check_interval: Health check interval in seconds """ try: - # Create connection pool - # Handle SSL parameters for self-signed certificates + self._redis_url = redis_url + + # Create connection pool with SSL handling for self-signed certificates connection_kwargs = { 'db': db, 'max_connections': max_connections, @@ -55,16 +142,10 @@ class RedisConnectionManager: 'socket_keepalive': socket_keepalive, 'health_check_interval': health_check_interval } - - # If using SSL/TLS, add SSL parameters to handle self-signed certificates - if redis_url.startswith('rediss://'): - connection_kwargs.update({ - 'ssl_cert_reqs': None, # Disable certificate verification - 'ssl_ca_certs': None, # Don't require CA certificates - 'ssl_certfile': None, # Don't require client cert - 'ssl_keyfile': None # Don't require client key - }) - + + # Add SSL kwargs for self-signed certificates (using shared helper) + connection_kwargs.update(get_ssl_kwargs_for_url(redis_url)) + self._pool = redis.ConnectionPool.from_url( redis_url, **connection_kwargs