Fix critical bugs and standardize service integrations

Critical Fixes:
- Orchestrator: Add missing OrchestrationStatus import (fixes HTTP 500 during demo clone)
- Procurement: Migrate from custom cache utils to shared Redis utils
- Suppliers: Use proper Settings for Redis configuration with TLS/auth
- Recipes/Suppliers clients: Fix endpoint paths (remove duplicate path segments)
- Procurement client: Use suppliers service directly for supplier details

Details:
1. services/orchestrator/app/api/internal_demo.py:
   - Added OrchestrationStatus import to fix cloning error
   - This was causing HTTP 500 errors during demo session cloning

2. services/procurement/app/api/purchase_orders.py + service:
   - Replaced app.utils.cache with shared.redis_utils
   - Standardizes caching across all services
   - Removed custom cache utilities (deleted app/utils/cache.py)

3. services/suppliers/app/consumers/alert_event_consumer.py:
   - Use Settings().REDIS_URL instead of os.getenv
   - Ensures proper Redis connection with TLS and authentication

4. shared/clients/recipes_client.py:
   - Fixed endpoint paths: recipes/recipes/{id} → recipes/{id}
   - Applied to all recipe methods (by_id, by_products, instructions, yield)

5. shared/clients/suppliers_client.py:
   - Fixed endpoint path: suppliers/suppliers/{id} → suppliers/{id}

6. shared/clients/procurement_client.py:
   - get_supplier_by_id now uses SuppliersServiceClient directly
   - Removes incorrect call to procurement service for supplier details

Impact:
- Demo session cloning now works without orchestrator errors 
- Consistent Redis usage across all services
- Correct service boundaries (suppliers data from suppliers service)
- Clean client endpoint paths

🤖 Generated with Claude Code (https://claude.com/claude-code)

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
This commit is contained in:
Urtzi Alfaro
2025-12-16 11:33:22 +01:00
parent 9f3b39bd28
commit c68d82ca7f
9 changed files with 48 additions and 319 deletions

View File

@@ -13,7 +13,7 @@ import json
from app.core.database import get_db
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, delete, func
from app.models.orchestration_run import OrchestrationRun
from app.models.orchestration_run import OrchestrationRun, OrchestrationStatus
import uuid
from datetime import datetime, timezone, timedelta
from typing import Optional

View File

@@ -27,7 +27,7 @@ from app.schemas.purchase_order_schemas import (
)
from shared.routing import RouteBuilder
from shared.auth.decorators import get_current_user_dep
from app.utils.cache import get_cached, set_cached, make_cache_key
from shared.redis_utils import get_value, set_with_ttl
import structlog
logger = structlog.get_logger()
@@ -146,17 +146,14 @@ async def list_purchase_orders(
# Only skip cache for supplier_id filter and pagination (skip > 0)
cache_key = None
if skip == 0 and supplier_id is None:
cache_key = make_cache_key(
"purchase_orders",
tenant_id,
limit=limit,
status=status, # Include status in cache key
enrich_supplier=enrich_supplier
)
cached_result = await get_cached(cache_key)
if cached_result is not None:
logger.debug("Cache hit for purchase orders", cache_key=cache_key, tenant_id=tenant_id, status=status)
return [PurchaseOrderResponse(**po) for po in cached_result]
cache_key = f"purchase_orders:{tenant_id}:limit:{limit}:status:{status}:enrich:{enrich_supplier}"
try:
cached_result = await get_value(cache_key)
if cached_result is not None:
logger.debug("Cache hit for purchase orders", cache_key=cache_key, tenant_id=tenant_id, status=status)
return [PurchaseOrderResponse(**po) for po in cached_result]
except Exception as e:
logger.warning("Cache read failed, continuing without cache", cache_key=cache_key, error=str(e))
# Cache miss - fetch from database
pos = await service.list_purchase_orders(
@@ -172,8 +169,12 @@ async def list_purchase_orders(
# PERFORMANCE OPTIMIZATION: Cache the result (20s TTL for purchase orders)
if cache_key:
await set_cached(cache_key, [po.model_dump() for po in result], ttl=20)
logger.debug("Cached purchase orders", cache_key=cache_key, ttl=20, tenant_id=tenant_id, status=status)
try:
import json
await set_with_ttl(cache_key, json.dumps([po.model_dump() for po in result]), ttl=20)
logger.debug("Cached purchase orders", cache_key=cache_key, ttl=20, tenant_id=tenant_id, status=status)
except Exception as e:
logger.warning("Cache write failed, continuing without caching", cache_key=cache_key, error=str(e))
return result

View File

@@ -33,7 +33,7 @@ from shared.clients.suppliers_client import SuppliersServiceClient
from shared.clients.inventory_client import InventoryServiceClient
from shared.config.base import BaseServiceSettings
from shared.messaging import RabbitMQClient, UnifiedEventPublisher, EVENT_TYPES
from app.utils.cache import delete_cached, make_cache_key
from shared.redis_utils import get_keys_pattern, get_redis_client
logger = structlog.get_logger()
@@ -396,9 +396,18 @@ class PurchaseOrderService:
await self.db.commit()
# PHASE 2: Invalidate purchase orders cache
cache_key = make_cache_key("purchase_orders", str(tenant_id))
await delete_cached(cache_key)
logger.debug("Invalidated purchase orders cache", cache_key=cache_key, tenant_id=str(tenant_id))
# Get all purchase order cache keys for this tenant and delete them
try:
cache_pattern = f"purchase_orders:{tenant_id}:*"
client = await get_redis_client()
if client:
keys = await client.keys(cache_pattern)
if keys:
await client.delete(*keys)
logger.debug("Invalidated purchase orders cache", pattern=cache_pattern, keys_deleted=len(keys), tenant_id=str(tenant_id))
except Exception as e:
logger.warning("Cache invalidation failed, continuing without cache invalidation",
pattern=f"purchase_orders:{tenant_id}:*", error=str(e))
# Acknowledge PO approval alerts (non-blocking)
try:

View File

@@ -1,26 +1,9 @@
# services/alert_processor/app/utils/__init__.py
# services/procurement/app/utils/__init__.py
"""
Utility modules for alert processor service
Utility modules for procurement service
"""
from .cache import (
get_redis_client,
close_redis,
get_cached,
set_cached,
delete_cached,
delete_pattern,
cache_response,
make_cache_key,
)
# Note: Redis utilities are now provided by shared.redis_utils
# Import from shared.redis_utils instead of local cache module
__all__ = [
'get_redis_client',
'close_redis',
'get_cached',
'set_cached',
'delete_cached',
'delete_pattern',
'cache_response',
'make_cache_key',
]
__all__ = []

View File

@@ -1,265 +0,0 @@
# services/orchestrator/app/utils/cache.py
"""
Redis caching utilities for dashboard endpoints
"""
import json
import redis.asyncio as redis
from typing import Optional, Any, Callable
from functools import wraps
import structlog
from app.core.config import settings
from pydantic import BaseModel
logger = structlog.get_logger()
# Redis client instance
_redis_client: Optional[redis.Redis] = None
async def get_redis_client() -> redis.Redis:
"""Get or create Redis client"""
global _redis_client
if _redis_client is None:
try:
# Check if TLS is enabled - convert string to boolean properly
redis_tls_str = str(getattr(settings, 'REDIS_TLS_ENABLED', 'false')).lower()
redis_tls_enabled = redis_tls_str in ('true', '1', 'yes', 'on')
connection_kwargs = {
'host': str(getattr(settings, 'REDIS_HOST', 'localhost')),
'port': int(getattr(settings, 'REDIS_PORT', 6379)),
'db': int(getattr(settings, 'REDIS_DB', 0)),
'decode_responses': True,
'socket_connect_timeout': 5,
'socket_timeout': 5
}
# Add password if configured
redis_password = getattr(settings, 'REDIS_PASSWORD', None)
if redis_password:
connection_kwargs['password'] = redis_password
# Add SSL/TLS support if enabled
if redis_tls_enabled:
import ssl
connection_kwargs['ssl'] = True
connection_kwargs['ssl_cert_reqs'] = ssl.CERT_NONE
logger.debug(f"Redis TLS enabled - connecting with SSL to {connection_kwargs['host']}:{connection_kwargs['port']}")
_redis_client = redis.Redis(**connection_kwargs)
# Test connection
await _redis_client.ping()
logger.info(f"Redis client connected successfully (TLS: {redis_tls_enabled})")
except Exception as e:
logger.warning(f"Failed to connect to Redis: {e}. Caching will be disabled.")
_redis_client = None
return _redis_client
async def close_redis():
"""Close Redis connection"""
global _redis_client
if _redis_client:
await _redis_client.close()
_redis_client = None
logger.info("Redis connection closed")
async def get_cached(key: str) -> Optional[Any]:
"""
Get cached value by key
Args:
key: Cache key
Returns:
Cached value (deserialized from JSON) or None if not found or error
"""
try:
client = await get_redis_client()
if not client:
return None
cached = await client.get(key)
if cached:
logger.debug(f"Cache hit: {key}")
return json.loads(cached)
else:
logger.debug(f"Cache miss: {key}")
return None
except Exception as e:
logger.warning(f"Cache get error for key {key}: {e}")
return None
def _serialize_value(value: Any) -> Any:
"""
Recursively serialize values for JSON storage, handling Pydantic models properly.
Args:
value: Value to serialize
Returns:
JSON-serializable value
"""
if isinstance(value, BaseModel):
# Convert Pydantic model to dictionary
return value.model_dump()
elif isinstance(value, (list, tuple)):
# Recursively serialize list/tuple elements
return [_serialize_value(item) for item in value]
elif isinstance(value, dict):
# Recursively serialize dictionary values
return {key: _serialize_value(val) for key, val in value.items()}
else:
# For other types, use default serialization
return value
async def set_cached(key: str, value: Any, ttl: int = 60) -> bool:
"""
Set cached value with TTL
Args:
key: Cache key
value: Value to cache (will be JSON serialized)
ttl: Time to live in seconds
Returns:
True if successful, False otherwise
"""
try:
client = await get_redis_client()
if not client:
return False
# Serialize value properly before JSON encoding
serialized_value = _serialize_value(value)
serialized = json.dumps(serialized_value)
await client.setex(key, ttl, serialized)
logger.debug(f"Cache set: {key} (TTL: {ttl}s)")
return True
except Exception as e:
logger.warning(f"Cache set error for key {key}: {e}")
return False
async def delete_cached(key: str) -> bool:
"""
Delete cached value
Args:
key: Cache key
Returns:
True if successful, False otherwise
"""
try:
client = await get_redis_client()
if not client:
return False
await client.delete(key)
logger.debug(f"Cache deleted: {key}")
return True
except Exception as e:
logger.warning(f"Cache delete error for key {key}: {e}")
return False
async def delete_pattern(pattern: str) -> int:
"""
Delete all keys matching pattern
Args:
pattern: Redis key pattern (e.g., "dashboard:*")
Returns:
Number of keys deleted
"""
try:
client = await get_redis_client()
if not client:
return 0
keys = []
async for key in client.scan_iter(match=pattern):
keys.append(key)
if keys:
deleted = await client.delete(*keys)
logger.info(f"Deleted {deleted} keys matching pattern: {pattern}")
return deleted
return 0
except Exception as e:
logger.warning(f"Cache delete pattern error for {pattern}: {e}")
return 0
def cache_response(key_prefix: str, ttl: int = 60):
"""
Decorator to cache endpoint responses
Args:
key_prefix: Prefix for cache key (will be combined with tenant_id)
ttl: Time to live in seconds
Usage:
@cache_response("dashboard:health", ttl=30)
async def get_health(tenant_id: str):
...
"""
def decorator(func: Callable):
@wraps(func)
async def wrapper(*args, **kwargs):
# Extract tenant_id from kwargs or args
tenant_id = kwargs.get('tenant_id')
if not tenant_id and args:
# Try to find tenant_id in args (assuming it's the first argument)
tenant_id = args[0] if len(args) > 0 else None
if not tenant_id:
# No tenant_id, skip caching
return await func(*args, **kwargs)
# Build cache key
cache_key = f"{key_prefix}:{tenant_id}"
# Try to get from cache
cached_value = await get_cached(cache_key)
if cached_value is not None:
return cached_value
# Execute function
result = await func(*args, **kwargs)
# Cache result
await set_cached(cache_key, result, ttl)
return result
return wrapper
return decorator
def make_cache_key(prefix: str, tenant_id: str, **params) -> str:
"""
Create a cache key with optional parameters
Args:
prefix: Key prefix
tenant_id: Tenant ID
**params: Additional parameters to include in key
Returns:
Cache key string
"""
key_parts = [prefix, tenant_id]
for k, v in sorted(params.items()):
if v is not None:
key_parts.append(f"{k}:{v}")
return ":".join(key_parts)

View File

@@ -366,11 +366,12 @@ class AlertEventConsumer:
# Redis-based rate limiting implementation
try:
import redis.asyncio as redis
import os
from datetime import datetime, timedelta
from app.core.config import Settings
# Connect to Redis
redis_url = os.getenv('REDIS_URL', 'redis://localhost:6379/0')
# Connect to Redis using proper configuration with TLS and auth
settings = Settings()
redis_url = settings.REDIS_URL
redis_client = await redis.from_url(redis_url, decode_responses=True)
# Rate limit keys