"""
|
|
MinIO Client Library
|
|
Shared client for MinIO object storage operations with TLS support
|
|
"""
|
|
|
|
import os
|
|
import io
|
|
import ssl
|
|
import time
|
|
import urllib3
|
|
from typing import Optional, Dict, Any, Union
|
|
from pathlib import Path
|
|
from functools import wraps
|
|
|
|
from minio import Minio
|
|
from minio.error import S3Error
|
|
import structlog
|
|
|
|
# Configure logger
|
|
logger = structlog.get_logger()
|
|
|
|
|
|
def with_retry(max_retries: int = 3, base_delay: float = 1.0, max_delay: float = 30.0):
    """Decorator for retrying operations with exponential backoff

    Args:
        max_retries: Maximum number of retry attempts
        base_delay: Initial delay between retries in seconds
        max_delay: Maximum delay between retries in seconds
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            last_exception = None
            for attempt in range(max_retries + 1):
                try:
                    return func(*args, **kwargs)
                except (S3Error, urllib3.exceptions.HTTPError, ConnectionError, TimeoutError) as e:
                    last_exception = e
                    if attempt < max_retries:
                        # Exponential backoff, capped at max_delay
                        delay = min(base_delay * (2 ** attempt), max_delay)
                        logger.warning(
                            f"MinIO operation failed, retrying in {delay:.1f}s",
                            attempt=attempt + 1,
                            max_retries=max_retries,
                            error=str(e)
                        )
                        time.sleep(delay)
                    else:
                        logger.error(
                            "MinIO operation failed after all retries",
                            attempts=max_retries + 1,
                            error=str(e)
                        )
            raise last_exception
        return wrapper
    return decorator


class MinIOClient:
    """Client for MinIO object storage operations with TLS support"""

    def __init__(self):
        """Initialize MinIO client with configuration"""
        self._client = None
        self._initialize_client()

    def _initialize_client(self) -> None:
        """Initialize MinIO client from environment variables with SSL/TLS support"""
        try:
            # Get configuration from environment
            endpoint = os.getenv("MINIO_ENDPOINT", "minio.bakery-ia.svc.cluster.local:9000")
            access_key = os.getenv("MINIO_ACCESS_KEY", os.getenv("MINIO_ROOT_USER", "admin"))
            secret_key = os.getenv("MINIO_SECRET_KEY", os.getenv("MINIO_ROOT_PASSWORD", "secure-password"))
            use_ssl = os.getenv("MINIO_USE_SSL", "true").lower() == "true"

            # TLS certificate path (optional - for certificate verification)
            ca_cert_path = os.getenv("MINIO_CA_CERT_PATH", "/etc/ssl/certs/minio-ca.crt")
            # SSL verification is disabled by default for internal clusters with self-signed certs.
            # Set MINIO_VERIFY_SSL=true and provide a CA cert path in production with proper certs.
            verify_ssl = os.getenv("MINIO_VERIFY_SSL", "false").lower() == "true"

            # Prefer settings from the service configuration if available
            try:
                from app.core.config import settings
                if hasattr(settings, 'MINIO_ENDPOINT'):
                    endpoint = settings.MINIO_ENDPOINT
                    access_key = settings.MINIO_ACCESS_KEY
                    secret_key = settings.MINIO_SECRET_KEY
                    use_ssl = settings.MINIO_USE_SSL
            except ImportError:
                # Fall back to environment variables (for shared client usage)
                pass

            # Configure HTTP client with TLS settings
            http_client = None
            if use_ssl:
                # Create a custom HTTP client for TLS
                if verify_ssl and os.path.exists(ca_cert_path):
                    # Verify certificates against the CA
                    http_client = urllib3.PoolManager(
                        timeout=urllib3.Timeout(connect=10.0, read=60.0),
                        maxsize=10,
                        cert_reqs='CERT_REQUIRED',
                        ca_certs=ca_cert_path,
                        retries=urllib3.Retry(
                            total=5,
                            backoff_factor=0.2,
                            status_forcelist=[500, 502, 503, 504]
                        )
                    )
                    logger.info("MinIO TLS with certificate verification enabled",
                                ca_cert_path=ca_cert_path)
                else:
                    # TLS without certificate verification (for self-signed certs in an internal cluster).
                    # Traffic is still encrypted; only certificate validation is skipped.
                    http_client = urllib3.PoolManager(
                        timeout=urllib3.Timeout(connect=10.0, read=60.0),
                        maxsize=10,
                        cert_reqs='CERT_NONE',
                        retries=urllib3.Retry(
                            total=5,
                            backoff_factor=0.2,
                            status_forcelist=[500, 502, 503, 504]
                        )
                    )
                    # Suppress insecure request warnings for the internal cluster
                    urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
                    logger.info("MinIO TLS enabled without certificate verification (internal cluster)")

            # Initialize client with SSL/TLS
            self._client = Minio(
                endpoint,
                access_key=access_key,
                secret_key=secret_key,
                secure=use_ssl,
                http_client=http_client
            )

            logger.info("MinIO client initialized successfully",
                        endpoint=endpoint,
                        use_ssl=use_ssl,
                        verify_ssl=verify_ssl if use_ssl else False)

        except Exception as e:
            logger.error("Failed to initialize MinIO client", error=str(e))
            raise

    def reconnect(self) -> bool:
        """Reconnect to the MinIO server

        Useful when the connection is lost or credentials have changed.

        Returns:
            True if reconnection succeeded, False otherwise
        """
        try:
            logger.info("Attempting to reconnect to MinIO...")
            self._initialize_client()
            return True
        except Exception as e:
            logger.error("Failed to reconnect to MinIO", error=str(e))
            return False

    @with_retry(max_retries=3, base_delay=1.0)
    def bucket_exists(self, bucket_name: str) -> bool:
        """Check if a bucket exists - handles limited permissions gracefully"""
        try:
            # First try the standard method
            return self._client.bucket_exists(bucket_name)
        except S3Error as e:
            # On AccessDenied, try an alternative method for limited-permission users
            if e.code == "AccessDenied":
                logger.debug("Access denied for bucket_exists, trying alternative method",
                             bucket_name=bucket_name)
                try:
                    # Listing objects works with the ListBucket permission.
                    # If the bucket doesn't exist, this raises NoSuchBucket;
                    # if it exists but the user lacks permission, it raises AccessDenied.
                    list(self._client.list_objects(bucket_name, recursive=False))
                    logger.debug("Bucket exists (verified via list_objects)", bucket_name=bucket_name)
                    return True
                except S3Error as list_error:
                    if list_error.code == "NoSuchBucket":
                        logger.debug("Bucket does not exist", bucket_name=bucket_name)
                        return False
                    else:
                        logger.error("Failed to check bucket existence (alternative method)",
                                     bucket_name=bucket_name,
                                     error=str(list_error))
                        return False
            else:
                logger.error("Failed to check bucket existence",
                             bucket_name=bucket_name,
                             error=str(e))
                return False

    def create_bucket(self, bucket_name: str, region: str = "us-east-1") -> bool:
        """Create a new bucket if it doesn't exist"""
        try:
            if not self.bucket_exists(bucket_name):
                self._client.make_bucket(bucket_name, region)
                logger.info("Created MinIO bucket", bucket_name=bucket_name)
                return True
            return False
        except S3Error as e:
            logger.error("Failed to create bucket",
                         bucket_name=bucket_name,
                         error=str(e))
            return False

    @with_retry(max_retries=3, base_delay=1.0)
    def put_object(
        self,
        bucket_name: str,
        object_name: str,
        data: Union[bytes, io.BytesIO, str, Path],
        length: Optional[int] = None,
        content_type: str = "application/octet-stream",
        metadata: Optional[Dict[str, str]] = None
    ) -> bool:
        """Upload an object to MinIO

        Args:
            bucket_name: Target bucket name
            object_name: Object key/path in the bucket
            data: Data to upload (bytes, BytesIO, string, or Path)
            length: Optional data length (calculated automatically if not provided)
            content_type: MIME type of the object
            metadata: Optional metadata dictionary

        Returns:
            True if upload succeeded, False otherwise
        """
        try:
            # Ensure bucket exists
            self.create_bucket(bucket_name)

            # Convert data to bytes if needed
            if isinstance(data, str):
                data = data.encode('utf-8')
            elif isinstance(data, Path):
                with open(data, 'rb') as f:
                    data = f.read()
            elif isinstance(data, io.BytesIO):
                data = data.getvalue()

            # Calculate length if not provided
            data_length = length if length is not None else len(data)

            # The MinIO SDK requires a BytesIO stream and an explicit length
            data_stream = io.BytesIO(data)

            # Upload object with proper stream and length
            self._client.put_object(
                bucket_name,
                object_name,
                data_stream,
                length=data_length,
                content_type=content_type,
                metadata=metadata
            )

            logger.info("Uploaded object to MinIO",
                        bucket_name=bucket_name,
                        object_name=object_name,
                        size=data_length)

            return True

        except S3Error as e:
            logger.error("Failed to upload object",
                         bucket_name=bucket_name,
                         object_name=object_name,
                         error=str(e))
            return False

    @with_retry(max_retries=3, base_delay=1.0)
    def get_object(self, bucket_name: str, object_name: str) -> Optional[bytes]:
        """Download an object from MinIO"""
        try:
            # Get object data, then release the connection back to the pool
            response = self._client.get_object(bucket_name, object_name)
            try:
                data = response.read()
            finally:
                response.close()
                response.release_conn()

            logger.info("Downloaded object from MinIO",
                        bucket_name=bucket_name,
                        object_name=object_name,
                        size=len(data))

            return data

        except S3Error as e:
            logger.error("Failed to download object",
                         bucket_name=bucket_name,
                         object_name=object_name,
                         error=str(e))
            return None

    def object_exists(self, bucket_name: str, object_name: str) -> bool:
        """Check if an object exists"""
        try:
            self._client.stat_object(bucket_name, object_name)
            return True
        except S3Error:
            return False

    def list_objects(self, bucket_name: str, prefix: str = "") -> list:
        """List objects in a bucket with an optional prefix"""
        try:
            objects = self._client.list_objects(bucket_name, prefix=prefix, recursive=True)
            return [obj.object_name for obj in objects]
        except S3Error as e:
            logger.error("Failed to list objects",
                         bucket_name=bucket_name,
                         prefix=prefix,
                         error=str(e))
            return []

    def delete_object(self, bucket_name: str, object_name: str) -> bool:
        """Delete an object from MinIO"""
        try:
            self._client.remove_object(bucket_name, object_name)
            logger.info("Deleted object from MinIO",
                        bucket_name=bucket_name,
                        object_name=object_name)
            return True
        except S3Error as e:
            logger.error("Failed to delete object",
                         bucket_name=bucket_name,
                         object_name=object_name,
                         error=str(e))
            return False

    def get_presigned_url(
        self,
        bucket_name: str,
        object_name: str,
        expires: int = 3600
    ) -> Optional[str]:
        """Generate a presigned URL for object access (expiry in seconds)"""
        try:
            # The MinIO SDK expects the expiry as a timedelta
            url = self._client.presigned_get_object(
                bucket_name,
                object_name,
                expires=timedelta(seconds=expires)
            )
            return url
        except S3Error as e:
            logger.error("Failed to generate presigned URL",
                         bucket_name=bucket_name,
                         object_name=object_name,
                         error=str(e))
            return None

    def copy_object(
        self,
        source_bucket: str,
        source_object: str,
        dest_bucket: str,
        dest_object: str
    ) -> bool:
        """Copy an object within MinIO"""
        try:
            # Ensure destination bucket exists
            self.create_bucket(dest_bucket)

            # Copy object (minio>=7 expects a CopySource, not a "bucket/object" string)
            self._client.copy_object(dest_bucket, dest_object,
                                     CopySource(source_bucket, source_object))

            logger.info("Copied object in MinIO",
                        source_bucket=source_bucket,
                        source_object=source_object,
                        dest_bucket=dest_bucket,
                        dest_object=dest_object)

            return True
        except S3Error as e:
            logger.error("Failed to copy object",
                         source_bucket=source_bucket,
                         source_object=source_object,
                         dest_bucket=dest_bucket,
                         dest_object=dest_object,
                         error=str(e))
            return False

    def get_object_metadata(self, bucket_name: str, object_name: str) -> Optional[Dict[str, Any]]:
        """Get object metadata"""
        try:
            stat = self._client.stat_object(bucket_name, object_name)
            return {
                "size": stat.size,
                "last_modified": stat.last_modified,
                "content_type": stat.content_type,
                "metadata": stat.metadata or {}
            }
        except S3Error as e:
            logger.error("Failed to get object metadata",
                         bucket_name=bucket_name,
                         object_name=object_name,
                         error=str(e))
            return None

    def health_check(self) -> bool:
        """Check MinIO service health"""
        try:
            # Simple bucket list to check connectivity
            self._client.list_buckets()
            return True
        except Exception as e:
            logger.error("MinIO health check failed", error=str(e))
            return False


# Singleton instance for convenience
minio_client = MinIOClient()
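

# Minimal smoke-test sketch, for illustration only: the "example-bucket" and
# "hello.txt" names below are hypothetical, and this assumes the MINIO_*
# environment variables read above point at a reachable MinIO deployment.
if __name__ == "__main__":
    if minio_client.health_check():
        # Round-trip a small text object through the shared client
        minio_client.put_object("example-bucket", "hello.txt", "hello world",
                                content_type="text/plain")
        print(minio_client.list_objects("example-bucket", prefix="hello"))
        print(minio_client.get_presigned_url("example-bucket", "hello.txt", expires=600))
    else:
        logger.error("MinIO is not reachable; skipping smoke test")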