Fix redis ssl issues 2
This commit is contained in:
@@ -44,6 +44,7 @@ class InventoryScheduler:
|
|||||||
# Leader election
|
# Leader election
|
||||||
self._redis_url = redis_url
|
self._redis_url = redis_url
|
||||||
self._leader_election = None
|
self._leader_election = None
|
||||||
|
self._redis_manager = None
|
||||||
self._redis_client = None
|
self._redis_client = None
|
||||||
self._scheduler_started = False
|
self._scheduler_started = False
|
||||||
|
|
||||||
@@ -58,20 +59,15 @@ class InventoryScheduler:
|
|||||||
|
|
||||||
async def _start_with_leader_election(self):
|
async def _start_with_leader_election(self):
|
||||||
"""Start with Redis-based leader election for horizontal scaling"""
|
"""Start with Redis-based leader election for horizontal scaling"""
|
||||||
import ssl
|
|
||||||
import redis.asyncio as redis
|
|
||||||
from shared.leader_election import LeaderElectionService
|
from shared.leader_election import LeaderElectionService
|
||||||
|
from shared.redis_utils import RedisConnectionManager
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# Create Redis connection with proper SSL handling for self-signed certificates
|
# Create Redis connection using shared manager (handles SSL, pooling, health checks)
|
||||||
connection_kwargs = {"decode_responses": False}
|
self._redis_manager = await RedisConnectionManager.create(
|
||||||
|
self._redis_url, decode_responses=False
|
||||||
# Handle SSL/TLS for rediss:// URLs (self-signed certificates)
|
)
|
||||||
if self._redis_url and self._redis_url.startswith("rediss://"):
|
self._redis_client = self._redis_manager.get_client()
|
||||||
connection_kwargs["ssl_cert_reqs"] = ssl.CERT_NONE
|
|
||||||
|
|
||||||
self._redis_client = redis.from_url(self._redis_url, **connection_kwargs)
|
|
||||||
await self._redis_client.ping()
|
|
||||||
|
|
||||||
# Create scheduler (but don't start it yet)
|
# Create scheduler (but don't start it yet)
|
||||||
self.scheduler = AsyncIOScheduler()
|
self.scheduler = AsyncIOScheduler()
|
||||||
@@ -180,9 +176,9 @@ class InventoryScheduler:
|
|||||||
# Stop scheduler
|
# Stop scheduler
|
||||||
await self._stop_scheduler()
|
await self._stop_scheduler()
|
||||||
|
|
||||||
# Close Redis
|
# Close Redis manager (handles client and pool cleanup)
|
||||||
if self._redis_client:
|
if hasattr(self, '_redis_manager') and self._redis_manager:
|
||||||
await self._redis_client.close()
|
await self._redis_manager.close()
|
||||||
|
|
||||||
logger.info("Inventory scheduler stopped")
|
logger.info("Inventory scheduler stopped")
|
||||||
|
|
||||||
|
|||||||
@@ -62,16 +62,14 @@ async def test_deduplication_in_container():
|
|||||||
|
|
||||||
async def start(self):
|
async def start(self):
|
||||||
# Connect to Redis for deduplication testing
|
# Connect to Redis for deduplication testing
|
||||||
import ssl
|
from shared.redis_utils import RedisConnectionManager
|
||||||
connection_kwargs = {}
|
self._redis_manager = await RedisConnectionManager.create(self.config.REDIS_URL)
|
||||||
if self.config.REDIS_URL and self.config.REDIS_URL.startswith("rediss://"):
|
self.redis = self._redis_manager.get_client()
|
||||||
connection_kwargs["ssl_cert_reqs"] = ssl.CERT_NONE
|
|
||||||
self.redis = await aioredis.from_url(self.config.REDIS_URL, **connection_kwargs)
|
|
||||||
print(f"✅ Connected to Redis for testing")
|
print(f"✅ Connected to Redis for testing")
|
||||||
|
|
||||||
async def stop(self):
|
async def stop(self):
|
||||||
if self.redis:
|
if hasattr(self, '_redis_manager') and self._redis_manager:
|
||||||
await self.redis.aclose()
|
await self._redis_manager.close()
|
||||||
|
|
||||||
# Create test service
|
# Create test service
|
||||||
service = TestInventoryAlertService()
|
service = TestInventoryAlertService()
|
||||||
|
|||||||
@@ -100,24 +100,18 @@ class OrchestratorService(StandardFastAPIService):
|
|||||||
Without leader election, each pod would run the same scheduled jobs,
|
Without leader election, each pod would run the same scheduled jobs,
|
||||||
causing duplicate forecasts, production schedules, and database contention.
|
causing duplicate forecasts, production schedules, and database contention.
|
||||||
"""
|
"""
|
||||||
import ssl
|
|
||||||
from shared.leader_election import LeaderElectionService
|
from shared.leader_election import LeaderElectionService
|
||||||
import redis.asyncio as redis
|
from shared.redis_utils import RedisConnectionManager
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# Create Redis connection for leader election
|
# Create Redis connection for leader election
|
||||||
redis_url = f"redis://:{settings.REDIS_PASSWORD}@{settings.REDIS_HOST}:{settings.REDIS_PORT}/{settings.REDIS_DB}"
|
redis_url = f"redis://:{settings.REDIS_PASSWORD}@{settings.REDIS_HOST}:{settings.REDIS_PORT}/{settings.REDIS_DB}"
|
||||||
use_tls = settings.REDIS_TLS_ENABLED.lower() == "true"
|
if settings.REDIS_TLS_ENABLED.lower() == "true":
|
||||||
if use_tls:
|
|
||||||
redis_url = redis_url.replace("redis://", "rediss://")
|
redis_url = redis_url.replace("redis://", "rediss://")
|
||||||
|
|
||||||
# Handle SSL/TLS for self-signed certificates
|
# Create Redis connection using shared manager (handles SSL, pooling, health checks)
|
||||||
connection_kwargs = {"decode_responses": False}
|
self.redis_manager = await RedisConnectionManager.create(redis_url, decode_responses=False)
|
||||||
if use_tls:
|
redis_client = self.redis_manager.get_client()
|
||||||
connection_kwargs["ssl_cert_reqs"] = ssl.CERT_NONE
|
|
||||||
|
|
||||||
redis_client = redis.from_url(redis_url, **connection_kwargs)
|
|
||||||
await redis_client.ping()
|
|
||||||
|
|
||||||
# Use shared leader election service
|
# Use shared leader election service
|
||||||
self.leader_election = LeaderElectionService(
|
self.leader_election = LeaderElectionService(
|
||||||
@@ -180,6 +174,11 @@ class OrchestratorService(StandardFastAPIService):
|
|||||||
await self.scheduler_service.stop()
|
await self.scheduler_service.stop()
|
||||||
self.logger.info("Orchestrator scheduler service stopped")
|
self.logger.info("Orchestrator scheduler service stopped")
|
||||||
|
|
||||||
|
# Close Redis manager (handles client and pool cleanup)
|
||||||
|
if hasattr(self, 'redis_manager') and self.redis_manager:
|
||||||
|
await self.redis_manager.close()
|
||||||
|
self.logger.info("Redis connection closed")
|
||||||
|
|
||||||
|
|
||||||
def get_service_features(self):
|
def get_service_features(self):
|
||||||
"""Return orchestrator-specific features"""
|
"""Return orchestrator-specific features"""
|
||||||
|
|||||||
@@ -52,6 +52,7 @@ class POSScheduler:
|
|||||||
# Leader election
|
# Leader election
|
||||||
self._redis_url = redis_url
|
self._redis_url = redis_url
|
||||||
self._leader_election = None
|
self._leader_election = None
|
||||||
|
self._redis_manager = None
|
||||||
self._redis_client = None
|
self._redis_client = None
|
||||||
self._scheduler_started = False
|
self._scheduler_started = False
|
||||||
|
|
||||||
@@ -66,18 +67,15 @@ class POSScheduler:
|
|||||||
|
|
||||||
async def _start_with_leader_election(self):
|
async def _start_with_leader_election(self):
|
||||||
"""Start with Redis-based leader election for horizontal scaling"""
|
"""Start with Redis-based leader election for horizontal scaling"""
|
||||||
import ssl
|
|
||||||
import redis.asyncio as redis
|
|
||||||
from shared.leader_election import LeaderElectionService
|
from shared.leader_election import LeaderElectionService
|
||||||
|
from shared.redis_utils import RedisConnectionManager
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# Create Redis connection with proper SSL handling for self-signed certificates
|
# Create Redis connection using shared manager (handles SSL, pooling, health checks)
|
||||||
connection_kwargs = {"decode_responses": False}
|
self._redis_manager = await RedisConnectionManager.create(
|
||||||
if self._redis_url and self._redis_url.startswith("rediss://"):
|
self._redis_url, decode_responses=False
|
||||||
connection_kwargs["ssl_cert_reqs"] = ssl.CERT_NONE
|
)
|
||||||
|
self._redis_client = self._redis_manager.get_client()
|
||||||
self._redis_client = redis.from_url(self._redis_url, **connection_kwargs)
|
|
||||||
await self._redis_client.ping()
|
|
||||||
|
|
||||||
# Create scheduler (but don't start it yet)
|
# Create scheduler (but don't start it yet)
|
||||||
self.scheduler = AsyncIOScheduler()
|
self.scheduler = AsyncIOScheduler()
|
||||||
@@ -201,9 +199,9 @@ class POSScheduler:
|
|||||||
# Stop scheduler
|
# Stop scheduler
|
||||||
await self._stop_scheduler()
|
await self._stop_scheduler()
|
||||||
|
|
||||||
# Close Redis
|
# Close Redis manager (handles client and pool cleanup)
|
||||||
if self._redis_client:
|
if self._redis_manager:
|
||||||
await self._redis_client.close()
|
await self._redis_manager.close()
|
||||||
|
|
||||||
logger.info("POS scheduler stopped")
|
logger.info("POS scheduler stopped")
|
||||||
|
|
||||||
|
|||||||
@@ -39,6 +39,7 @@ class DeliveryTrackingService:
|
|||||||
self.database_manager = database_manager
|
self.database_manager = database_manager
|
||||||
self.scheduler = AsyncIOScheduler()
|
self.scheduler = AsyncIOScheduler()
|
||||||
self._leader_election = None
|
self._leader_election = None
|
||||||
|
self._redis_manager = None
|
||||||
self._redis_client = None
|
self._redis_client = None
|
||||||
self._scheduler_started = False
|
self._scheduler_started = False
|
||||||
self.instance_id = str(uuid4())[:8] # Short instance ID for logging
|
self.instance_id = str(uuid4())[:8] # Short instance ID for logging
|
||||||
@@ -56,9 +57,8 @@ class DeliveryTrackingService:
|
|||||||
|
|
||||||
async def _setup_leader_election(self):
|
async def _setup_leader_election(self):
|
||||||
"""Setup Redis-based leader election for horizontal scaling"""
|
"""Setup Redis-based leader election for horizontal scaling"""
|
||||||
import ssl
|
|
||||||
from shared.leader_election import LeaderElectionService
|
from shared.leader_election import LeaderElectionService
|
||||||
import redis.asyncio as redis
|
from shared.redis_utils import RedisConnectionManager
|
||||||
|
|
||||||
# Build Redis URL from config
|
# Build Redis URL from config
|
||||||
redis_url = getattr(self.config, 'REDIS_URL', None)
|
redis_url = getattr(self.config, 'REDIS_URL', None)
|
||||||
@@ -69,13 +69,9 @@ class DeliveryTrackingService:
|
|||||||
redis_db = getattr(self.config, 'REDIS_DB', 0)
|
redis_db = getattr(self.config, 'REDIS_DB', 0)
|
||||||
redis_url = f"redis://:{redis_password}@{redis_host}:{redis_port}/{redis_db}"
|
redis_url = f"redis://:{redis_password}@{redis_host}:{redis_port}/{redis_db}"
|
||||||
|
|
||||||
# Handle SSL/TLS for self-signed certificates
|
# Create Redis connection using shared manager (handles SSL, pooling, health checks)
|
||||||
connection_kwargs = {"decode_responses": False}
|
self._redis_manager = await RedisConnectionManager.create(redis_url, decode_responses=False)
|
||||||
if redis_url and redis_url.startswith("rediss://"):
|
self._redis_client = self._redis_manager.get_client()
|
||||||
connection_kwargs["ssl_cert_reqs"] = ssl.CERT_NONE
|
|
||||||
|
|
||||||
self._redis_client = redis.from_url(redis_url, **connection_kwargs)
|
|
||||||
await self._redis_client.ping()
|
|
||||||
|
|
||||||
# Create leader election service
|
# Create leader election service
|
||||||
self._leader_election = LeaderElectionService(
|
self._leader_election = LeaderElectionService(
|
||||||
@@ -151,9 +147,9 @@ class DeliveryTrackingService:
|
|||||||
# Stop scheduler
|
# Stop scheduler
|
||||||
await self._stop_scheduler()
|
await self._stop_scheduler()
|
||||||
|
|
||||||
# Close Redis
|
# Close Redis manager (handles client and pool cleanup)
|
||||||
if self._redis_client:
|
if self._redis_manager:
|
||||||
await self._redis_client.close()
|
await self._redis_manager.close()
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def is_leader(self) -> bool:
|
def is_leader(self) -> bool:
|
||||||
|
|||||||
@@ -40,6 +40,7 @@ class ProductionScheduler:
|
|||||||
|
|
||||||
# Leader election
|
# Leader election
|
||||||
self._leader_election = None
|
self._leader_election = None
|
||||||
|
self._redis_manager = None
|
||||||
self._redis_client = None
|
self._redis_client = None
|
||||||
self._scheduler_started = False
|
self._scheduler_started = False
|
||||||
|
|
||||||
@@ -65,17 +66,12 @@ class ProductionScheduler:
|
|||||||
|
|
||||||
async def _setup_leader_election(self):
|
async def _setup_leader_election(self):
|
||||||
"""Setup Redis-based leader election"""
|
"""Setup Redis-based leader election"""
|
||||||
import ssl
|
|
||||||
from shared.leader_election import LeaderElectionService
|
from shared.leader_election import LeaderElectionService
|
||||||
import redis.asyncio as redis
|
from shared.redis_utils import RedisConnectionManager
|
||||||
|
|
||||||
# Handle SSL/TLS for self-signed certificates
|
# Create Redis connection using shared manager (handles SSL, pooling, health checks)
|
||||||
connection_kwargs = {"decode_responses": False}
|
self._redis_manager = await RedisConnectionManager.create(self.redis_url, decode_responses=False)
|
||||||
if self.redis_url and self.redis_url.startswith("rediss://"):
|
self._redis_client = self._redis_manager.get_client()
|
||||||
connection_kwargs["ssl_cert_reqs"] = ssl.CERT_NONE
|
|
||||||
|
|
||||||
self._redis_client = redis.from_url(self.redis_url, **connection_kwargs)
|
|
||||||
await self._redis_client.ping()
|
|
||||||
|
|
||||||
self._leader_election = LeaderElectionService(
|
self._leader_election = LeaderElectionService(
|
||||||
self._redis_client,
|
self._redis_client,
|
||||||
@@ -137,8 +133,9 @@ class ProductionScheduler:
|
|||||||
|
|
||||||
await self._stop_scheduler()
|
await self._stop_scheduler()
|
||||||
|
|
||||||
if self._redis_client:
|
# Close Redis manager (handles client and pool cleanup)
|
||||||
await self._redis_client.close()
|
if self._redis_manager:
|
||||||
|
await self._redis_manager.close()
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def is_leader(self) -> bool:
|
def is_leader(self) -> bool:
|
||||||
|
|||||||
@@ -365,21 +365,14 @@ class AlertEventConsumer:
|
|||||||
|
|
||||||
# Redis-based rate limiting implementation
|
# Redis-based rate limiting implementation
|
||||||
try:
|
try:
|
||||||
import ssl
|
|
||||||
import redis.asyncio as redis
|
|
||||||
from datetime import datetime, timedelta
|
from datetime import datetime, timedelta
|
||||||
from app.core.config import Settings
|
from app.core.config import Settings
|
||||||
|
from shared.redis_utils import RedisConnectionManager
|
||||||
|
|
||||||
# Connect to Redis using proper configuration with TLS and auth
|
# Connect to Redis using shared manager (handles SSL, pooling, health checks)
|
||||||
settings = Settings()
|
settings = Settings()
|
||||||
redis_url = settings.REDIS_URL
|
redis_manager = await RedisConnectionManager.create(settings.REDIS_URL, decode_responses=True)
|
||||||
|
redis_client = redis_manager.get_client()
|
||||||
# Handle SSL/TLS for self-signed certificates
|
|
||||||
connection_kwargs = {"decode_responses": True}
|
|
||||||
if redis_url and redis_url.startswith("rediss://"):
|
|
||||||
connection_kwargs["ssl_cert_reqs"] = ssl.CERT_NONE
|
|
||||||
|
|
||||||
redis_client = await redis.from_url(redis_url, **connection_kwargs)
|
|
||||||
|
|
||||||
# Rate limit keys
|
# Rate limit keys
|
||||||
hour_key = f"alert_rate_limit:{tenant_id}:{alert_type}:hour:{datetime.utcnow().strftime('%Y%m%d%H')}"
|
hour_key = f"alert_rate_limit:{tenant_id}:{alert_type}:hour:{datetime.utcnow().strftime('%Y%m%d%H')}"
|
||||||
@@ -404,7 +397,7 @@ class AlertEventConsumer:
|
|||||||
count=hour_count,
|
count=hour_count,
|
||||||
limit=max_per_hour
|
limit=max_per_hour
|
||||||
)
|
)
|
||||||
await redis_client.close()
|
await redis_manager.close()
|
||||||
return False
|
return False
|
||||||
|
|
||||||
if day_count >= max_per_day:
|
if day_count >= max_per_day:
|
||||||
@@ -415,7 +408,7 @@ class AlertEventConsumer:
|
|||||||
count=day_count,
|
count=day_count,
|
||||||
limit=max_per_day
|
limit=max_per_day
|
||||||
)
|
)
|
||||||
await redis_client.close()
|
await redis_manager.close()
|
||||||
return False
|
return False
|
||||||
|
|
||||||
# Increment counters
|
# Increment counters
|
||||||
@@ -426,7 +419,7 @@ class AlertEventConsumer:
|
|||||||
pipe.expire(day_key, 86400) # 24 hour TTL
|
pipe.expire(day_key, 86400) # 24 hour TTL
|
||||||
await pipe.execute()
|
await pipe.execute()
|
||||||
|
|
||||||
await redis_client.close()
|
await redis_manager.close()
|
||||||
|
|
||||||
logger.debug(
|
logger.debug(
|
||||||
"Rate limit check passed",
|
"Rate limit check passed",
|
||||||
|
|||||||
@@ -49,17 +49,13 @@ class UsageForecastResponse(BaseModel):
|
|||||||
|
|
||||||
async def get_redis_client() -> redis.Redis:
|
async def get_redis_client() -> redis.Redis:
|
||||||
"""Get Redis client for usage tracking"""
|
"""Get Redis client for usage tracking"""
|
||||||
import ssl
|
from shared.redis_utils import RedisConnectionManager
|
||||||
|
|
||||||
# Handle SSL/TLS for self-signed certificates
|
# Create Redis connection using shared manager (handles SSL, pooling, health checks)
|
||||||
connection_kwargs = {
|
manager = await RedisConnectionManager.create(
|
||||||
"encoding": "utf-8",
|
settings.REDIS_URL, decode_responses=True
|
||||||
"decode_responses": True
|
)
|
||||||
}
|
return manager.get_client()
|
||||||
if settings.REDIS_URL and settings.REDIS_URL.startswith("rediss://"):
|
|
||||||
connection_kwargs["ssl_cert_reqs"] = ssl.CERT_NONE
|
|
||||||
|
|
||||||
return redis.from_url(settings.REDIS_URL, **connection_kwargs)
|
|
||||||
|
|
||||||
|
|
||||||
async def get_usage_history(
|
async def get_usage_history(
|
||||||
|
|||||||
@@ -53,6 +53,7 @@ class SchedulerLeaderMixin:
|
|||||||
self._redis_url = redis_url
|
self._redis_url = redis_url
|
||||||
self._service_name = service_name
|
self._service_name = service_name
|
||||||
self._leader_election = None
|
self._leader_election = None
|
||||||
|
self._redis_manager = None
|
||||||
self._redis_client = None
|
self._redis_client = None
|
||||||
self.scheduler = None
|
self.scheduler = None
|
||||||
self._scheduler_started = False
|
self._scheduler_started = False
|
||||||
@@ -63,21 +64,16 @@ class SchedulerLeaderMixin:
|
|||||||
|
|
||||||
Only the leader will start the scheduler.
|
Only the leader will start the scheduler.
|
||||||
"""
|
"""
|
||||||
import ssl
|
|
||||||
from apscheduler.schedulers.asyncio import AsyncIOScheduler
|
from apscheduler.schedulers.asyncio import AsyncIOScheduler
|
||||||
from shared.leader_election.service import LeaderElectionService
|
from shared.leader_election.service import LeaderElectionService
|
||||||
import redis.asyncio as redis
|
from shared.redis_utils import RedisConnectionManager
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# Create Redis connection with proper SSL handling for self-signed certificates
|
# Create Redis connection using shared manager (handles SSL, pooling, health checks)
|
||||||
connection_kwargs = {"decode_responses": False}
|
self._redis_manager = await RedisConnectionManager.create(
|
||||||
|
self._redis_url, decode_responses=False
|
||||||
# Handle SSL/TLS for rediss:// URLs (self-signed certificates)
|
)
|
||||||
if self._redis_url and self._redis_url.startswith("rediss://"):
|
self._redis_client = self._redis_manager.get_client()
|
||||||
connection_kwargs["ssl_cert_reqs"] = ssl.CERT_NONE
|
|
||||||
|
|
||||||
self._redis_client = redis.from_url(self._redis_url, **connection_kwargs)
|
|
||||||
await self._redis_client.ping()
|
|
||||||
|
|
||||||
# Create scheduler (but don't start it yet)
|
# Create scheduler (but don't start it yet)
|
||||||
self.scheduler = AsyncIOScheduler()
|
self.scheduler = AsyncIOScheduler()
|
||||||
@@ -197,9 +193,9 @@ class SchedulerLeaderMixin:
|
|||||||
# Stop scheduler
|
# Stop scheduler
|
||||||
await self._stop_scheduler()
|
await self._stop_scheduler()
|
||||||
|
|
||||||
# Close Redis
|
# Close Redis manager (handles client and pool cleanup)
|
||||||
if self._redis_client:
|
if self._redis_manager:
|
||||||
await self._redis_client.close()
|
await self._redis_manager.close()
|
||||||
|
|
||||||
logger.info("Scheduler service stopped",
|
logger.info("Scheduler service stopped",
|
||||||
service=self._service_name)
|
service=self._service_name)
|
||||||
|
|||||||
@@ -13,17 +13,19 @@ from shared.redis_utils.client import (
|
|||||||
set_with_ttl,
|
set_with_ttl,
|
||||||
get_value,
|
get_value,
|
||||||
increment_counter,
|
increment_counter,
|
||||||
get_keys_pattern
|
get_keys_pattern,
|
||||||
|
get_ssl_kwargs_for_url,
|
||||||
)
|
)
|
||||||
|
|
||||||
__all__ = [
|
__all__ = [
|
||||||
# Connection management
|
# Connection management (RedisConnectionManager.create() is recommended)
|
||||||
"RedisConnectionManager",
|
"RedisConnectionManager",
|
||||||
"get_redis_manager",
|
"get_redis_manager",
|
||||||
"initialize_redis",
|
"initialize_redis",
|
||||||
"get_redis_client",
|
"get_redis_client",
|
||||||
"close_redis",
|
"close_redis",
|
||||||
"redis_context",
|
"redis_context",
|
||||||
|
"get_ssl_kwargs_for_url",
|
||||||
|
|
||||||
# Convenience functions
|
# Convenience functions
|
||||||
"set_with_ttl",
|
"set_with_ttl",
|
||||||
|
|||||||
@@ -3,25 +3,111 @@ Redis client initialization and connection management
|
|||||||
Provides standardized Redis connection for all services
|
Provides standardized Redis connection for all services
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
import ssl
|
||||||
import redis.asyncio as redis
|
import redis.asyncio as redis
|
||||||
from typing import Optional
|
from typing import Optional, Dict, Any
|
||||||
import structlog
|
import structlog
|
||||||
from contextlib import asynccontextmanager
|
from contextlib import asynccontextmanager
|
||||||
|
|
||||||
logger = structlog.get_logger()
|
logger = structlog.get_logger()
|
||||||
|
|
||||||
|
|
||||||
|
def get_ssl_kwargs_for_url(redis_url: str) -> Dict[str, Any]:
|
||||||
|
"""
|
||||||
|
Get SSL kwargs for redis.from_url() based on the URL scheme.
|
||||||
|
|
||||||
|
Handles self-signed certificates by disabling certificate verification
|
||||||
|
when using rediss:// (TLS-enabled) URLs.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
redis_url: Redis connection URL (redis:// or rediss://)
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Dict with SSL configuration kwargs
|
||||||
|
"""
|
||||||
|
if redis_url and redis_url.startswith("rediss://"):
|
||||||
|
return {
|
||||||
|
"ssl_cert_reqs": ssl.CERT_NONE, # Disable certificate verification
|
||||||
|
"ssl_ca_certs": None, # Don't require CA certificates
|
||||||
|
"ssl_certfile": None, # Don't require client cert
|
||||||
|
"ssl_keyfile": None, # Don't require client key
|
||||||
|
}
|
||||||
|
return {}
|
||||||
|
|
||||||
|
|
||||||
class RedisConnectionManager:
|
class RedisConnectionManager:
|
||||||
"""
|
"""
|
||||||
Manages Redis connections with connection pooling and error handling
|
Manages Redis connections with connection pooling and error handling.
|
||||||
Thread-safe singleton pattern for sharing connections across service
|
Thread-safe singleton pattern for sharing connections across service.
|
||||||
|
|
||||||
|
Usage:
|
||||||
|
# Option 1: Using class method (recommended for new code)
|
||||||
|
manager = await RedisConnectionManager.create(redis_url)
|
||||||
|
client = manager.get_client()
|
||||||
|
|
||||||
|
# Option 2: Using instance method
|
||||||
|
manager = RedisConnectionManager()
|
||||||
|
await manager.initialize(redis_url)
|
||||||
|
client = manager.get_client()
|
||||||
|
|
||||||
|
# Don't forget to close when done
|
||||||
|
await manager.close()
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self._client: Optional[redis.Redis] = None
|
self._client: Optional[redis.Redis] = None
|
||||||
self._pool: Optional[redis.ConnectionPool] = None
|
self._pool: Optional[redis.ConnectionPool] = None
|
||||||
|
self._redis_url: Optional[str] = None
|
||||||
self.logger = logger
|
self.logger = logger
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
async def create(
|
||||||
|
cls,
|
||||||
|
redis_url: str,
|
||||||
|
db: int = 0,
|
||||||
|
max_connections: int = 50,
|
||||||
|
decode_responses: bool = False,
|
||||||
|
retry_on_timeout: bool = True,
|
||||||
|
socket_keepalive: bool = True,
|
||||||
|
health_check_interval: int = 30
|
||||||
|
) -> "RedisConnectionManager":
|
||||||
|
"""
|
||||||
|
Factory method to create and initialize a RedisConnectionManager.
|
||||||
|
|
||||||
|
This is the recommended way to create Redis connections across all services.
|
||||||
|
Handles SSL/TLS configuration automatically for self-signed certificates.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
redis_url: Redis connection URL (redis:// or rediss://)
|
||||||
|
db: Database number (0-15)
|
||||||
|
max_connections: Maximum connections in pool
|
||||||
|
decode_responses: Automatically decode responses to strings
|
||||||
|
retry_on_timeout: Retry on timeout errors
|
||||||
|
socket_keepalive: Enable TCP keepalive
|
||||||
|
health_check_interval: Health check interval in seconds
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Initialized RedisConnectionManager
|
||||||
|
|
||||||
|
Example:
|
||||||
|
from shared.redis_utils import RedisConnectionManager
|
||||||
|
|
||||||
|
manager = await RedisConnectionManager.create(settings.REDIS_URL)
|
||||||
|
client = manager.get_client()
|
||||||
|
await client.ping()
|
||||||
|
"""
|
||||||
|
instance = cls()
|
||||||
|
await instance.initialize(
|
||||||
|
redis_url=redis_url,
|
||||||
|
db=db,
|
||||||
|
max_connections=max_connections,
|
||||||
|
decode_responses=decode_responses,
|
||||||
|
retry_on_timeout=retry_on_timeout,
|
||||||
|
socket_keepalive=socket_keepalive,
|
||||||
|
health_check_interval=health_check_interval,
|
||||||
|
)
|
||||||
|
return instance
|
||||||
|
|
||||||
async def initialize(
|
async def initialize(
|
||||||
self,
|
self,
|
||||||
redis_url: str,
|
redis_url: str,
|
||||||
@@ -45,8 +131,9 @@ class RedisConnectionManager:
|
|||||||
health_check_interval: Health check interval in seconds
|
health_check_interval: Health check interval in seconds
|
||||||
"""
|
"""
|
||||||
try:
|
try:
|
||||||
# Create connection pool
|
self._redis_url = redis_url
|
||||||
# Handle SSL parameters for self-signed certificates
|
|
||||||
|
# Create connection pool with SSL handling for self-signed certificates
|
||||||
connection_kwargs = {
|
connection_kwargs = {
|
||||||
'db': db,
|
'db': db,
|
||||||
'max_connections': max_connections,
|
'max_connections': max_connections,
|
||||||
@@ -55,16 +142,10 @@ class RedisConnectionManager:
|
|||||||
'socket_keepalive': socket_keepalive,
|
'socket_keepalive': socket_keepalive,
|
||||||
'health_check_interval': health_check_interval
|
'health_check_interval': health_check_interval
|
||||||
}
|
}
|
||||||
|
|
||||||
# If using SSL/TLS, add SSL parameters to handle self-signed certificates
|
# Add SSL kwargs for self-signed certificates (using shared helper)
|
||||||
if redis_url.startswith('rediss://'):
|
connection_kwargs.update(get_ssl_kwargs_for_url(redis_url))
|
||||||
connection_kwargs.update({
|
|
||||||
'ssl_cert_reqs': None, # Disable certificate verification
|
|
||||||
'ssl_ca_certs': None, # Don't require CA certificates
|
|
||||||
'ssl_certfile': None, # Don't require client cert
|
|
||||||
'ssl_keyfile': None # Don't require client key
|
|
||||||
})
|
|
||||||
|
|
||||||
self._pool = redis.ConnectionPool.from_url(
|
self._pool = redis.ConnectionPool.from_url(
|
||||||
redis_url,
|
redis_url,
|
||||||
**connection_kwargs
|
**connection_kwargs
|
||||||
|
|||||||
Reference in New Issue
Block a user