Fix redis ssl issues
This commit is contained in:
@@ -58,12 +58,19 @@ class InventoryScheduler:
|
|||||||
|
|
||||||
async def _start_with_leader_election(self):
|
async def _start_with_leader_election(self):
|
||||||
"""Start with Redis-based leader election for horizontal scaling"""
|
"""Start with Redis-based leader election for horizontal scaling"""
|
||||||
|
import ssl
|
||||||
import redis.asyncio as redis
|
import redis.asyncio as redis
|
||||||
from shared.leader_election import LeaderElectionService
|
from shared.leader_election import LeaderElectionService
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# Create Redis connection
|
# Create Redis connection with proper SSL handling for self-signed certificates
|
||||||
self._redis_client = redis.from_url(self._redis_url, decode_responses=False)
|
connection_kwargs = {"decode_responses": False}
|
||||||
|
|
||||||
|
# Handle SSL/TLS for rediss:// URLs (self-signed certificates)
|
||||||
|
if self._redis_url and self._redis_url.startswith("rediss://"):
|
||||||
|
connection_kwargs["ssl_cert_reqs"] = ssl.CERT_NONE
|
||||||
|
|
||||||
|
self._redis_client = redis.from_url(self._redis_url, **connection_kwargs)
|
||||||
await self._redis_client.ping()
|
await self._redis_client.ping()
|
||||||
|
|
||||||
# Create scheduler (but don't start it yet)
|
# Create scheduler (but don't start it yet)
|
||||||
|
|||||||
@@ -62,7 +62,11 @@ async def test_deduplication_in_container():
|
|||||||
|
|
||||||
async def start(self):
|
async def start(self):
|
||||||
# Connect to Redis for deduplication testing
|
# Connect to Redis for deduplication testing
|
||||||
self.redis = await aioredis.from_url(self.config.REDIS_URL)
|
import ssl
|
||||||
|
connection_kwargs = {}
|
||||||
|
if self.config.REDIS_URL and self.config.REDIS_URL.startswith("rediss://"):
|
||||||
|
connection_kwargs["ssl_cert_reqs"] = ssl.CERT_NONE
|
||||||
|
self.redis = await aioredis.from_url(self.config.REDIS_URL, **connection_kwargs)
|
||||||
print(f"✅ Connected to Redis for testing")
|
print(f"✅ Connected to Redis for testing")
|
||||||
|
|
||||||
async def stop(self):
|
async def stop(self):
|
||||||
|
|||||||
@@ -100,16 +100,23 @@ class OrchestratorService(StandardFastAPIService):
|
|||||||
Without leader election, each pod would run the same scheduled jobs,
|
Without leader election, each pod would run the same scheduled jobs,
|
||||||
causing duplicate forecasts, production schedules, and database contention.
|
causing duplicate forecasts, production schedules, and database contention.
|
||||||
"""
|
"""
|
||||||
|
import ssl
|
||||||
from shared.leader_election import LeaderElectionService
|
from shared.leader_election import LeaderElectionService
|
||||||
import redis.asyncio as redis
|
import redis.asyncio as redis
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# Create Redis connection for leader election
|
# Create Redis connection for leader election
|
||||||
redis_url = f"redis://:{settings.REDIS_PASSWORD}@{settings.REDIS_HOST}:{settings.REDIS_PORT}/{settings.REDIS_DB}"
|
redis_url = f"redis://:{settings.REDIS_PASSWORD}@{settings.REDIS_HOST}:{settings.REDIS_PORT}/{settings.REDIS_DB}"
|
||||||
if settings.REDIS_TLS_ENABLED.lower() == "true":
|
use_tls = settings.REDIS_TLS_ENABLED.lower() == "true"
|
||||||
|
if use_tls:
|
||||||
redis_url = redis_url.replace("redis://", "rediss://")
|
redis_url = redis_url.replace("redis://", "rediss://")
|
||||||
|
|
||||||
redis_client = redis.from_url(redis_url, decode_responses=False)
|
# Handle SSL/TLS for self-signed certificates
|
||||||
|
connection_kwargs = {"decode_responses": False}
|
||||||
|
if use_tls:
|
||||||
|
connection_kwargs["ssl_cert_reqs"] = ssl.CERT_NONE
|
||||||
|
|
||||||
|
redis_client = redis.from_url(redis_url, **connection_kwargs)
|
||||||
await redis_client.ping()
|
await redis_client.ping()
|
||||||
|
|
||||||
# Use shared leader election service
|
# Use shared leader election service
|
||||||
|
|||||||
@@ -66,12 +66,17 @@ class POSScheduler:
|
|||||||
|
|
||||||
async def _start_with_leader_election(self):
|
async def _start_with_leader_election(self):
|
||||||
"""Start with Redis-based leader election for horizontal scaling"""
|
"""Start with Redis-based leader election for horizontal scaling"""
|
||||||
|
import ssl
|
||||||
import redis.asyncio as redis
|
import redis.asyncio as redis
|
||||||
from shared.leader_election import LeaderElectionService
|
from shared.leader_election import LeaderElectionService
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# Create Redis connection
|
# Create Redis connection with proper SSL handling for self-signed certificates
|
||||||
self._redis_client = redis.from_url(self._redis_url, decode_responses=False)
|
connection_kwargs = {"decode_responses": False}
|
||||||
|
if self._redis_url and self._redis_url.startswith("rediss://"):
|
||||||
|
connection_kwargs["ssl_cert_reqs"] = ssl.CERT_NONE
|
||||||
|
|
||||||
|
self._redis_client = redis.from_url(self._redis_url, **connection_kwargs)
|
||||||
await self._redis_client.ping()
|
await self._redis_client.ping()
|
||||||
|
|
||||||
# Create scheduler (but don't start it yet)
|
# Create scheduler (but don't start it yet)
|
||||||
|
|||||||
@@ -56,6 +56,7 @@ class DeliveryTrackingService:
|
|||||||
|
|
||||||
async def _setup_leader_election(self):
|
async def _setup_leader_election(self):
|
||||||
"""Setup Redis-based leader election for horizontal scaling"""
|
"""Setup Redis-based leader election for horizontal scaling"""
|
||||||
|
import ssl
|
||||||
from shared.leader_election import LeaderElectionService
|
from shared.leader_election import LeaderElectionService
|
||||||
import redis.asyncio as redis
|
import redis.asyncio as redis
|
||||||
|
|
||||||
@@ -68,7 +69,12 @@ class DeliveryTrackingService:
|
|||||||
redis_db = getattr(self.config, 'REDIS_DB', 0)
|
redis_db = getattr(self.config, 'REDIS_DB', 0)
|
||||||
redis_url = f"redis://:{redis_password}@{redis_host}:{redis_port}/{redis_db}"
|
redis_url = f"redis://:{redis_password}@{redis_host}:{redis_port}/{redis_db}"
|
||||||
|
|
||||||
self._redis_client = redis.from_url(redis_url, decode_responses=False)
|
# Handle SSL/TLS for self-signed certificates
|
||||||
|
connection_kwargs = {"decode_responses": False}
|
||||||
|
if redis_url and redis_url.startswith("rediss://"):
|
||||||
|
connection_kwargs["ssl_cert_reqs"] = ssl.CERT_NONE
|
||||||
|
|
||||||
|
self._redis_client = redis.from_url(redis_url, **connection_kwargs)
|
||||||
await self._redis_client.ping()
|
await self._redis_client.ping()
|
||||||
|
|
||||||
# Create leader election service
|
# Create leader election service
|
||||||
|
|||||||
@@ -65,10 +65,16 @@ class ProductionScheduler:
|
|||||||
|
|
||||||
async def _setup_leader_election(self):
|
async def _setup_leader_election(self):
|
||||||
"""Setup Redis-based leader election"""
|
"""Setup Redis-based leader election"""
|
||||||
|
import ssl
|
||||||
from shared.leader_election import LeaderElectionService
|
from shared.leader_election import LeaderElectionService
|
||||||
import redis.asyncio as redis
|
import redis.asyncio as redis
|
||||||
|
|
||||||
self._redis_client = redis.from_url(self.redis_url, decode_responses=False)
|
# Handle SSL/TLS for self-signed certificates
|
||||||
|
connection_kwargs = {"decode_responses": False}
|
||||||
|
if self.redis_url and self.redis_url.startswith("rediss://"):
|
||||||
|
connection_kwargs["ssl_cert_reqs"] = ssl.CERT_NONE
|
||||||
|
|
||||||
|
self._redis_client = redis.from_url(self.redis_url, **connection_kwargs)
|
||||||
await self._redis_client.ping()
|
await self._redis_client.ping()
|
||||||
|
|
||||||
self._leader_election = LeaderElectionService(
|
self._leader_election = LeaderElectionService(
|
||||||
|
|||||||
@@ -365,6 +365,7 @@ class AlertEventConsumer:
|
|||||||
|
|
||||||
# Redis-based rate limiting implementation
|
# Redis-based rate limiting implementation
|
||||||
try:
|
try:
|
||||||
|
import ssl
|
||||||
import redis.asyncio as redis
|
import redis.asyncio as redis
|
||||||
from datetime import datetime, timedelta
|
from datetime import datetime, timedelta
|
||||||
from app.core.config import Settings
|
from app.core.config import Settings
|
||||||
@@ -372,7 +373,13 @@ class AlertEventConsumer:
|
|||||||
# Connect to Redis using proper configuration with TLS and auth
|
# Connect to Redis using proper configuration with TLS and auth
|
||||||
settings = Settings()
|
settings = Settings()
|
||||||
redis_url = settings.REDIS_URL
|
redis_url = settings.REDIS_URL
|
||||||
redis_client = await redis.from_url(redis_url, decode_responses=True)
|
|
||||||
|
# Handle SSL/TLS for self-signed certificates
|
||||||
|
connection_kwargs = {"decode_responses": True}
|
||||||
|
if redis_url and redis_url.startswith("rediss://"):
|
||||||
|
connection_kwargs["ssl_cert_reqs"] = ssl.CERT_NONE
|
||||||
|
|
||||||
|
redis_client = await redis.from_url(redis_url, **connection_kwargs)
|
||||||
|
|
||||||
# Rate limit keys
|
# Rate limit keys
|
||||||
hour_key = f"alert_rate_limit:{tenant_id}:{alert_type}:hour:{datetime.utcnow().strftime('%Y%m%d%H')}"
|
hour_key = f"alert_rate_limit:{tenant_id}:{alert_type}:hour:{datetime.utcnow().strftime('%Y%m%d%H')}"
|
||||||
|
|||||||
@@ -49,11 +49,17 @@ class UsageForecastResponse(BaseModel):
|
|||||||
|
|
||||||
async def get_redis_client() -> redis.Redis:
|
async def get_redis_client() -> redis.Redis:
|
||||||
"""Get Redis client for usage tracking"""
|
"""Get Redis client for usage tracking"""
|
||||||
return redis.from_url(
|
import ssl
|
||||||
settings.REDIS_URL,
|
|
||||||
encoding="utf-8",
|
# Handle SSL/TLS for self-signed certificates
|
||||||
decode_responses=True
|
connection_kwargs = {
|
||||||
)
|
"encoding": "utf-8",
|
||||||
|
"decode_responses": True
|
||||||
|
}
|
||||||
|
if settings.REDIS_URL and settings.REDIS_URL.startswith("rediss://"):
|
||||||
|
connection_kwargs["ssl_cert_reqs"] = ssl.CERT_NONE
|
||||||
|
|
||||||
|
return redis.from_url(settings.REDIS_URL, **connection_kwargs)
|
||||||
|
|
||||||
|
|
||||||
async def get_usage_history(
|
async def get_usage_history(
|
||||||
|
|||||||
@@ -63,13 +63,20 @@ class SchedulerLeaderMixin:
|
|||||||
|
|
||||||
Only the leader will start the scheduler.
|
Only the leader will start the scheduler.
|
||||||
"""
|
"""
|
||||||
|
import ssl
|
||||||
from apscheduler.schedulers.asyncio import AsyncIOScheduler
|
from apscheduler.schedulers.asyncio import AsyncIOScheduler
|
||||||
from shared.leader_election.service import LeaderElectionService
|
from shared.leader_election.service import LeaderElectionService
|
||||||
import redis.asyncio as redis
|
import redis.asyncio as redis
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# Create Redis connection
|
# Create Redis connection with proper SSL handling for self-signed certificates
|
||||||
self._redis_client = redis.from_url(self._redis_url, decode_responses=False)
|
connection_kwargs = {"decode_responses": False}
|
||||||
|
|
||||||
|
# Handle SSL/TLS for rediss:// URLs (self-signed certificates)
|
||||||
|
if self._redis_url and self._redis_url.startswith("rediss://"):
|
||||||
|
connection_kwargs["ssl_cert_reqs"] = ssl.CERT_NONE
|
||||||
|
|
||||||
|
self._redis_client = redis.from_url(self._redis_url, **connection_kwargs)
|
||||||
await self._redis_client.ping()
|
await self._redis_client.ping()
|
||||||
|
|
||||||
# Create scheduler (but don't start it yet)
|
# Create scheduler (but don't start it yet)
|
||||||
|
|||||||
Reference in New Issue
Block a user