Files
bakery-ia/shared/clients/base_service_client.py

438 lines
18 KiB
Python
Raw Permalink Normal View History

2025-07-29 15:08:55 +02:00
# shared/clients/base_service_client.py
"""
Base Service Client for Inter-Service Communication
Provides a reusable foundation for all service-to-service API calls
"""
import time
import asyncio
import httpx
import structlog
from abc import ABC, abstractmethod
from typing import Dict, Any, Optional, List, Union
from urllib.parse import urljoin
from shared.auth.jwt_handler import JWTHandler
from shared.config.base import BaseServiceSettings
from shared.clients.circuit_breaker import CircuitBreaker, CircuitBreakerOpenException
2025-07-29 15:08:55 +02:00
logger = structlog.get_logger()
class ServiceAuthenticator:
"""Handles service-to-service authentication via gateway"""
def __init__(self, service_name: str, config: BaseServiceSettings):
self.service_name = service_name
self.config = config
self.jwt_handler = JWTHandler(config.JWT_SECRET_KEY)
self._cached_token = None
self._token_expires_at = 0
2026-01-12 14:24:14 +01:00
self._cached_tenant_id = None # Track tenant context for cached tokens
2025-07-29 15:08:55 +02:00
2026-01-12 14:24:14 +01:00
async def get_service_token(self, tenant_id: Optional[str] = None) -> str:
2025-07-29 15:08:55 +02:00
"""Get a valid service token, using cache when possible"""
current_time = int(time.time())
2026-01-12 14:24:14 +01:00
# Return cached token if still valid (with 5 min buffer) and tenant context matches
2025-07-29 15:08:55 +02:00
if (self._cached_token and
2026-01-12 14:24:14 +01:00
self._token_expires_at > current_time + 300 and
(tenant_id is None or self._cached_tenant_id == tenant_id)):
2025-07-29 15:08:55 +02:00
return self._cached_token
2026-01-12 14:24:14 +01:00
# Create new service token using unified JWT handler
2025-07-29 15:08:55 +02:00
try:
2026-01-12 14:24:14 +01:00
token = self.jwt_handler.create_service_token(
service_name=self.service_name,
tenant_id=tenant_id
)
# Extract expiration from token for caching
import json
from jose import jwt
payload = jwt.decode(token, self.jwt_handler.secret_key, algorithms=[self.jwt_handler.algorithm], options={"verify_signature": False})
token_expires_at = payload.get("exp", current_time + 3600)
2025-07-29 15:08:55 +02:00
self._cached_token = token
self._token_expires_at = token_expires_at
2026-01-12 14:24:14 +01:00
self._cached_tenant_id = tenant_id # Store tenant context for caching
2025-07-29 15:08:55 +02:00
2026-01-12 14:24:14 +01:00
logger.debug("Created new service token", service=self.service_name, expires_at=token_expires_at, tenant_id=tenant_id)
2025-07-29 15:08:55 +02:00
return token
except Exception as e:
logger.error(f"Failed to create service token: {e}", service=self.service_name)
raise ValueError(f"Service token creation failed: {e}")
def get_request_headers(self, tenant_id: Optional[str] = None) -> Dict[str, str]:
"""Get standard headers for service requests"""
headers = {
"X-Service": f"{self.service_name}-service",
"User-Agent": f"{self.service_name}-service/1.0.0"
}
if tenant_id:
2026-01-12 22:15:11 +01:00
headers["x-tenant-id"] = str(tenant_id)
2025-07-29 15:08:55 +02:00
return headers
class BaseServiceClient(ABC):
"""
Base class for all inter-service communication clients
Provides common functionality for API calls through the gateway
"""
def __init__(self, service_name: str, config: BaseServiceSettings):
self.service_name = service_name
self.config = config
self.gateway_url = config.GATEWAY_URL
self.authenticator = ServiceAuthenticator(service_name, config)
2025-07-29 15:08:55 +02:00
# HTTP client configuration
self.timeout = config.HTTP_TIMEOUT
self.retries = config.HTTP_RETRIES
self.retry_delay = config.HTTP_RETRY_DELAY
# Circuit breaker for fault tolerance
self.circuit_breaker = CircuitBreaker(
service_name=f"{service_name}-client",
failure_threshold=5,
timeout=60,
success_threshold=2
)
2025-12-13 23:57:54 +01:00
2025-07-29 15:08:55 +02:00
@abstractmethod
def get_service_base_path(self) -> str:
"""Return the base path for this service's APIs"""
pass
async def _make_request(
self,
method: str,
endpoint: str,
tenant_id: Optional[str] = None,
data: Optional[Dict[str, Any]] = None,
params: Optional[Dict[str, Any]] = None,
headers: Optional[Dict[str, str]] = None,
timeout: Optional[Union[int, httpx.Timeout]] = None
) -> Optional[Union[Dict[str, Any], List[Dict[str, Any]]]]:
"""
Make an authenticated request to another service via gateway with circuit breaker protection.
2025-07-29 15:08:55 +02:00
Args:
method: HTTP method (GET, POST, PUT, DELETE)
endpoint: API endpoint (will be prefixed with service base path)
tenant_id: Optional tenant ID for tenant-scoped requests
data: Request body data (for POST/PUT)
params: Query parameters
headers: Additional headers
timeout: Request timeout override
2025-07-29 15:08:55 +02:00
Returns:
Response data or None if request failed
"""
try:
# Wrap request in circuit breaker
return await self.circuit_breaker.call(
self._do_request,
method,
endpoint,
tenant_id,
data,
params,
headers,
timeout
)
except CircuitBreakerOpenException as e:
logger.error(
"Circuit breaker open - request rejected",
service=self.service_name,
endpoint=endpoint,
error=str(e)
)
return None
except Exception as e:
logger.error(
"Unexpected error in request",
service=self.service_name,
endpoint=endpoint,
error=str(e)
)
return None
async def _do_request(
self,
method: str,
endpoint: str,
tenant_id: Optional[str] = None,
data: Optional[Dict[str, Any]] = None,
params: Optional[Dict[str, Any]] = None,
headers: Optional[Dict[str, str]] = None,
timeout: Optional[Union[int, httpx.Timeout]] = None
) -> Optional[Union[Dict[str, Any], List[Dict[str, Any]]]]:
"""
Internal method to execute HTTP request with retries.
Called by _make_request through circuit breaker.
"""
2025-07-29 15:08:55 +02:00
try:
2026-01-12 14:24:14 +01:00
# Get service token with tenant context for tenant-scoped requests
token = await self.authenticator.get_service_token(tenant_id)
2025-07-29 15:08:55 +02:00
# Build headers
request_headers = self.authenticator.get_request_headers(tenant_id)
request_headers["Authorization"] = f"Bearer {token}"
request_headers["Content-Type"] = "application/json"
# Propagate request ID for distributed tracing if provided
if headers and "X-Request-ID" in headers:
request_headers["X-Request-ID"] = headers["X-Request-ID"]
2025-07-29 15:08:55 +02:00
if headers:
request_headers.update(headers)
# Build URL
base_path = self.get_service_base_path()
if tenant_id:
# For tenant-scoped endpoints
full_endpoint = f"{base_path}/tenants/{tenant_id}/{endpoint.lstrip('/')}"
else:
# For non-tenant endpoints
full_endpoint = f"{base_path}/{endpoint.lstrip('/')}"
url = urljoin(self.gateway_url, full_endpoint)
2025-11-30 09:12:40 +01:00
# Debug logging for URL construction
logger.debug(
"Making service request",
service=self.service_name,
method=method,
url=url,
tenant_id=tenant_id,
endpoint=endpoint,
params=params
)
2025-07-29 15:08:55 +02:00
# Make request with retries
for attempt in range(self.retries + 1):
try:
# Handle different timeout configurations
if isinstance(timeout, httpx.Timeout):
client_timeout = timeout
else:
client_timeout = timeout or self.timeout
async with httpx.AsyncClient(timeout=client_timeout) as client:
response = await client.request(
method=method,
url=url,
json=data,
params=params,
headers=request_headers
)
if response.status_code == 200:
return response.json()
elif response.status_code == 201:
return response.json()
elif response.status_code == 204:
return {} # No content success
elif response.status_code == 401:
# Token might be expired, clear cache and retry once
if attempt == 0:
self.authenticator._cached_token = None
logger.warning("Token expired, retrying with new token")
continue
else:
logger.error("Authentication failed after retry")
return None
elif response.status_code == 404:
2025-11-30 09:12:40 +01:00
logger.warning(
"Endpoint not found",
url=url,
service=self.service_name,
endpoint=endpoint,
constructed_endpoint=full_endpoint,
tenant_id=tenant_id
)
2025-07-29 15:08:55 +02:00
return None
else:
error_detail = "Unknown error"
try:
error_json = response.json()
error_detail = error_json.get('detail', f"HTTP {response.status_code}")
except:
error_detail = f"HTTP {response.status_code}: {response.text}"
logger.error(f"Request failed: {error_detail}",
url=url, status_code=response.status_code)
return None
except httpx.TimeoutException:
if attempt < self.retries:
logger.warning(f"Request timeout, retrying ({attempt + 1}/{self.retries})")
import asyncio
await asyncio.sleep(self.retry_delay * (2 ** attempt)) # Exponential backoff
continue
else:
logger.error(f"Request timeout after {self.retries} retries", url=url)
return None
except Exception as e:
if attempt < self.retries:
logger.warning(f"Request failed, retrying ({attempt + 1}/{self.retries}): {e}")
import asyncio
await asyncio.sleep(self.retry_delay * (2 ** attempt))
continue
else:
logger.error(f"Request failed after {self.retries} retries: {e}", url=url)
return None
except Exception as e:
logger.error(f"Unexpected error in _make_request: {e}")
return None
async def _make_paginated_request(
self,
endpoint: str,
tenant_id: Optional[str] = None,
params: Optional[Dict[str, Any]] = None,
2025-08-12 18:17:30 +02:00
page_size: int = 1000,
2025-07-29 15:08:55 +02:00
max_pages: int = 100,
timeout: Optional[Union[int, httpx.Timeout]] = None
) -> List[Dict[str, Any]]:
"""
Make paginated GET requests to fetch all records
Handles both direct list and paginated object responses
Args:
endpoint: API endpoint
tenant_id: Optional tenant ID
params: Base query parameters
2025-08-12 18:17:30 +02:00
page_size: Records per page (default 1000)
2025-07-29 15:08:55 +02:00
max_pages: Maximum pages to fetch (safety limit)
timeout: Request timeout override
Returns:
List of all records from all pages
"""
all_records = []
page = 0
base_params = params or {}
logger.info(f"Starting paginated request to {endpoint}",
tenant_id=tenant_id, page_size=page_size)
while page < max_pages:
# Prepare pagination parameters
page_params = base_params.copy()
page_params.update({
"limit": page_size,
"offset": page * page_size
})
logger.debug(f"Fetching page {page + 1} (offset: {page * page_size})",
tenant_id=tenant_id)
# Make request for this page
result = await self._make_request(
"GET",
endpoint,
tenant_id=tenant_id,
params=page_params,
timeout=timeout
)
if result is None:
logger.error(f"Failed to fetch page {page + 1}", tenant_id=tenant_id)
break
# Handle different response formats
if isinstance(result, list):
# Direct list response (no pagination metadata)
records = result
logger.debug(f"Retrieved {len(records)} records from page {page + 1} (direct list)")
if len(records) == 0:
logger.info("No records in response, pagination complete")
break
elif len(records) < page_size:
# Got fewer than requested, this is the last page
all_records.extend(records)
logger.info(f"Final page: retrieved {len(records)} records, total: {len(all_records)}")
break
else:
# Got full page, there might be more
all_records.extend(records)
logger.debug(f"Full page retrieved: {len(records)} records, continuing to next page")
elif isinstance(result, dict):
# Paginated response format
records = result.get('records', result.get('data', []))
total_available = result.get('total', 0)
logger.debug(f"Retrieved {len(records)} records from page {page + 1} (paginated response)")
if not records:
logger.info("No more records found in paginated response")
break
all_records.extend(records)
# Check if we've got all available records
if len(all_records) >= total_available:
logger.info(f"Retrieved all available records: {len(all_records)}/{total_available}")
break
else:
logger.warning(f"Unexpected response format: {type(result)}")
break
page += 1
if page >= max_pages:
logger.warning(f"Reached maximum page limit ({max_pages}), stopping pagination")
logger.info(f"Pagination complete: fetched {len(all_records)} total records",
tenant_id=tenant_id, pages_fetched=page)
return all_records
async def get(self, endpoint: str, tenant_id: Optional[str] = None, params: Optional[Dict[str, Any]] = None) -> Optional[Dict[str, Any]]:
"""Make a GET request"""
return await self._make_request("GET", endpoint, tenant_id=tenant_id, params=params)
async def get_paginated(
self,
endpoint: str,
tenant_id: Optional[str] = None,
params: Optional[Dict[str, Any]] = None,
2025-08-12 18:17:30 +02:00
page_size: int = 1000,
2025-07-29 15:08:55 +02:00
max_pages: int = 100,
timeout: Optional[Union[int, httpx.Timeout]] = None
) -> List[Dict[str, Any]]:
"""Make a paginated GET request to fetch all records"""
return await self._make_paginated_request(
endpoint,
tenant_id=tenant_id,
params=params,
page_size=page_size,
max_pages=max_pages,
timeout=timeout
)
2025-11-05 22:54:14 +01:00
async def post(self, endpoint: str, data: Optional[Dict[str, Any]] = None, tenant_id: Optional[str] = None, params: Optional[Dict[str, Any]] = None) -> Optional[Dict[str, Any]]:
"""Make a POST request with optional query parameters"""
return await self._make_request("POST", endpoint, tenant_id=tenant_id, data=data, params=params)
2025-07-29 15:08:55 +02:00
async def put(self, endpoint: str, data: Dict[str, Any], tenant_id: Optional[str] = None) -> Optional[Dict[str, Any]]:
"""Make a PUT request"""
return await self._make_request("PUT", endpoint, tenant_id=tenant_id, data=data)
2026-01-13 22:22:38 +01:00
async def patch(self, endpoint: str, data: Dict[str, Any], tenant_id: Optional[str] = None) -> Optional[Dict[str, Any]]:
"""Make a PATCH request"""
return await self._make_request("PATCH", endpoint, tenant_id=tenant_id, data=data)
2025-07-29 15:08:55 +02:00
async def delete(self, endpoint: str, tenant_id: Optional[str] = None) -> Optional[Dict[str, Any]]:
"""Make a DELETE request"""
return await self._make_request("DELETE", endpoint, tenant_id=tenant_id)