Add MinIO support and frontend analytics

This commit is contained in:
Urtzi Alfaro
2026-01-17 22:42:40 +01:00
parent fbc670ddb3
commit 3c4b5c2a06
53 changed files with 3485 additions and 437 deletions

View File

@@ -1,10 +1,10 @@
-# Training Dockerfile
-# Add this stage at the top of each service Dockerfile
+# Training Service Dockerfile with MinIO Support
+# Multi-stage build for optimized production image
FROM python:3.11-slim AS shared
WORKDIR /shared
COPY shared/ /shared/
-# Then your main service stage
+# Main service stage
FROM python:3.11-slim
WORKDIR /app

View File

@@ -116,29 +116,51 @@ async def broadcast_training_progress(job_id: str, progress: dict):
await websocket_manager.broadcast(job_id, message)
```
-### Model Artifact Management
+### Model Artifact Management (MinIO Storage)
```python
-# Model storage and retrieval
+# Model storage and retrieval using MinIO
import joblib
-from pathlib import Path
+from shared.clients.minio_client import minio_client
-# Save trained model
+# Save trained model to MinIO
def save_model_artifact(model: Prophet, tenant_id: str, product_id: str) -> str:
-    """Serialize and store model"""
-    model_dir = Path(f"/models/{tenant_id}/{product_id}")
-    model_dir.mkdir(parents=True, exist_ok=True)
+    """Serialize and store model in MinIO"""
+    import io
-    version = datetime.utcnow().strftime("%Y%m%d_%H%M%S")
-    model_path = model_dir / f"model_v{version}.pkl"
+    model_id = str(uuid.uuid4())
+    object_name = f"models/{tenant_id}/{product_id}/{model_id}.pkl"
-    joblib.dump(model, model_path)
-    return str(model_path)
+    # Serialize model (joblib.dump writes to file-like objects)
+    buffer = io.BytesIO()
+    joblib.dump(model, buffer)
+    model_data = buffer.getvalue()
-# Load trained model
+    # Upload to MinIO
+    minio_client.put_object(
+        bucket_name="training-models",
+        object_name=object_name,
+        data=model_data,
+        content_type="application/octet-stream"
+    )
+    # Return MinIO path
+    return f"minio://training-models/{object_name}"
+# Load trained model from MinIO
def load_model_artifact(model_path: str) -> Prophet:
-    """Load serialized model"""
-    return joblib.load(model_path)
+    """Load serialized model from MinIO"""
+    import io
+    # Parse MinIO path: minio://bucket_name/object_path
+    _, bucket_and_path = model_path.split("://", 1)
+    bucket_name, object_name = bucket_and_path.split("/", 1)
+    # Download from MinIO
+    model_data = minio_client.get_object(bucket_name, object_name)
+    # Deserialize (joblib.load reads from file-like objects)
+    buffer = io.BytesIO(model_data)
+    return joblib.load(buffer)
```
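Both helpers above go through a shared `minio_client` wrapper rather than the `minio` SDK directly. The wrapper itself is not part of this diff; below is a minimal sketch of what it might look like, assuming it wraps the official `minio` package (pinned in requirements.txt at the end of this commit). The class and method bodies here are illustrative, not the actual shared client.

```python
# Hypothetical sketch of shared/clients/minio_client.py (not shown in this commit)
import io
import os

from minio import Minio


class MinioClientWrapper:
    def __init__(self):
        self._client = Minio(
            os.getenv("MINIO_ENDPOINT", "localhost:9000"),
            access_key=os.getenv("MINIO_ACCESS_KEY"),
            secret_key=os.getenv("MINIO_SECRET_KEY"),
            secure=os.getenv("MINIO_USE_SSL", "true").lower() == "true",
        )

    def put_object(self, bucket_name: str, object_name: str, data,
                   content_type: str = "application/octet-stream",
                   metadata: dict | None = None) -> bool:
        # The SDK expects a stream plus an explicit length, so wrap raw bytes/str
        if isinstance(data, str):
            data = data.encode("utf-8")
        self._client.put_object(bucket_name, object_name, io.BytesIO(data),
                                len(data), content_type=content_type,
                                metadata=metadata)
        return True

    def get_object(self, bucket_name: str, object_name: str) -> bytes:
        # Read the whole object into memory and release the connection
        response = self._client.get_object(bucket_name, object_name)
        try:
            return response.read()
        finally:
            response.close()
            response.release_conn()


minio_client = MinioClientWrapper()
```

The `bucket_exists`, `list_objects`, `delete_object`, and `health_check` calls seen elsewhere in this diff would presumably be thin passthroughs in the same style.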
### Performance Metrics Calculation
@@ -194,8 +216,8 @@ def calculate_performance_metrics(model: Prophet, actual_data: pd.DataFrame) ->
- **Framework**: FastAPI (Python 3.11+) - Async web framework with WebSocket support
- **Database**: PostgreSQL 17 - Training logs, model metadata, job queue
- **ML Library**: Prophet (fbprophet) - Time series forecasting
-- **Model Storage**: Joblib - Model serialization
-- **File System**: Persistent volumes - Model artifact storage
+- **Model Storage**: MinIO (S3-compatible) - Distributed object storage with TLS
+- **Serialization**: Joblib - Model serialization
- **WebSocket**: FastAPI WebSocket - Real-time progress updates
- **Messaging**: RabbitMQ 4.1 - Training completion events
- **ORM**: SQLAlchemy 2.0 (async) - Database abstraction
@@ -442,7 +464,13 @@ websocket_messages_sent = Counter(
- `PORT` - Service port (default: 8004)
- `DATABASE_URL` - PostgreSQL connection string
- `RABBITMQ_URL` - RabbitMQ connection string
-- `MODEL_STORAGE_PATH` - Path for model artifacts (default: /models)
+**MinIO Configuration:**
+- `MINIO_ENDPOINT` - MinIO server endpoint (default: minio.bakery-ia.svc.cluster.local:9000)
+- `MINIO_ACCESS_KEY` - MinIO access key
+- `MINIO_SECRET_KEY` - MinIO secret key
+- `MINIO_USE_SSL` - Enable TLS (default: true)
+- `MINIO_MODEL_BUCKET` - Bucket for models (default: training-models)
**Training Configuration:**
- `MAX_CONCURRENT_JOBS` - Maximum parallel training jobs (default: 3)
@@ -462,10 +490,9 @@ websocket_messages_sent = Counter(
- `WEBSOCKET_MAX_CONNECTIONS` - Max connections per tenant (default: 10)
- `WEBSOCKET_MESSAGE_QUEUE_SIZE` - Message buffer size (default: 100)
-**Storage Configuration:**
-- `MODEL_RETENTION_DAYS` - Days to keep old models (default: 90)
-- `MAX_MODEL_VERSIONS_PER_PRODUCT` - Version limit (default: 10)
-- `ENABLE_MODEL_COMPRESSION` - Compress model files (default: true)
+**Storage Configuration (MinIO):**
+- `MINIO_MODEL_LIFECYCLE_DAYS` - Days to keep old model versions (default: 90)
+- `MINIO_CACHE_TTL_SECONDS` - Model cache TTL in seconds (default: 3600)
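How the `MINIO_MODEL_LIFECYCLE_DAYS` retention is actually enforced is not shown in this commit. If it is implemented as a bucket lifecycle rule, the official `minio` SDK would configure it roughly like this (bucket name and prefix assumed from the defaults above):

```python
# Sketch only - the commit does not show where this rule is installed
from minio import Minio
from minio.commonconfig import ENABLED, Filter
from minio.lifecycleconfig import Expiration, LifecycleConfig, Rule

client = Minio("localhost:9000", access_key="minioadmin",
               secret_key="minioadmin", secure=False)

config = LifecycleConfig([
    Rule(
        ENABLED,
        rule_filter=Filter(prefix="models/"),
        rule_id="expire-old-models",
        expiration=Expiration(days=90),  # MINIO_MODEL_LIFECYCLE_DAYS
    ),
])
client.set_bucket_lifecycle("training-models", config)
```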
## Development Setup
@@ -473,7 +500,7 @@ websocket_messages_sent = Counter(
- Python 3.11+
- PostgreSQL 17
- RabbitMQ 4.1
-- Persistent storage for model artifacts
+- MinIO (S3-compatible object storage)
### Local Development
```bash
@@ -488,10 +515,13 @@ pip install -r requirements.txt
# Set environment variables
export DATABASE_URL=postgresql://user:pass@localhost:5432/training
export RABBITMQ_URL=amqp://guest:guest@localhost:5672/
-export MODEL_STORAGE_PATH=/tmp/models
+export MINIO_ENDPOINT=localhost:9000
+export MINIO_ACCESS_KEY=minioadmin
+export MINIO_SECRET_KEY=minioadmin
+export MINIO_USE_SSL=false  # Use true in production
-# Create model storage directory
-mkdir -p /tmp/models
+# Start MinIO locally (if not using K8s)
+docker run -p 9000:9000 -p 9001:9001 minio/minio server /data --console-address ":9001"
# Run database migrations
alembic upgrade head
@@ -590,7 +620,7 @@ for feature_name in poi_features.keys():
- **External Service** - Fetch weather, traffic, holiday, and POI feature data
- **PostgreSQL** - Store job queue, models, metrics, logs
- **RabbitMQ** - Publish training completion events
-- **File System** - Store model artifacts
+- **MinIO** - Store model artifacts (S3-compatible object storage with TLS)
### Dependents (Services That Call This)
- **Forecasting Service** - Load trained models for predictions
@@ -627,11 +657,11 @@ for feature_name in poi_features.keys():
4. **Resource Limits** - CPU/memory limits per training job
5. **Priority Queue** - Prioritize important products first
-### Storage Optimization
-1. **Model Compression** - Compress model artifacts (gzip)
-2. **Old Model Cleanup** - Automatic deletion after retention period
-3. **Version Limits** - Keep only N most recent versions
-4. **Deduplication** - Avoid storing identical models
+### Storage Optimization (MinIO)
+1. **Object Versioning** - MinIO maintains version history automatically
+2. **Lifecycle Policies** - Auto-cleanup old versions after 90 days
+3. **TLS Encryption** - Secure communication with MinIO
+4. **Distributed Storage** - MinIO handles replication and availability
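One caveat on item 1: MinIO does not version objects by default (and versioning generally requires an erasure-coded deployment), so provisioning would have to enable it explicitly. A sketch:

```python
# Sketch only - assumes versioning is enabled during bucket provisioning
from minio import Minio
from minio.commonconfig import ENABLED
from minio.versioningconfig import VersioningConfig

client = Minio("localhost:9000", access_key="minioadmin",
               secret_key="minioadmin", secure=False)
client.set_bucket_versioning("training-models", VersioningConfig(ENABLED))
```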
### WebSocket Optimization
1. **Message Batching** - Batch progress updates (every 2 seconds)

View File

@@ -96,48 +96,48 @@ def check_system_resources() -> Dict[str, Any]:
def check_model_storage() -> Dict[str, Any]:
"""Check model storage health"""
"""Check MinIO model storage health"""
try:
storage_path = settings.MODEL_STORAGE_PATH
from shared.clients.minio_client import minio_client
if not os.path.exists(storage_path):
# Check MinIO connectivity
if not minio_client.health_check():
return {
"status": "warning",
"message": f"Model storage path does not exist: {storage_path}"
"status": "unhealthy",
"message": "MinIO service is not reachable",
"storage_type": "minio"
}
# Check if writable
test_file = os.path.join(storage_path, ".health_check")
try:
with open(test_file, 'w') as f:
f.write("test")
os.remove(test_file)
writable = True
except Exception:
writable = False
bucket_name = settings.MINIO_MODEL_BUCKET
# Count model files
model_files = 0
total_size = 0
for root, dirs, files in os.walk(storage_path):
for file in files:
if file.endswith('.pkl'):
model_files += 1
file_path = os.path.join(root, file)
total_size += os.path.getsize(file_path)
# Check if bucket exists
bucket_exists = minio_client.bucket_exists(bucket_name)
if not bucket_exists:
return {
"status": "warning",
"message": f"MinIO bucket does not exist: {bucket_name}",
"storage_type": "minio"
}
# Count model files in MinIO
model_objects = minio_client.list_objects(bucket_name, prefix="models/")
model_files = [obj for obj in model_objects if obj.endswith('.pkl')]
return {
"status": "healthy" if writable else "degraded",
"path": storage_path,
"writable": writable,
"model_files": model_files,
"total_size_mb": round(total_size / 1024 / 1024, 2)
"status": "healthy",
"storage_type": "minio",
"endpoint": settings.MINIO_ENDPOINT,
"bucket": bucket_name,
"use_ssl": settings.MINIO_USE_SSL,
"model_files": len(model_files),
"bucket_exists": bucket_exists
}
except Exception as e:
logger.error(f"Model storage check failed: {e}")
logger.error(f"MinIO storage check failed: {e}")
return {
"status": "error",
"storage_type": "minio",
"error": str(e)
}
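`minio_client.health_check()` is another wrapper method that does not appear in this diff. Continuing the hypothetical wrapper sketched earlier, a plausible implementation is a cheap round-trip against the API:

```python
# Hypothetical wrapper method (not part of this commit)
def health_check(self) -> bool:
    """Return True if MinIO answers a trivial API call."""
    try:
        self._client.list_buckets()
        return True
    except Exception:
        return False
```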

View File

@@ -14,7 +14,6 @@ from app.services.training_service import EnhancedTrainingService
from datetime import datetime, timezone
from sqlalchemy import select, delete, func
import uuid
-import shutil
from shared.auth.decorators import (
get_current_user_dep,
@@ -304,10 +303,9 @@ async def delete_tenant_models_complete(
"jobs_cancelled": 0,
"models_deleted": 0,
"artifacts_deleted": 0,
"artifacts_files_deleted": 0,
"minio_objects_deleted": 0,
"training_logs_deleted": 0,
"performance_metrics_deleted": 0,
"storage_freed_bytes": 0,
"errors": []
}
@@ -336,51 +334,35 @@ async def delete_tenant_models_complete(
deletion_stats["errors"].append(error_msg)
logger.error(error_msg)
-    # Step 2: Delete model artifact files from storage
+    # Step 2: Delete model artifact files from MinIO storage
    try:
-        artifacts_query = select(ModelArtifact).where(
-            ModelArtifact.tenant_id == tenant_uuid
-        )
-        artifacts_result = await db.execute(artifacts_query)
-        artifacts = artifacts_result.scalars().all()
-        storage_freed = 0
+        from shared.clients.minio_client import minio_client
+        bucket_name = settings.MINIO_MODEL_BUCKET
+        prefix = f"models/{tenant_id}/"
+        # List all objects for this tenant
+        objects_to_delete = minio_client.list_objects(bucket_name, prefix=prefix)
        files_deleted = 0
-        for artifact in artifacts:
+        for obj_name in objects_to_delete:
            try:
-                file_path = Path(artifact.file_path)
-                if file_path.exists():
-                    file_size = file_path.stat().st_size
-                    file_path.unlink()  # Delete file
-                    storage_freed += file_size
-                    files_deleted += 1
-                    logger.debug("Deleted artifact file",
-                                 file_path=str(file_path),
-                                 size_bytes=file_size)
-                    # Also try to delete parent directories if empty
-                    try:
-                        if file_path.parent.exists() and not any(file_path.parent.iterdir()):
-                            file_path.parent.rmdir()
-                    except:
-                        pass  # Ignore errors cleaning up directories
+                minio_client.delete_object(bucket_name, obj_name)
+                files_deleted += 1
+                logger.debug("Deleted MinIO object", object_name=obj_name)
            except Exception as e:
-                error_msg = f"Error deleting artifact file {artifact.file_path}: {str(e)}"
+                error_msg = f"Error deleting MinIO object {obj_name}: {str(e)}"
                deletion_stats["errors"].append(error_msg)
                logger.warning(error_msg)
-        deletion_stats["artifacts_files_deleted"] = files_deleted
-        deletion_stats["storage_freed_bytes"] = storage_freed
-        logger.info("Deleted artifact files",
+        deletion_stats["minio_objects_deleted"] = files_deleted
+        logger.info("Deleted MinIO objects",
                    tenant_id=tenant_id,
-                    files_deleted=files_deleted,
-                    storage_freed_mb=storage_freed / (1024 * 1024))
+                    files_deleted=files_deleted)
    except Exception as e:
-        error_msg = f"Error processing artifact files: {str(e)}"
+        error_msg = f"Error processing MinIO objects: {str(e)}"
        deletion_stats["errors"].append(error_msg)
        logger.error(error_msg)
@@ -463,19 +445,7 @@ async def delete_tenant_models_complete(
detail=error_msg
)
-    # Step 4: Clean up tenant model directory
-    try:
-        tenant_model_dir = Path(settings.MODEL_STORAGE_PATH) / tenant_id
-        if tenant_model_dir.exists():
-            shutil.rmtree(tenant_model_dir)
-            logger.info("Deleted tenant model directory",
-                        directory=str(tenant_model_dir))
-    except Exception as e:
-        error_msg = f"Error deleting model directory: {str(e)}"
-        deletion_stats["errors"].append(error_msg)
-        logger.warning(error_msg)
-    # Models deleted successfully
+    # Step 4: Models deleted successfully (MinIO cleanup already done in Step 2)
return {
"success": True,
"message": f"All training data for tenant {tenant_id} deleted successfully",

View File

@@ -44,6 +44,18 @@ class TrainingSettings(BaseServiceSettings):
MODEL_BACKUP_ENABLED: bool = os.getenv("MODEL_BACKUP_ENABLED", "true").lower() == "true"
MODEL_VERSIONING_ENABLED: bool = os.getenv("MODEL_VERSIONING_ENABLED", "true").lower() == "true"
+    # MinIO Configuration
+    MINIO_ENDPOINT: str = os.getenv("MINIO_ENDPOINT", "minio.bakery-ia.svc.cluster.local:9000")
+    MINIO_ACCESS_KEY: str = os.getenv("MINIO_ACCESS_KEY", "training-service")
+    MINIO_SECRET_KEY: str = os.getenv("MINIO_SECRET_KEY", "training-secret-key")
+    MINIO_USE_SSL: bool = os.getenv("MINIO_USE_SSL", "true").lower() == "true"
+    MINIO_MODEL_BUCKET: str = os.getenv("MINIO_MODEL_BUCKET", "training-models")
+    MINIO_CONSOLE_PORT: str = os.getenv("MINIO_CONSOLE_PORT", "9001")
+    MINIO_API_PORT: str = os.getenv("MINIO_API_PORT", "9000")
+    MINIO_REGION: str = os.getenv("MINIO_REGION", "us-east-1")
+    MINIO_MODEL_LIFECYCLE_DAYS: int = int(os.getenv("MINIO_MODEL_LIFECYCLE_DAYS", "90"))
+    MINIO_CACHE_TTL_SECONDS: int = int(os.getenv("MINIO_CACHE_TTL_SECONDS", "3600"))
# Training Configuration
MAX_CONCURRENT_TRAINING_JOBS: int = int(os.getenv("MAX_CONCURRENT_TRAINING_JOBS", "3"))
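For reference, these settings map one-to-one onto the SDK constructor; a sketch of the wiring (the import path for `TrainingSettings` is assumed):

```python
# Sketch only - how the settings above could feed the MinIO client
from minio import Minio
from app.core.config import TrainingSettings  # assumed import path

settings = TrainingSettings()
client = Minio(
    settings.MINIO_ENDPOINT,
    access_key=settings.MINIO_ACCESS_KEY,
    secret_key=settings.MINIO_SECRET_KEY,
    secure=settings.MINIO_USE_SSL,
    region=settings.MINIO_REGION,
)
```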

View File

@@ -5,6 +5,7 @@ Combines Prophet's seasonality modeling with XGBoost's pattern learning
import pandas as pd
import numpy as np
+import io
from typing import Dict, List, Any, Optional, Tuple
import structlog
from datetime import datetime, timezone
@@ -110,8 +111,8 @@ class HybridProphetXGBoost:
# Step 4: Get Prophet predictions on training data
logger.info("Step 3: Generating Prophet predictions for residual calculation")
-        train_prophet_pred = self._get_prophet_predictions(prophet_result, train_df)
-        val_prophet_pred = self._get_prophet_predictions(prophet_result, val_df)
+        train_prophet_pred = await self._get_prophet_predictions(prophet_result, train_df)
+        val_prophet_pred = await self._get_prophet_predictions(prophet_result, val_df)
# Step 5: Calculate residuals (actual - prophet_prediction)
train_residuals = train_df['y'].values - train_prophet_pred
@@ -207,7 +208,7 @@ class HybridProphetXGBoost:
return df_enhanced
-    def _get_prophet_predictions(
+    async def _get_prophet_predictions(
self,
prophet_result: Dict[str, Any],
df: pd.DataFrame
@@ -230,8 +231,13 @@ class HybridProphetXGBoost:
# Load the actual Prophet model from the stored path
try:
-            import joblib
-            prophet_model = joblib.load(model_path)
+            if model_path.startswith("minio://"):
+                # Use prophet_manager to load from MinIO
+                prophet_model = await self.prophet_manager._load_model_from_minio(model_path)
+            else:
+                # Fallback to direct loading for local paths
+                import joblib
+                prophet_model = joblib.load(model_path)
except Exception as e:
raise ValueError(f"Failed to load Prophet model from path {model_path}: {str(e)}")
@@ -417,8 +423,13 @@ class HybridProphetXGBoost:
# Load the Prophet model from the stored path
try:
-            import joblib
-            prophet_model = joblib.load(prophet_model_path)
+            if prophet_model_path.startswith("minio://"):
+                # Use prophet_manager to load from MinIO
+                prophet_model = await self.prophet_manager._load_model_from_minio(prophet_model_path)
+            else:
+                # Fallback to direct loading for local paths
+                import joblib
+                prophet_model = joblib.load(prophet_model_path)
except Exception as e:
raise ValueError(f"Failed to load Prophet model from path {prophet_model_path}: {str(e)}")
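The `minio://` scheme check is now duplicated in two places in this file; it could be hoisted into a single helper so both call sites stay in sync. A sketch, with `prophet_manager` assumed to expose the `_load_model_from_minio` method defined later in this diff:

```python
# Sketch only - consolidates the duplicated branching above
import joblib

async def load_prophet_model(self, path: str):
    """Dispatch on the storage scheme encoded in the model path."""
    if path.startswith("minio://"):
        return await self.prophet_manager._load_model_from_minio(path)
    # Fallback for legacy local paths
    return joblib.load(path)
```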

View File

@@ -13,6 +13,7 @@ from datetime import datetime, timedelta
import uuid
import os
import joblib
+import io
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
from sklearn.model_selection import TimeSeriesSplit
import json
@@ -85,9 +86,24 @@ class BakeryProphetManager:
self.database_manager = database_manager or create_database_manager(settings.DATABASE_URL, "training-service")
self.db_session = None # Will be set when session is available
-        # Ensure model storage directory exists
-        os.makedirs(settings.MODEL_STORAGE_PATH, exist_ok=True)
+        # Initialize MinIO client and ensure bucket exists
+        from shared.clients.minio_client import minio_client
+        self.minio_client = minio_client
+        self._ensure_minio_bucket()
+
+    def _ensure_minio_bucket(self):
+        """Ensure the training-models bucket exists in MinIO"""
+        try:
+            bucket_name = settings.MINIO_MODEL_BUCKET
+            if not self.minio_client.bucket_exists(bucket_name):
+                self.minio_client.create_bucket(bucket_name)
+                logger.info(f"Created MinIO bucket: {bucket_name}")
+            else:
+                logger.debug(f"MinIO bucket already exists: {bucket_name}")
+        except Exception as e:
+            logger.error(f"Failed to ensure MinIO bucket exists: {e}")
+            # Don't raise - bucket might be created by init job
async def train_bakery_model(self,
tenant_id: str,
inventory_product_id: str,
@@ -706,18 +722,40 @@ class BakeryProphetManager:
session = None) -> str:
"""Store model with database integration"""
-        # Create model directory
-        model_dir = Path(settings.MODEL_STORAGE_PATH) / tenant_id
-        model_dir.mkdir(parents=True, exist_ok=True)
+        # Store model in MinIO (clean implementation - MinIO only)
+        # Use BytesIO buffer since joblib.dump() writes to file-like objects
+        buffer = io.BytesIO()
+        joblib.dump(model, buffer)
+        model_data = buffer.getvalue()
+        object_name = f"models/{tenant_id}/{inventory_product_id}/{model_id}.pkl"
+        # Use MinIO client
+        from shared.clients.minio_client import minio_client
+        # Upload model to MinIO
+        success = minio_client.put_object(
+            bucket_name="training-models",
+            object_name=object_name,
+            data=model_data,
+            content_type="application/octet-stream",
+            metadata={
+                "model_id": model_id,
+                "tenant_id": tenant_id,
+                "inventory_product_id": inventory_product_id,
+                "model_type": "prophet_optimized"
+            }
+        )
+        if not success:
+            raise Exception("Failed to upload model to MinIO")
+        # Return MinIO object path
+        model_path = f"minio://training-models/{object_name}"
+        # Calculate checksum for model data
+        import hashlib
+        model_checksum = hashlib.sha256(model_data).hexdigest()
-        # Store model file
-        model_path = model_dir / f"{model_id}.pkl"
-        joblib.dump(model, model_path)
-        # Calculate checksum for model file integrity
-        checksummed_file = ChecksummedFile(str(model_path))
-        model_checksum = checksummed_file.calculate_and_save_checksum()
# Enhanced metadata with checksum
metadata = {
"model_id": model_id,
@@ -733,14 +771,23 @@ class BakeryProphetManager:
"optimized_parameters": optimized_params or {},
"created_at": datetime.now().isoformat(),
"model_type": "prophet_optimized",
"file_path": str(model_path),
"minio_path": model_path,
"checksum": model_checksum,
"checksum_algorithm": "sha256"
}
# Store metadata in MinIO as well
metadata_json = json.dumps(metadata, indent=2, default=str)
metadata_object_name = f"models/{tenant_id}/{inventory_product_id}/{model_id}.json"
minio_client.put_object(
bucket_name="training-models",
object_name=metadata_object_name,
data=metadata_json,
content_type="application/json"
)
metadata_path = model_path.with_suffix('.json')
with open(metadata_path, 'w') as f:
json.dump(metadata, f, indent=2, default=str)
# Define metadata_path for database record
metadata_path = f"minio://training-models/{metadata_object_name}"
# Store in memory
model_key = f"{tenant_id}:{inventory_product_id}"
@@ -854,16 +901,10 @@ class BakeryProphetManager:
model_path: str,
future_dates: pd.DataFrame,
regressor_columns: List[str]) -> pd.DataFrame:
"""Generate forecast using stored model with checksum verification"""
"""Generate forecast using stored model from MinIO"""
try:
# Verify model file integrity before loading
checksummed_file = ChecksummedFile(model_path)
if not checksummed_file.load_and_verify_checksum():
logger.warning(f"Checksum verification failed for model: {model_path}")
# Still load the model but log warning
# In production, you might want to raise an exception instead
model = joblib.load(model_path)
# Load model from MinIO
model = await self._load_model_from_minio(model_path)
for regressor in regressor_columns:
if regressor not in future_dates.columns:
@@ -876,6 +917,33 @@ class BakeryProphetManager:
except Exception as e:
logger.error(f"Failed to generate forecast: {str(e)}")
raise
+    async def _load_model_from_minio(self, model_path: str):
+        """Load model from MinIO storage"""
+        try:
+            # Parse MinIO path: minio://bucket_name/object_path
+            if not model_path.startswith("minio://"):
+                raise ValueError(f"Invalid MinIO path: {model_path}")
+            _, bucket_and_path = model_path.split("://", 1)
+            bucket_name, object_name = bucket_and_path.split("/", 1)
+            logger.debug(f"Loading model from MinIO: {bucket_name}/{object_name}")
+            # Download model data from MinIO
+            model_data = self.minio_client.get_object(bucket_name, object_name)
+            if not model_data:
+                raise ValueError(f"Failed to download model from MinIO: {model_path}")
+            # Deserialize model (using BytesIO since joblib.load reads from file-like objects)
+            buffer = io.BytesIO(model_data)
+            model = joblib.load(buffer)
+            logger.info(f"Model loaded successfully from MinIO: {model_path}")
+            return model
+        except Exception as e:
+            logger.error(f"Failed to load model from MinIO: {model_path}, error: {e}")
+            raise
async def _validate_training_data(self, df: pd.DataFrame, inventory_product_id: str):
"""Validate training data quality (unchanged)"""

View File

@@ -17,6 +17,7 @@ scikit-learn==1.6.1
pandas==2.2.3
numpy==2.2.2
joblib==1.4.2
+minio==7.2.2
xgboost==2.1.3
# HTTP client