From c68d82ca7f8e2aa8bbbc6a67921477add53f9c99 Mon Sep 17 00:00:00 2001 From: Urtzi Alfaro Date: Tue, 16 Dec 2025 11:33:22 +0100 Subject: [PATCH] Fix critical bugs and standardize service integrations MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Critical Fixes: - Orchestrator: Add missing OrchestrationStatus import (fixes HTTP 500 during demo clone) - Procurement: Migrate from custom cache utils to shared Redis utils - Suppliers: Use proper Settings for Redis configuration with TLS/auth - Recipes/Suppliers clients: Fix endpoint paths (remove duplicate path segments) - Procurement client: Use suppliers service directly for supplier details Details: 1. services/orchestrator/app/api/internal_demo.py: - Added OrchestrationStatus import to fix cloning error - This was causing HTTP 500 errors during demo session cloning 2. services/procurement/app/api/purchase_orders.py + service: - Replaced app.utils.cache with shared.redis_utils - Standardizes caching across all services - Removed custom cache utilities (deleted app/utils/cache.py) 3. services/suppliers/app/consumers/alert_event_consumer.py: - Use Settings().REDIS_URL instead of os.getenv - Ensures proper Redis connection with TLS and authentication 4. shared/clients/recipes_client.py: - Fixed endpoint paths: recipes/recipes/{id} → recipes/{id} - Applied to all recipe methods (by_id, by_products, instructions, yield) 5. shared/clients/suppliers_client.py: - Fixed endpoint path: suppliers/suppliers/{id} → suppliers/{id} 6. shared/clients/procurement_client.py: - get_supplier_by_id now uses SuppliersServiceClient directly - Removes incorrect call to procurement service for supplier details Impact: - Demo session cloning now works without orchestrator errors ✅ - Consistent Redis usage across all services - Correct service boundaries (suppliers data from suppliers service) - Clean client endpoint paths 🤖 Generated with Claude Code (https://claude.com/claude-code) Co-Authored-By: Claude Sonnet 4.5 --- .../orchestrator/app/api/internal_demo.py | 2 +- .../procurement/app/api/purchase_orders.py | 29 +- .../app/services/purchase_order_service.py | 17 +- services/procurement/app/utils/__init__.py | 27 +- services/procurement/app/utils/cache.py | 265 ------------------ .../app/consumers/alert_event_consumer.py | 7 +- shared/clients/procurement_client.py | 8 +- shared/clients/recipes_client.py | 10 +- shared/clients/suppliers_client.py | 2 +- 9 files changed, 48 insertions(+), 319 deletions(-) delete mode 100644 services/procurement/app/utils/cache.py diff --git a/services/orchestrator/app/api/internal_demo.py b/services/orchestrator/app/api/internal_demo.py index 658f5daa..4a65e9b5 100644 --- a/services/orchestrator/app/api/internal_demo.py +++ b/services/orchestrator/app/api/internal_demo.py @@ -13,7 +13,7 @@ import json from app.core.database import get_db from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy import select, delete, func -from app.models.orchestration_run import OrchestrationRun +from app.models.orchestration_run import OrchestrationRun, OrchestrationStatus import uuid from datetime import datetime, timezone, timedelta from typing import Optional diff --git a/services/procurement/app/api/purchase_orders.py b/services/procurement/app/api/purchase_orders.py index 2bf4b04b..03fe557b 100644 --- a/services/procurement/app/api/purchase_orders.py +++ b/services/procurement/app/api/purchase_orders.py @@ -27,7 +27,7 @@ from app.schemas.purchase_order_schemas import ( ) from shared.routing import RouteBuilder from shared.auth.decorators import get_current_user_dep -from app.utils.cache import get_cached, set_cached, make_cache_key +from shared.redis_utils import get_value, set_with_ttl import structlog logger = structlog.get_logger() @@ -146,17 +146,14 @@ async def list_purchase_orders( # Only skip cache for supplier_id filter and pagination (skip > 0) cache_key = None if skip == 0 and supplier_id is None: - cache_key = make_cache_key( - "purchase_orders", - tenant_id, - limit=limit, - status=status, # Include status in cache key - enrich_supplier=enrich_supplier - ) - cached_result = await get_cached(cache_key) - if cached_result is not None: - logger.debug("Cache hit for purchase orders", cache_key=cache_key, tenant_id=tenant_id, status=status) - return [PurchaseOrderResponse(**po) for po in cached_result] + cache_key = f"purchase_orders:{tenant_id}:limit:{limit}:status:{status}:enrich:{enrich_supplier}" + try: + cached_result = await get_value(cache_key) + if cached_result is not None: + logger.debug("Cache hit for purchase orders", cache_key=cache_key, tenant_id=tenant_id, status=status) + return [PurchaseOrderResponse(**po) for po in cached_result] + except Exception as e: + logger.warning("Cache read failed, continuing without cache", cache_key=cache_key, error=str(e)) # Cache miss - fetch from database pos = await service.list_purchase_orders( @@ -172,8 +169,12 @@ async def list_purchase_orders( # PERFORMANCE OPTIMIZATION: Cache the result (20s TTL for purchase orders) if cache_key: - await set_cached(cache_key, [po.model_dump() for po in result], ttl=20) - logger.debug("Cached purchase orders", cache_key=cache_key, ttl=20, tenant_id=tenant_id, status=status) + try: + import json + await set_with_ttl(cache_key, json.dumps([po.model_dump() for po in result]), ttl=20) + logger.debug("Cached purchase orders", cache_key=cache_key, ttl=20, tenant_id=tenant_id, status=status) + except Exception as e: + logger.warning("Cache write failed, continuing without caching", cache_key=cache_key, error=str(e)) return result diff --git a/services/procurement/app/services/purchase_order_service.py b/services/procurement/app/services/purchase_order_service.py index 041a0d0a..80ee7000 100644 --- a/services/procurement/app/services/purchase_order_service.py +++ b/services/procurement/app/services/purchase_order_service.py @@ -33,7 +33,7 @@ from shared.clients.suppliers_client import SuppliersServiceClient from shared.clients.inventory_client import InventoryServiceClient from shared.config.base import BaseServiceSettings from shared.messaging import RabbitMQClient, UnifiedEventPublisher, EVENT_TYPES -from app.utils.cache import delete_cached, make_cache_key +from shared.redis_utils import get_keys_pattern, get_redis_client logger = structlog.get_logger() @@ -396,9 +396,18 @@ class PurchaseOrderService: await self.db.commit() # PHASE 2: Invalidate purchase orders cache - cache_key = make_cache_key("purchase_orders", str(tenant_id)) - await delete_cached(cache_key) - logger.debug("Invalidated purchase orders cache", cache_key=cache_key, tenant_id=str(tenant_id)) + # Get all purchase order cache keys for this tenant and delete them + try: + cache_pattern = f"purchase_orders:{tenant_id}:*" + client = await get_redis_client() + if client: + keys = await client.keys(cache_pattern) + if keys: + await client.delete(*keys) + logger.debug("Invalidated purchase orders cache", pattern=cache_pattern, keys_deleted=len(keys), tenant_id=str(tenant_id)) + except Exception as e: + logger.warning("Cache invalidation failed, continuing without cache invalidation", + pattern=f"purchase_orders:{tenant_id}:*", error=str(e)) # Acknowledge PO approval alerts (non-blocking) try: diff --git a/services/procurement/app/utils/__init__.py b/services/procurement/app/utils/__init__.py index 2cddc34c..8282b0cb 100644 --- a/services/procurement/app/utils/__init__.py +++ b/services/procurement/app/utils/__init__.py @@ -1,26 +1,9 @@ -# services/alert_processor/app/utils/__init__.py +# services/procurement/app/utils/__init__.py """ -Utility modules for alert processor service +Utility modules for procurement service """ -from .cache import ( - get_redis_client, - close_redis, - get_cached, - set_cached, - delete_cached, - delete_pattern, - cache_response, - make_cache_key, -) +# Note: Redis utilities are now provided by shared.redis_utils +# Import from shared.redis_utils instead of local cache module -__all__ = [ - 'get_redis_client', - 'close_redis', - 'get_cached', - 'set_cached', - 'delete_cached', - 'delete_pattern', - 'cache_response', - 'make_cache_key', -] +__all__ = [] diff --git a/services/procurement/app/utils/cache.py b/services/procurement/app/utils/cache.py deleted file mode 100644 index 7015ddb5..00000000 --- a/services/procurement/app/utils/cache.py +++ /dev/null @@ -1,265 +0,0 @@ -# services/orchestrator/app/utils/cache.py -""" -Redis caching utilities for dashboard endpoints -""" - -import json -import redis.asyncio as redis -from typing import Optional, Any, Callable -from functools import wraps -import structlog -from app.core.config import settings -from pydantic import BaseModel - -logger = structlog.get_logger() - -# Redis client instance -_redis_client: Optional[redis.Redis] = None - - -async def get_redis_client() -> redis.Redis: - """Get or create Redis client""" - global _redis_client - - if _redis_client is None: - try: - # Check if TLS is enabled - convert string to boolean properly - redis_tls_str = str(getattr(settings, 'REDIS_TLS_ENABLED', 'false')).lower() - redis_tls_enabled = redis_tls_str in ('true', '1', 'yes', 'on') - - connection_kwargs = { - 'host': str(getattr(settings, 'REDIS_HOST', 'localhost')), - 'port': int(getattr(settings, 'REDIS_PORT', 6379)), - 'db': int(getattr(settings, 'REDIS_DB', 0)), - 'decode_responses': True, - 'socket_connect_timeout': 5, - 'socket_timeout': 5 - } - - # Add password if configured - redis_password = getattr(settings, 'REDIS_PASSWORD', None) - if redis_password: - connection_kwargs['password'] = redis_password - - # Add SSL/TLS support if enabled - if redis_tls_enabled: - import ssl - connection_kwargs['ssl'] = True - connection_kwargs['ssl_cert_reqs'] = ssl.CERT_NONE - logger.debug(f"Redis TLS enabled - connecting with SSL to {connection_kwargs['host']}:{connection_kwargs['port']}") - - _redis_client = redis.Redis(**connection_kwargs) - - # Test connection - await _redis_client.ping() - logger.info(f"Redis client connected successfully (TLS: {redis_tls_enabled})") - except Exception as e: - logger.warning(f"Failed to connect to Redis: {e}. Caching will be disabled.") - _redis_client = None - - return _redis_client - - -async def close_redis(): - """Close Redis connection""" - global _redis_client - if _redis_client: - await _redis_client.close() - _redis_client = None - logger.info("Redis connection closed") - - -async def get_cached(key: str) -> Optional[Any]: - """ - Get cached value by key - - Args: - key: Cache key - - Returns: - Cached value (deserialized from JSON) or None if not found or error - """ - try: - client = await get_redis_client() - if not client: - return None - - cached = await client.get(key) - if cached: - logger.debug(f"Cache hit: {key}") - return json.loads(cached) - else: - logger.debug(f"Cache miss: {key}") - return None - except Exception as e: - logger.warning(f"Cache get error for key {key}: {e}") - return None - - -def _serialize_value(value: Any) -> Any: - """ - Recursively serialize values for JSON storage, handling Pydantic models properly. - - Args: - value: Value to serialize - - Returns: - JSON-serializable value - """ - if isinstance(value, BaseModel): - # Convert Pydantic model to dictionary - return value.model_dump() - elif isinstance(value, (list, tuple)): - # Recursively serialize list/tuple elements - return [_serialize_value(item) for item in value] - elif isinstance(value, dict): - # Recursively serialize dictionary values - return {key: _serialize_value(val) for key, val in value.items()} - else: - # For other types, use default serialization - return value - - -async def set_cached(key: str, value: Any, ttl: int = 60) -> bool: - """ - Set cached value with TTL - - Args: - key: Cache key - value: Value to cache (will be JSON serialized) - ttl: Time to live in seconds - - Returns: - True if successful, False otherwise - """ - try: - client = await get_redis_client() - if not client: - return False - - # Serialize value properly before JSON encoding - serialized_value = _serialize_value(value) - serialized = json.dumps(serialized_value) - await client.setex(key, ttl, serialized) - logger.debug(f"Cache set: {key} (TTL: {ttl}s)") - return True - except Exception as e: - logger.warning(f"Cache set error for key {key}: {e}") - return False - - -async def delete_cached(key: str) -> bool: - """ - Delete cached value - - Args: - key: Cache key - - Returns: - True if successful, False otherwise - """ - try: - client = await get_redis_client() - if not client: - return False - - await client.delete(key) - logger.debug(f"Cache deleted: {key}") - return True - except Exception as e: - logger.warning(f"Cache delete error for key {key}: {e}") - return False - - -async def delete_pattern(pattern: str) -> int: - """ - Delete all keys matching pattern - - Args: - pattern: Redis key pattern (e.g., "dashboard:*") - - Returns: - Number of keys deleted - """ - try: - client = await get_redis_client() - if not client: - return 0 - - keys = [] - async for key in client.scan_iter(match=pattern): - keys.append(key) - - if keys: - deleted = await client.delete(*keys) - logger.info(f"Deleted {deleted} keys matching pattern: {pattern}") - return deleted - return 0 - except Exception as e: - logger.warning(f"Cache delete pattern error for {pattern}: {e}") - return 0 - - -def cache_response(key_prefix: str, ttl: int = 60): - """ - Decorator to cache endpoint responses - - Args: - key_prefix: Prefix for cache key (will be combined with tenant_id) - ttl: Time to live in seconds - - Usage: - @cache_response("dashboard:health", ttl=30) - async def get_health(tenant_id: str): - ... - """ - def decorator(func: Callable): - @wraps(func) - async def wrapper(*args, **kwargs): - # Extract tenant_id from kwargs or args - tenant_id = kwargs.get('tenant_id') - if not tenant_id and args: - # Try to find tenant_id in args (assuming it's the first argument) - tenant_id = args[0] if len(args) > 0 else None - - if not tenant_id: - # No tenant_id, skip caching - return await func(*args, **kwargs) - - # Build cache key - cache_key = f"{key_prefix}:{tenant_id}" - - # Try to get from cache - cached_value = await get_cached(cache_key) - if cached_value is not None: - return cached_value - - # Execute function - result = await func(*args, **kwargs) - - # Cache result - await set_cached(cache_key, result, ttl) - - return result - - return wrapper - return decorator - - -def make_cache_key(prefix: str, tenant_id: str, **params) -> str: - """ - Create a cache key with optional parameters - - Args: - prefix: Key prefix - tenant_id: Tenant ID - **params: Additional parameters to include in key - - Returns: - Cache key string - """ - key_parts = [prefix, tenant_id] - for k, v in sorted(params.items()): - if v is not None: - key_parts.append(f"{k}:{v}") - return ":".join(key_parts) diff --git a/services/suppliers/app/consumers/alert_event_consumer.py b/services/suppliers/app/consumers/alert_event_consumer.py index 04e119cb..26aa287a 100644 --- a/services/suppliers/app/consumers/alert_event_consumer.py +++ b/services/suppliers/app/consumers/alert_event_consumer.py @@ -366,11 +366,12 @@ class AlertEventConsumer: # Redis-based rate limiting implementation try: import redis.asyncio as redis - import os from datetime import datetime, timedelta + from app.core.config import Settings - # Connect to Redis - redis_url = os.getenv('REDIS_URL', 'redis://localhost:6379/0') + # Connect to Redis using proper configuration with TLS and auth + settings = Settings() + redis_url = settings.REDIS_URL redis_client = await redis.from_url(redis_url, decode_responses=True) # Rate limit keys diff --git a/shared/clients/procurement_client.py b/shared/clients/procurement_client.py index c3bbeb38..71ea9c23 100755 --- a/shared/clients/procurement_client.py +++ b/shared/clients/procurement_client.py @@ -538,10 +538,10 @@ class ProcurementServiceClient(BaseServiceClient): Supplier details """ try: - response = await self.get( - f"procurement/suppliers/{supplier_id}", - tenant_id=tenant_id - ) + # Use suppliers service to get supplier details + from shared.clients.suppliers_client import SuppliersServiceClient + suppliers_client = SuppliersServiceClient(self.config) + response = await suppliers_client.get_supplier_by_id(tenant_id, supplier_id) if response: logger.info("Retrieved supplier details", diff --git a/shared/clients/recipes_client.py b/shared/clients/recipes_client.py index 5dcb55cd..9280f614 100755 --- a/shared/clients/recipes_client.py +++ b/shared/clients/recipes_client.py @@ -29,7 +29,7 @@ class RecipesServiceClient(BaseServiceClient): async def get_recipe_by_id(self, tenant_id: str, recipe_id: str) -> Optional[Dict[str, Any]]: """Get recipe details by ID""" try: - result = await self.get(f"recipes/recipes/{recipe_id}", tenant_id=tenant_id) + result = await self.get(f"recipes/{recipe_id}", tenant_id=tenant_id) if result: logger.info("Retrieved recipe details from recipes service", recipe_id=recipe_id, tenant_id=tenant_id) @@ -43,7 +43,7 @@ class RecipesServiceClient(BaseServiceClient): """Get recipes for multiple products""" try: params = {"product_ids": ",".join(product_ids)} - result = await self.get("recipes/recipes/by-products", tenant_id=tenant_id, params=params) + result = await self.get("recipes/by-products", tenant_id=tenant_id, params=params) recipes = result.get('recipes', []) if result else [] logger.info("Retrieved recipes by product IDs from recipes service", product_ids_count=len(product_ids), @@ -149,7 +149,7 @@ class RecipesServiceClient(BaseServiceClient): async def get_production_instructions(self, tenant_id: str, recipe_id: str) -> Optional[Dict[str, Any]]: """Get detailed production instructions for a recipe""" try: - result = await self.get(f"recipes/recipes/{recipe_id}/production-instructions", tenant_id=tenant_id) + result = await self.get(f"recipes/{recipe_id}/production-instructions", tenant_id=tenant_id) if result: logger.info("Retrieved production instructions from recipes service", recipe_id=recipe_id, tenant_id=tenant_id) @@ -162,7 +162,7 @@ class RecipesServiceClient(BaseServiceClient): async def get_recipe_yield_info(self, tenant_id: str, recipe_id: str) -> Optional[Dict[str, Any]]: """Get yield information for a recipe""" try: - result = await self.get(f"recipes/recipes/{recipe_id}/yield", tenant_id=tenant_id) + result = await self.get(f"recipes/{recipe_id}/yield", tenant_id=tenant_id) if result: logger.info("Retrieved recipe yield info from recipes service", recipe_id=recipe_id, tenant_id=tenant_id) @@ -196,7 +196,7 @@ class RecipesServiceClient(BaseServiceClient): async def get_recipe_cost_analysis(self, tenant_id: str, recipe_id: str) -> Optional[Dict[str, Any]]: """Get cost analysis for a recipe""" try: - result = await self.get(f"recipes/recipes/{recipe_id}/cost-analysis", tenant_id=tenant_id) + result = await self.get(f"recipes/{recipe_id}/cost-analysis", tenant_id=tenant_id) if result: logger.info("Retrieved recipe cost analysis from recipes service", recipe_id=recipe_id, tenant_id=tenant_id) diff --git a/shared/clients/suppliers_client.py b/shared/clients/suppliers_client.py index 02783889..d6c27810 100755 --- a/shared/clients/suppliers_client.py +++ b/shared/clients/suppliers_client.py @@ -28,7 +28,7 @@ class SuppliersServiceClient(BaseServiceClient): async def get_supplier_by_id(self, tenant_id: str, supplier_id: str) -> Optional[Dict[str, Any]]: """Get supplier details by ID""" try: - result = await self.get(f"suppliers/suppliers/{supplier_id}", tenant_id=tenant_id) + result = await self.get(f"suppliers/{supplier_id}", tenant_id=tenant_id) if result: logger.info("Retrieved supplier details from suppliers service", supplier_id=supplier_id, tenant_id=tenant_id)