# services/training/app/ml/prophet_manager.py
"""
Simplified Prophet Manager with Built-in Hyperparameter Optimization
Direct replacement for existing BakeryProphetManager - optimization always enabled.
"""
from typing import Dict, List, Any, Optional, Tuple
import pandas as pd
import numpy as np
from prophet import Prophet
import logging
from datetime import datetime, timedelta
import uuid
import os
import joblib
import io
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
from sklearn.model_selection import TimeSeriesSplit
import json
from pathlib import Path
import math
import warnings
import shutil
import errno

warnings.filterwarnings('ignore')

from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import text

from app.models.training import TrainedModel
from shared.database.base import create_database_manager
from app.repositories import ModelRepository

# Simple optimization import
import optuna
optuna.logging.set_verbosity(optuna.logging.WARNING)

from app.core.config import settings
from app.core import constants as const
from app.utils.ml_datetime import prepare_prophet_datetime
from app.utils.file_utils import ChecksummedFile, calculate_file_checksum
from app.utils.distributed_lock import get_training_lock, LockAcquisitionError

logger = logging.getLogger(__name__)
def check_disk_space(path='/tmp', min_free_gb=1.0):
"""
Check if there's enough disk space available.
Args:
path: Path to check disk space for
min_free_gb: Minimum required free space in GB
Returns:
tuple: (bool: has_space, float: free_gb, float: total_gb, float: used_percent)
"""
try:
stat = shutil.disk_usage(path)
total_gb = stat.total / (1024**3)
free_gb = stat.free / (1024**3)
used_gb = stat.used / (1024**3)
used_percent = (stat.used / stat.total) * 100
has_space = free_gb >= min_free_gb
logger.info(f"Disk space check for {path}: "
f"total={total_gb:.2f}GB, free={free_gb:.2f}GB, "
f"used={used_gb:.2f}GB ({used_percent:.1f}%)")
if used_percent > 85:
logger.warning(f"Disk usage is high: {used_percent:.1f}% - this may cause issues")
return has_space, free_gb, total_gb, used_percent
except Exception as e:
logger.error(f"Failed to check disk space: {e}")
return True, 0, 0, 0 # Assume OK if we can't check
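# Usage sketch (illustrative only): callers typically gate heavy work on the first
# element of the returned tuple; the 0.5 GB threshold below is an example value, not
# a project constant.
#
#   has_space, free_gb, total_gb, used_percent = check_disk_space('/tmp', min_free_gb=0.5)
#   if not has_space:
#       raise RuntimeError(f"Only {free_gb:.2f}GB free - refusing to start training")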
class BakeryProphetManager:
"""
Simplified Prophet Manager with built-in hyperparameter optimization.
Drop-in replacement for the existing manager - optimization runs automatically.
"""
def __init__(self, database_manager=None):
self.models = {} # In-memory model storage
self.model_metadata = {} # Store model metadata
self.database_manager = database_manager or create_database_manager(settings.DATABASE_URL, "training-service")
self.db_session = None # Will be set when session is available
# Initialize MinIO client and ensure bucket exists
from shared.clients.minio_client import minio_client
self.minio_client = minio_client
self._ensure_minio_bucket()
def _ensure_minio_bucket(self):
"""Ensure the training-models bucket exists in MinIO"""
try:
bucket_name = settings.MINIO_MODEL_BUCKET
if not self.minio_client.bucket_exists(bucket_name):
self.minio_client.create_bucket(bucket_name)
logger.info(f"Created MinIO bucket: {bucket_name}")
else:
logger.debug(f"MinIO bucket already exists: {bucket_name}")
except Exception as e:
logger.error(f"Failed to ensure MinIO bucket exists: {e}")
# Don't raise - bucket might be created by init job
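# Usage sketch (illustrative, not part of the production flow): assumes an async caller,
# a daily-sales DataFrame with 'ds'/'y' columns plus any regressor columns, and valid
# tenant/product/job identifiers; every variable name below is hypothetical.
#
#   manager = BakeryProphetManager()
#   model_info = await manager.train_bakery_model(
#       tenant_id="tenant-123",
#       inventory_product_id="croissant",
#       df=daily_sales_df,
#       job_id="job-456",
#   )
#   print(model_info["model_id"], model_info["training_metrics"]["mape"])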
async def train_bakery_model(self,
tenant_id: str,
inventory_product_id: str,
df: pd.DataFrame,
job_id: str,
product_category: 'ProductCategory' = None,
category_hyperparameters: Dict[str, Any] = None,
session = None) -> Dict[str, Any]:
"""
Train a Prophet model with automatic hyperparameter optimization and distributed locking.
Args:
tenant_id: Tenant identifier
inventory_product_id: Product identifier
df: Training data DataFrame
job_id: Training job identifier
product_category: Optional product category for category-specific settings
category_hyperparameters: Optional category-specific Prophet hyperparameters
session: Optional database session (uses parent session if provided to avoid nested sessions)
"""
# Check disk space before starting training
has_space, free_gb, total_gb, used_percent = check_disk_space('/tmp', min_free_gb=0.5)
if not has_space:
error_msg = f"Insufficient disk space: {free_gb:.2f}GB free ({used_percent:.1f}% used). Need at least 0.5GB free."
logger.error(error_msg)
raise RuntimeError(error_msg)
# Acquire distributed lock to prevent concurrent training of same product
lock = get_training_lock(tenant_id, inventory_product_id, use_advisory=True)
# Use provided session or create new one if not provided
use_parent_session = session is not None
async def _train_with_lock(db_session):
"""Inner function to perform training with lock"""
async with lock.acquire(db_session):
logger.info(f"Training optimized bakery model for {inventory_product_id} (lock acquired)")
# Validate input data
await self._validate_training_data(df, inventory_product_id)
# Prepare data for Prophet
prophet_data = await self._prepare_prophet_data(df)
# Get regressor columns
regressor_columns = self._extract_regressor_columns(prophet_data)
# Use category-specific hyperparameters if provided, otherwise optimize
if category_hyperparameters:
logger.info(f"Using category-specific hyperparameters for {inventory_product_id} (category: {product_category.value if product_category else 'unknown'})")
best_params = category_hyperparameters.copy()
use_optimized = False # Not optimized, but category-specific
else:
# Automatically optimize hyperparameters
logger.info(f"Optimizing hyperparameters for {inventory_product_id}...")
try:
best_params = await self._optimize_hyperparameters(prophet_data, inventory_product_id, regressor_columns)
use_optimized = True
except Exception as opt_error:
logger.warning(f"Hyperparameter optimization failed for {inventory_product_id}: {opt_error}")
logger.warning("Falling back to default Prophet parameters")
# Use conservative default parameters
best_params = {
'changepoint_prior_scale': 0.05,
'seasonality_prior_scale': 10.0,
'holidays_prior_scale': 10.0,
'changepoint_range': 0.8,
'seasonality_mode': 'additive',
'daily_seasonality': False,
'weekly_seasonality': True,
'yearly_seasonality': len(prophet_data) > 365,
'uncertainty_samples': 0 # Disable uncertainty sampling to avoid cmdstan
}
use_optimized = False
# Create optimized Prophet model
model = self._create_optimized_prophet_model(best_params, regressor_columns)
# Add regressors to model
for regressor in regressor_columns:
if regressor in prophet_data.columns:
model.add_regressor(regressor)
# Set environment variable for cmdstan tmp directory
import os
tmpdir = os.environ.get('TMPDIR', '/tmp/cmdstan')
os.makedirs(tmpdir, mode=0o777, exist_ok=True)
os.environ['TMPDIR'] = tmpdir
# Verify tmp directory is writable
test_file = os.path.join(tmpdir, f'test_write_{inventory_product_id}.tmp')
try:
with open(test_file, 'w') as f:
f.write('test')
os.remove(test_file)
logger.debug(f"Verified {tmpdir} is writable")
except Exception as e:
logger.error(f"TMPDIR {tmpdir} is not writable: {e}")
raise RuntimeError(f"Cannot write to {tmpdir}: {e}")
# Fit the model with enhanced error handling
try:
logger.info(f"Starting Prophet model fit for {inventory_product_id}")
# ✅ FIX: Run blocking model.fit() in thread pool to avoid blocking event loop
import asyncio
await asyncio.to_thread(model.fit, prophet_data)
logger.info(f"Prophet model fit completed successfully for {inventory_product_id}")
except Exception as fit_error:
error_details = {
'error_type': type(fit_error).__name__,
'error_message': str(fit_error),
'errno': getattr(fit_error, 'errno', None),
'tmpdir': tmpdir,
'disk_space': check_disk_space(tmpdir, 0)
}
logger.error(f"Prophet model fit failed for {inventory_product_id}: {error_details}")
raise RuntimeError(f"Prophet training failed: {error_details['error_message']}") from fit_error
# Calculate enhanced training metrics first
training_metrics = await self._calculate_training_metrics(model, prophet_data, best_params)
# Store model and metrics - Generate proper UUID for model_id
model_id = str(uuid.uuid4())
# ✅ FIX: Pass session to _store_model to avoid nested session
model_path = await self._store_model(
tenant_id, inventory_product_id, model, model_id, prophet_data, regressor_columns, best_params, training_metrics, db_session
)
# Return same format as before, but with optimization info
# Ensure hyperparameters are JSON-serializable
def _serialize_hyperparameters(params):
"""Helper to ensure hyperparameters are JSON serializable"""
if not params:
return {}
safe_params = {}
for k, v in params.items():
try:
if isinstance(v, (int, float, str, bool, type(None))):
safe_params[k] = v
elif hasattr(v, 'item'): # numpy scalars
safe_params[k] = v.item()
elif isinstance(v, (list, tuple)):
safe_params[k] = [x.item() if hasattr(x, 'item') else x for x in v]
else:
safe_params[k] = float(v) if isinstance(v, (np.integer, np.floating)) else str(v)
except Exception:
safe_params[k] = str(v) # fallback to string conversion
return safe_params
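# Illustrative behavior of the helper above (assumed input): numpy scalars are converted
# via .item(), so {'changepoint_prior_scale': np.float64(0.05)} becomes
# {'changepoint_prior_scale': 0.05}, which json.dumps() can serialize directly.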
model_info = {
"model_id": model_id,
"model_path": model_path,
"type": "prophet_optimized",
"training_samples": len(prophet_data),
"features": regressor_columns,
"hyperparameters": _serialize_hyperparameters(best_params),
"training_metrics": training_metrics,
"product_category": product_category.value if product_category else "unknown",
"trained_at": datetime.now().isoformat(),
"data_period": {
"start_date": pd.Timestamp(prophet_data['ds'].min()).isoformat(),
"end_date": pd.Timestamp(prophet_data['ds'].max()).isoformat(),
"total_days": len(prophet_data)
}
}
logger.info(f"Optimized model trained successfully for {inventory_product_id}. "
f"MAPE: {training_metrics.get('optimized_mape', 'N/A')}%")
return model_info
try:
# ✅ FIX: Use parent session if provided, otherwise create new one
# This prevents nested session issues and database deadlocks
if use_parent_session:
logger.debug(f"Using parent session for training {inventory_product_id}")
return await _train_with_lock(session)
else:
logger.debug(f"Creating new session for training {inventory_product_id}")
async with self.database_manager.get_session() as new_session:
return await _train_with_lock(new_session)
except LockAcquisitionError as e:
logger.warning(f"Could not acquire lock for {inventory_product_id}: {e}")
raise RuntimeError(f"Training already in progress for product {inventory_product_id}")
except Exception as e:
logger.error(f"Failed to train optimized bakery model for {inventory_product_id}: {str(e)}")
raise
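# Caller-provided session pattern (sketch, identifiers are illustrative): the outer
# trainer passes its own session so no nested session is opened while the advisory
# lock is held, which is what the nested-session/deadlock avoidance above relies on.
#
#   async with manager.database_manager.get_session() as db_session:
#       ...  # caller flushes its own progress updates on db_session first
#       info = await manager.train_bakery_model(
#           tenant_id, inventory_product_id, df, job_id, session=db_session
#       )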
async def _optimize_hyperparameters(self,
df: pd.DataFrame,
inventory_product_id: str,
regressor_columns: List[str]) -> Dict[str, Any]:
"""
Automatically optimize Prophet hyperparameters using Bayesian optimization.
Simplified - no configuration needed.
"""
# Determine product category automatically
product_category = self._classify_product(inventory_product_id, df)
# Set optimization parameters based on category
n_trials = {
'high_volume': const.OPTUNA_TRIALS_HIGH_VOLUME,
'medium_volume': const.OPTUNA_TRIALS_MEDIUM_VOLUME,
'low_volume': const.OPTUNA_TRIALS_LOW_VOLUME,
'intermittent': const.OPTUNA_TRIALS_INTERMITTENT
}.get(product_category, const.OPTUNA_TRIALS_MEDIUM_VOLUME)
logger.info(f"Product {inventory_product_id} classified as {product_category}, using {n_trials} trials")
# Check data quality and adjust strategy
total_sales = df['y'].sum()
zero_ratio = (df['y'] == 0).sum() / len(df)
mean_sales = df['y'].mean()
non_zero_days = len(df[df['y'] > 0])
logger.info(f"Data analysis for {inventory_product_id}: total_sales={total_sales:.1f}, "
f"zero_ratio={zero_ratio:.2f}, mean_sales={mean_sales:.2f}, non_zero_days={non_zero_days}")
# Adjust strategy based on data characteristics
if zero_ratio > const.MAX_ZERO_RATIO_INTERMITTENT or non_zero_days < const.MIN_NON_ZERO_DAYS:
logger.warning(f"Very sparse data for {inventory_product_id}, using minimal optimization")
return {
'changepoint_prior_scale': 0.001,
'seasonality_prior_scale': 0.01,
'holidays_prior_scale': 0.01,
'changepoint_range': 0.8,
'seasonality_mode': 'additive',
'daily_seasonality': False,
'weekly_seasonality': True,
'yearly_seasonality': False,
'uncertainty_samples': const.UNCERTAINTY_SAMPLES_SPARSE_MIN
}
elif zero_ratio > const.MODERATE_SPARSITY_THRESHOLD:
logger.info(f"Moderate sparsity for {inventory_product_id}, using conservative optimization")
return {
'changepoint_prior_scale': 0.01,
'seasonality_prior_scale': 0.1,
'holidays_prior_scale': 0.1,
'changepoint_range': 0.8,
'seasonality_mode': 'additive',
'daily_seasonality': False,
'weekly_seasonality': True,
'yearly_seasonality': len(df) > const.DATA_QUALITY_DAY_THRESHOLD_HIGH,
'uncertainty_samples': const.UNCERTAINTY_SAMPLES_SPARSE_MAX
}
# Use unique seed for each product to avoid identical results
product_seed = hash(str(inventory_product_id)) % 10000
def objective(trial):
try:
# Sample hyperparameters with product-specific ranges
if product_category == 'high_volume':
# More conservative for high volume (less overfitting)
changepoint_scale_range = (0.001, 0.1)
seasonality_scale_range = (1.0, 10.0)
elif product_category == 'intermittent':
# Very conservative for intermittent
changepoint_scale_range = (0.001, 0.05)
seasonality_scale_range = (0.01, 1.0)
else:
# Default ranges
changepoint_scale_range = (0.001, 0.5)
seasonality_scale_range = (0.01, 10.0)
# Determine appropriate uncertainty samples range based on product category
if product_category == 'high_volume':
uncertainty_range = (const.UNCERTAINTY_SAMPLES_HIGH_MIN, const.UNCERTAINTY_SAMPLES_HIGH_MAX)
elif product_category == 'medium_volume':
uncertainty_range = (const.UNCERTAINTY_SAMPLES_MEDIUM_MIN, const.UNCERTAINTY_SAMPLES_MEDIUM_MAX)
elif product_category == 'low_volume':
uncertainty_range = (const.UNCERTAINTY_SAMPLES_LOW_MIN, const.UNCERTAINTY_SAMPLES_LOW_MAX)
else: # intermittent
uncertainty_range = (const.UNCERTAINTY_SAMPLES_SPARSE_MIN, const.UNCERTAINTY_SAMPLES_SPARSE_MAX)
params = {
'changepoint_prior_scale': trial.suggest_float(
'changepoint_prior_scale',
changepoint_scale_range[0],
changepoint_scale_range[1],
log=True
),
'seasonality_prior_scale': trial.suggest_float(
'seasonality_prior_scale',
seasonality_scale_range[0],
seasonality_scale_range[1],
log=True
),
'holidays_prior_scale': trial.suggest_float('holidays_prior_scale', 0.01, 10.0, log=True),
'changepoint_range': trial.suggest_float('changepoint_range', 0.8, 0.95),
'seasonality_mode': 'additive' if product_category == 'high_volume' else trial.suggest_categorical('seasonality_mode', ['additive', 'multiplicative']),
'daily_seasonality': trial.suggest_categorical('daily_seasonality', [True, False]),
'weekly_seasonality': True, # Always keep weekly
'yearly_seasonality': trial.suggest_categorical('yearly_seasonality', [True, False]),
'uncertainty_samples': int(trial.suggest_int('uncertainty_samples', int(uncertainty_range[0]), int(uncertainty_range[1]))) # ✅ FIX: Explicit int casting for all values
}
# Simple 2-fold cross-validation for speed
tscv = TimeSeriesSplit(n_splits=2)
cv_scores = []
for train_idx, val_idx in tscv.split(df):
train_data = df.iloc[train_idx].copy()
val_data = df.iloc[val_idx].copy()
if len(val_data) < 7: # Need at least a week
continue
try:
# Create and train model with adaptive uncertainty sampling
uncertainty_samples = int(params.get('uncertainty_samples', 200)) # ✅ FIX: Explicit int casting to prevent type errors
# Set environment variable for cmdstan tmp directory
import os
tmpdir = os.environ.get('TMPDIR', '/tmp/cmdstan')
os.makedirs(tmpdir, mode=0o777, exist_ok=True)
os.environ['TMPDIR'] = tmpdir
model = Prophet(**{k: v for k, v in params.items() if k != 'uncertainty_samples'},
interval_width=0.8, uncertainty_samples=uncertainty_samples)
for regressor in regressor_columns:
if regressor in train_data.columns:
model.add_regressor(regressor)
with warnings.catch_warnings():
warnings.simplefilter("ignore")
try:
model.fit(train_data)
except OSError as e:
# Log errno for "Operation not permitted" errors
if e.errno == errno.EPERM:
logger.error(f"Permission denied during Prophet fit (errno={e.errno}): {e}")
logger.error(f"TMPDIR: {tmpdir}, exists: {os.path.exists(tmpdir)}, "
f"writable: {os.access(tmpdir, os.W_OK)}")
raise
# Predict on validation set
future_df = model.make_future_dataframe(periods=0)
for regressor in regressor_columns:
if regressor in df.columns:
future_df[regressor] = df[regressor].values[:len(future_df)]
forecast = model.predict(future_df)
val_predictions = forecast['yhat'].iloc[train_idx[-1]+1:train_idx[-1]+1+len(val_data)]
val_actual = val_data['y'].values
# Calculate MAPE with improved handling for low values
if len(val_predictions) > 0 and len(val_actual) > 0:
# Use MAE for very low sales values to avoid MAPE issues
if val_actual.mean() < 1:
mae = np.mean(np.abs(val_actual - val_predictions.values))
# Convert MAE to percentage-like metric
mape_like = (mae / max(val_actual.mean(), 0.1)) * 100
else:
non_zero_mask = val_actual > 0.1 # Use threshold instead of zero
if np.sum(non_zero_mask) > 0:
mape = np.mean(np.abs((val_actual[non_zero_mask] - val_predictions.values[non_zero_mask]) / val_actual[non_zero_mask])) * 100
mape_like = min(mape, 200) # Cap at 200%
else:
mape_like = 100
if not np.isnan(mape_like) and not np.isinf(mape_like):
cv_scores.append(mape_like)
except Exception as fold_error:
logger.debug(f"Fold failed for {inventory_product_id} trial {trial.number}: {str(fold_error)}")
continue
return np.mean(cv_scores) if len(cv_scores) > 0 else 100.0
except Exception as trial_error:
logger.debug(f"Trial {trial.number} failed for {inventory_product_id}: {str(trial_error)}")
return 100.0
# Run optimization with product-specific seed
study = optuna.create_study(
direction='minimize',
sampler=optuna.samplers.TPESampler(seed=product_seed)
)
# ✅ FIX: Run blocking study.optimize() in thread pool to avoid blocking event loop
import asyncio
await asyncio.to_thread(
study.optimize,
objective,
n_trials=n_trials,
timeout=const.OPTUNA_TIMEOUT_SECONDS,
show_progress_bar=False
)
# Return best parameters
best_params = study.best_params
best_score = study.best_value
logger.info(f"Optimization completed for {inventory_product_id}. Best score: {best_score:.2f}%. "
f"Parameters: {best_params}")
# ✅ FIX: Log uncertainty sampling configuration for debugging confidence intervals with explicit int casting
uncertainty_samples = int(best_params.get('uncertainty_samples', 500))
logger.info(f"Prophet model will use {uncertainty_samples} uncertainty samples for {inventory_product_id} "
f"(category: {product_category}, zero_ratio: {zero_ratio:.2f})")
return best_params
def _classify_product(self, inventory_product_id: str, sales_data: pd.DataFrame) -> str:
"""Automatically classify product for optimization strategy - improved for bakery data"""
product_lower = str(inventory_product_id).lower()
# Calculate sales statistics
total_sales = sales_data['y'].sum()
mean_sales = sales_data['y'].mean()
zero_ratio = (sales_data['y'] == 0).sum() / len(sales_data)
non_zero_days = len(sales_data[sales_data['y'] > 0])
logger.info(f"Product classification for {inventory_product_id}: total_sales={total_sales:.1f}, "
f"mean_sales={mean_sales:.2f}, zero_ratio={zero_ratio:.2f}, non_zero_days={non_zero_days}")
# Improved classification logic for bakery products
# Consider both volume and consistency
# Check for truly intermittent demand (high zero ratio)
if zero_ratio > 0.8 or non_zero_days < 30:
return 'intermittent'
# High volume products (consistent daily sales)
if any(pattern in product_lower for pattern in ['cafe', 'pan', 'bread', 'coffee']):
# Even if absolute volume is low, these are core products
return 'high_volume' if zero_ratio < 0.3 else 'medium_volume'
# Volume-based classification for other products
if mean_sales >= 10 and zero_ratio < 0.4:
return 'high_volume'
elif mean_sales >= 5 and zero_ratio < 0.6:
return 'medium_volume'
elif mean_sales >= 2 and zero_ratio < 0.7:
return 'low_volume'
else:
return 'intermittent'
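# Worked examples of the thresholds above (illustrative figures, assuming the product
# name matches none of the core-product patterns): mean_sales=12 with zero_ratio=0.2
# lands in 'high_volume'; mean_sales=1.5 with zero_ratio=0.75 falls through every
# volume check and is tagged 'intermittent'.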
def _create_optimized_prophet_model(self, optimized_params: Dict[str, Any], regressor_columns: List[str]) -> Prophet:
"""Create Prophet model with optimized parameters and adaptive uncertainty sampling"""
holidays = self._get_spanish_holidays()
# Determine uncertainty samples based on data characteristics with explicit int casting
uncertainty_samples = int(optimized_params.get('uncertainty_samples', 500)) if optimized_params.get('uncertainty_samples') is not None else 500
# If uncertainty_samples is 0, we're in fallback mode (no cmdstan)
if uncertainty_samples == 0:
logger.info("Creating Prophet model without uncertainty sampling (fallback mode)")
model = Prophet(
holidays=holidays if not holidays.empty else None,
daily_seasonality=optimized_params.get('daily_seasonality', True),
weekly_seasonality=optimized_params.get('weekly_seasonality', True),
yearly_seasonality=optimized_params.get('yearly_seasonality', True),
seasonality_mode=optimized_params.get('seasonality_mode', 'additive'),
changepoint_prior_scale=float(optimized_params.get('changepoint_prior_scale', 0.05)),
seasonality_prior_scale=float(optimized_params.get('seasonality_prior_scale', 10.0)),
holidays_prior_scale=float(optimized_params.get('holidays_prior_scale', 10.0)),
changepoint_range=float(optimized_params.get('changepoint_range', 0.8)),
interval_width=0.8,
mcmc_samples=0,
uncertainty_samples=1 # Minimum value to avoid errors
)
else:
model = Prophet(
holidays=holidays if not holidays.empty else None,
daily_seasonality=optimized_params.get('daily_seasonality', True),
weekly_seasonality=optimized_params.get('weekly_seasonality', True),
yearly_seasonality=optimized_params.get('yearly_seasonality', True),
seasonality_mode=optimized_params.get('seasonality_mode', 'additive'),
changepoint_prior_scale=float(optimized_params.get('changepoint_prior_scale', 0.05)),
seasonality_prior_scale=float(optimized_params.get('seasonality_prior_scale', 10.0)),
holidays_prior_scale=float(optimized_params.get('holidays_prior_scale', 10.0)),
changepoint_range=float(optimized_params.get('changepoint_range', 0.8)),
interval_width=0.8,
mcmc_samples=0,
uncertainty_samples=uncertainty_samples
)
return model
# All the existing methods remain the same, just with enhanced metrics
async def _calculate_training_metrics(self,
model: Prophet,
training_data: pd.DataFrame,
optimized_params: Dict[str, Any] = None) -> Dict[str, float]:
"""Calculate training metrics with optimization info and improved MAPE handling"""
try:
# Generate in-sample predictions
forecast = model.predict(training_data[['ds'] + [col for col in training_data.columns if col not in ['ds', 'y']]])
# Calculate metrics
y_true = training_data['y'].values
y_pred = forecast['yhat'].values
# Basic metrics
mae = mean_absolute_error(y_true, y_pred)
mse = mean_squared_error(y_true, y_pred)
rmse = np.sqrt(mse)
# Improved MAPE calculation for bakery data
mean_actual = y_true.mean()
median_actual = np.median(y_true[y_true > 0]) if np.any(y_true > 0) else 1.0
# Use different strategies based on sales volume
if mean_actual < 2.0:
# For very low volume products, use normalized MAE
normalized_mae = mae / max(median_actual, 1.0)
mape = min(normalized_mae * 100, 200) # Cap at 200%
logger.info(f"Using normalized MAE for low-volume product (mean={mean_actual:.2f})")
elif mean_actual < 5.0:
# For low-medium volume, use modified MAPE with higher threshold
threshold = 1.0
valid_mask = y_true >= threshold
if np.sum(valid_mask) == 0:
mape = 150.0 # High but not extreme
else:
mape_values = np.abs((y_true[valid_mask] - y_pred[valid_mask]) / y_true[valid_mask])
mape = np.median(mape_values) * 100 # Use median instead of mean to reduce outlier impact
mape = min(mape, 150) # Cap at reasonable level
else:
# Standard MAPE for higher volume products
threshold = 0.5
valid_mask = y_true > threshold
if np.sum(valid_mask) == 0:
mape = 100.0
else:
mape_values = np.abs((y_true[valid_mask] - y_pred[valid_mask]) / y_true[valid_mask])
mape = np.mean(mape_values) * 100
# Cap MAPE at reasonable maximum
if math.isinf(mape) or math.isnan(mape) or mape > 200:
mape = min(200.0, (mae / max(mean_actual, 1.0)) * 100)
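# Worked example of the low-volume branch above (illustrative numbers): with
# mean_actual = 1.2, mae = 0.6 and a non-zero median of 1.0, the reported value is
# min((0.6 / 1.0) * 100, 200) = 60.0, i.e. a normalized-MAE stand-in for MAPE.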
# R-squared
ss_res = np.sum((y_true - y_pred) ** 2)
ss_tot = np.sum((y_true - np.mean(y_true)) ** 2)
r2 = 1 - (ss_res / ss_tot) if ss_tot != 0 else 0.0
# Calculate realistic improvement estimate based on actual product performance
# Use more granular categories and realistic baselines
total_sales = training_data['y'].sum()
zero_ratio = (training_data['y'] == 0).sum() / len(training_data)
mean_sales = training_data['y'].mean()
non_zero_days = len(training_data[training_data['y'] > 0])
# More nuanced categorization
if zero_ratio > 0.8 or non_zero_days < 30:
category = 'very_sparse'
baseline_mape = 80.0
elif zero_ratio > 0.6:
category = 'sparse'
baseline_mape = 60.0
elif mean_sales >= 10 and zero_ratio < 0.3:
category = 'high_volume'
baseline_mape = 25.0
elif mean_sales >= 5 and zero_ratio < 0.5:
category = 'medium_volume'
baseline_mape = 35.0
else:
category = 'low_volume'
baseline_mape = 45.0
# Calculate improvement - be more conservative
if mape < baseline_mape * 0.8: # Only claim improvement if significant
improvement_pct = (baseline_mape - mape) / baseline_mape * 100
else:
improvement_pct = 0 # No meaningful improvement
# Quality score based on data characteristics
quality_score = max(0.1, min(1.0, (1 - zero_ratio) * (non_zero_days / len(training_data))))
# Enhanced metrics with optimization info
metrics = {
"mae": round(mae, 2),
"mse": round(mse, 2),
"rmse": round(rmse, 2),
"mape": round(mape, 2),
"r2": round(r2, 3),
"optimized": True,
"optimized_mape": round(mape, 2),
"baseline_mape_estimate": round(baseline_mape, 2),
"improvement_estimated": round(improvement_pct, 1),
"product_category": category,
"data_quality_score": round(quality_score, 2),
"mean_sales_volume": round(mean_sales, 2),
"sales_consistency": round(non_zero_days / len(training_data), 2),
"total_demand": round(total_sales, 1)
}
logger.info(f"Training metrics calculated: MAPE={mape:.1f}%, "
f"Category={category}, Improvement={improvement_pct:.1f}%")
return metrics
except Exception as e:
logger.error(f"Error calculating training metrics: {str(e)}")
return {
"mae": 0.0, "mse": 0.0, "rmse": 0.0, "mape": 100.0, "r2": 0.0,
"optimized": False, "improvement_estimated": 0.0
}
async def _store_model(self,
tenant_id: str,
inventory_product_id: str,
model: Prophet,
model_id: str,
training_data: pd.DataFrame,
regressor_columns: List[str],
optimized_params: Dict[str, Any] = None,
training_metrics: Dict[str, Any] = None,
session = None) -> str:
"""Store model with database integration"""
# Store model in MinIO (clean implementation - MinIO only)
# Use BytesIO buffer since joblib.dump() writes to file-like objects
buffer = io.BytesIO()
joblib.dump(model, buffer)
model_data = buffer.getvalue()
object_name = f"models/{tenant_id}/{inventory_product_id}/{model_id}.pkl"
# Use MinIO client
from shared.clients.minio_client import minio_client
# Upload model to MinIO
success = minio_client.put_object(
bucket_name="training-models",
object_name=object_name,
data=model_data,
content_type="application/octet-stream",
metadata={
"model_id": model_id,
"tenant_id": tenant_id,
"inventory_product_id": inventory_product_id,
"model_type": "prophet_optimized"
}
)
if not success:
raise Exception("Failed to upload model to MinIO")
# Return MinIO object path
model_path = f"minio://training-models/{object_name}"
# Calculate checksum for model data
import hashlib
model_checksum = hashlib.sha256(model_data).hexdigest()
# Enhanced metadata with checksum
metadata = {
"model_id": model_id,
"tenant_id": tenant_id,
"inventory_product_id": inventory_product_id,
"regressor_columns": regressor_columns,
"training_samples": len(training_data),
"data_period": {
"start_date": pd.Timestamp(training_data['ds'].min()).isoformat(),
"end_date": pd.Timestamp(training_data['ds'].max()).isoformat()
},
"optimized": True,
"optimized_parameters": optimized_params or {},
"created_at": datetime.now().isoformat(),
"model_type": "prophet_optimized",
"minio_path": model_path,
"checksum": model_checksum,
"checksum_algorithm": "sha256"
}
# Store metadata in MinIO as well
metadata_json = json.dumps(metadata, indent=2, default=str)
metadata_object_name = f"models/{tenant_id}/{inventory_product_id}/{model_id}.json"
minio_client.put_object(
bucket_name="training-models",
object_name=metadata_object_name,
data=metadata_json,
content_type="application/json"
)
# Define metadata_path for database record
metadata_path = f"minio://training-models/{metadata_object_name}"
# Store in memory
model_key = f"{tenant_id}:{inventory_product_id}"
self.models[model_key] = model
self.model_metadata[model_key] = metadata
# 🆕 NEW: Store in database using session (parent or new)
use_parent_session = session is not None
async def _store_in_db(db_session):
"""Inner function to store model in database"""
# Deactivate previous models for this product
await self._deactivate_previous_models_with_session(db_session, tenant_id, inventory_product_id)
# Helper to ensure hyperparameters are JSON serializable
def _serialize_hyperparameters(params):
if not params:
return {}
safe_params = {}
for k, v in params.items():
try:
if isinstance(v, (int, float, str, bool, type(None))):
safe_params[k] = v
elif hasattr(v, 'item'): # numpy scalars
safe_params[k] = v.item()
elif isinstance(v, (list, tuple)):
safe_params[k] = [x.item() if hasattr(x, 'item') else x for x in v]
else:
safe_params[k] = float(v) if isinstance(v, (np.integer, np.floating)) else str(v)
except Exception:
safe_params[k] = str(v) # fallback to string conversion
return safe_params
# Create new database record
db_model = TrainedModel(
id=model_id,
tenant_id=tenant_id,
inventory_product_id=inventory_product_id,
model_type="prophet_optimized",
job_id=model_id.split('_')[0], # Extract job_id from model_id
model_path=str(model_path),
metadata_path=str(metadata_path),
hyperparameters=_serialize_hyperparameters(optimized_params or {}),
features_used=[str(f) for f in regressor_columns] if regressor_columns else [],
is_active=True,
is_production=True, # New models are production-ready
training_start_date=pd.Timestamp(training_data['ds'].min()).to_pydatetime().replace(tzinfo=None),
training_end_date=pd.Timestamp(training_data['ds'].max()).to_pydatetime().replace(tzinfo=None),
training_samples=len(training_data)
)
# Add training metrics if available
if training_metrics:
db_model.mape = float(training_metrics.get('mape')) if training_metrics.get('mape') is not None else None
db_model.mae = float(training_metrics.get('mae')) if training_metrics.get('mae') is not None else None
db_model.rmse = float(training_metrics.get('rmse')) if training_metrics.get('rmse') is not None else None
db_model.r2_score = float(training_metrics.get('r2')) if training_metrics.get('r2') is not None else None
db_model.data_quality_score = float(training_metrics.get('data_quality_score')) if training_metrics.get('data_quality_score') is not None else None
db_session.add(db_model)
# ✅ FIX: Don't commit here - let the outer scope handle commits
# This prevents double commit when using a new dedicated session
# Parent sessions will commit when parent completes; new sessions commit in outer scope
logger.debug(f"Added model {model_id} to session (commit handled by caller)")
try:
# ✅ FIX: Use parent session if provided, otherwise create new one
# The parent session handles commits, so child sessions shouldn't commit
if use_parent_session:
logger.debug(f"Using parent session for storing model {model_id}")
await _store_in_db(session)
else:
logger.debug(f"Creating new session for storing model {model_id}")
async with self.database_manager.get_session() as new_session:
await _store_in_db(new_session)
# Commit only when using our own session (not parent)
await new_session.commit() # This is safe since new_session is dedicated
logger.info(f"Model {model_id} stored in database successfully")
except Exception as e:
logger.error(f"Failed to store model in database: {str(e)}")
# Continue execution - file storage succeeded
logger.info(f"Optimized model stored at: {model_path}")
return str(model_path)
async def _deactivate_previous_models_with_session(self, db_session, tenant_id: str, inventory_product_id: str):
"""Deactivate previous models for the same product using provided session"""
try:
# ✅ FIX: Wrap SQL string with text() for SQLAlchemy 2.0
query = text("""
UPDATE trained_models
SET is_active = false, is_production = false
WHERE tenant_id = :tenant_id AND inventory_product_id = :inventory_product_id
""")
await db_session.execute(query, {
"tenant_id": tenant_id,
"inventory_product_id": inventory_product_id
})
# Note: Don't commit here, let the calling method handle the transaction
logger.info(f"Successfully deactivated previous models for {inventory_product_id}")
except Exception as e:
logger.error(f"Failed to deactivate previous models: {str(e)}")
raise
async def generate_forecast(self,
model_path: str,
future_dates: pd.DataFrame,
regressor_columns: List[str]) -> pd.DataFrame:
"""Generate forecast using stored model from MinIO"""
try:
# Load model from MinIO
model = await self._load_model_from_minio(model_path)
for regressor in regressor_columns:
if regressor not in future_dates.columns:
logger.warning(f"Missing regressor {regressor}, filling with 0")
future_dates[regressor] = 0
forecast = model.predict(future_dates)
return forecast
except Exception as e:
logger.error(f"Failed to generate forecast: {str(e)}")
raise
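# Forecast usage sketch (illustrative): 'future_dates' needs a 'ds' column plus the
# regressor columns the model was trained with; the horizon, the regressor name and
# the 'manager'/'model_info' variables below are assumptions, not fixed API.
#
#   future = pd.DataFrame({"ds": pd.date_range("2025-01-01", periods=14, freq="D")})
#   future["is_weekend"] = (future["ds"].dt.dayofweek >= 5).astype(int)  # hypothetical regressor
#   forecast = await manager.generate_forecast(model_info["model_path"], future, ["is_weekend"])
#   print(forecast[["ds", "yhat", "yhat_lower", "yhat_upper"]].tail())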
async def _load_model_from_minio(self, model_path: str):
"""Load model from MinIO storage"""
try:
# Parse MinIO path: minio://bucket_name/object_path
if not model_path.startswith("minio://"):
raise ValueError(f"Invalid MinIO path: {model_path}")
_, bucket_and_path = model_path.split("://", 1)
bucket_name, object_name = bucket_and_path.split("/", 1)
logger.debug(f"Loading model from MinIO: {bucket_name}/{object_name}")
# Download model data from MinIO
model_data = self.minio_client.get_object(bucket_name, object_name)
if not model_data:
raise ValueError(f"Failed to download model from MinIO: {model_path}")
# Deserialize model (using BytesIO since joblib.load reads from file-like objects)
buffer = io.BytesIO(model_data)
model = joblib.load(buffer)
logger.info(f"Model loaded successfully from MinIO: {model_path}")
return model
except Exception as e:
logger.error(f"Failed to load model from MinIO: {model_path}, error: {e}")
raise
async def _validate_training_data(self, df: pd.DataFrame, inventory_product_id: str):
"""Validate training data quality (unchanged)"""
if df.empty:
raise ValueError(f"No training data available for {inventory_product_id}")
if len(df) < settings.MIN_TRAINING_DATA_DAYS:
raise ValueError(
2025-08-14 16:47:34 +02:00
f"Insufficient training data for {inventory_product_id}: "
2025-07-19 16:59:37 +02:00
f"{len(df)} days, minimum required: {settings.MIN_TRAINING_DATA_DAYS}"
)
required_columns = ['ds', 'y']
missing_columns = [col for col in required_columns if col not in df.columns]
if missing_columns:
raise ValueError(f"Missing required columns: {missing_columns}")
if df['ds'].isna().any():
raise ValueError("Invalid dates found in training data")
if df['y'].isna().all():
raise ValueError("No valid target values found")
async def _prepare_prophet_data(self, df: pd.DataFrame) -> pd.DataFrame:
"""Prepare data for Prophet training with timezone handling"""
prophet_data = df.copy()
if 'ds' not in prophet_data.columns:
raise ValueError("Missing 'ds' column in training data")
if 'y' not in prophet_data.columns:
raise ValueError("Missing 'y' column in training data")
# Use timezone utility to prepare Prophet-compatible datetime
prophet_data = prepare_prophet_datetime(prophet_data, 'ds')
# Sort by date and clean data
prophet_data = prophet_data.sort_values('ds').reset_index(drop=True)
prophet_data['y'] = pd.to_numeric(prophet_data['y'], errors='coerce')
prophet_data = prophet_data.dropna(subset=['y'])
# Remove any duplicate dates (keep last occurrence)
prophet_data = prophet_data.drop_duplicates(subset=['ds'], keep='last')
# Ensure y values are non-negative
prophet_data['y'] = prophet_data['y'].clip(lower=0)
logger.info(f"Prepared Prophet data: {len(prophet_data)} rows, date range: {pd.Timestamp(prophet_data['ds'].min())} to {pd.Timestamp(prophet_data['ds'].max())}")
return prophet_data
def _extract_regressor_columns(self, df: pd.DataFrame) -> List[str]:
"""Extract regressor columns (unchanged)"""
excluded_columns = ['ds', 'y']
regressor_columns = []
for col in df.columns:
if col not in excluded_columns and df[col].dtype in ['int64', 'float64']:
regressor_columns.append(col)
logger.info(f"Identified regressor columns: {regressor_columns}")
return regressor_columns
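# Illustrative example (assumed columns): for a DataFrame with columns
# ['ds', 'y', 'temperature', 'is_holiday'] where 'temperature' is float64 and
# 'is_holiday' is int64, this returns ['temperature', 'is_holiday'].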
def _get_spanish_holidays(self, region: str = None) -> pd.DataFrame:
"""
Get Spanish holidays dynamically using holidays library.
Supports national and regional holidays, including dynamic Easter calculation.
Args:
region: Region code (e.g., 'MD' for Madrid, 'PV' for Basque Country)
Returns:
DataFrame with holiday dates and names
"""
2025-07-19 16:59:37 +02:00
try:
import holidays
holidays_list = []
years = range(2020, 2035) # Extended range for better coverage
# Get Spanish holidays for each year
for year in years:
# National holidays
spain_holidays = holidays.Spain(years=year, prov=region)
for date, name in spain_holidays.items():
holidays_list.append({
'holiday': self._normalize_holiday_name(name),
'ds': pd.Timestamp(date),
'lower_window': 0,
'upper_window': 0 # Can be adjusted for multi-day holidays
})
if holidays_list:
holidays_df = pd.DataFrame(holidays_list)
# Remove duplicates (some holidays may repeat)
holidays_df = holidays_df.drop_duplicates(subset=['ds', 'holiday'])
holidays_df = holidays_df.sort_values('ds').reset_index(drop=True)
# ✅ FIX: Don't pass keyword args to logger - use f-string instead
logger.info(f"Loaded {len(holidays_df)} Spanish holidays dynamically (region={region or 'National'}, years={min(years)}-{max(years)})")
return holidays_df
else:
return pd.DataFrame()
except Exception as e:
logger.warning(f"Could not load Spanish holidays dynamically: {str(e)}")
# Fallback to minimal hardcoded holidays
return self._get_fallback_holidays()
def _normalize_holiday_name(self, name: str) -> str:
"""Normalize holiday name to a consistent format for Prophet"""
# Convert to lowercase and replace spaces with underscores
normalized = name.lower().replace(' ', '_').replace("'", '')
# Remove special characters
normalized = ''.join(c for c in normalized if c.isalnum() or c == '_')
return normalized
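# Illustrative example (actual names come from the 'holidays' package): an entry such
# as "Año Nuevo" is normalized to "año_nuevo"; "New Year's Day" would become "new_years_day".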
def _get_fallback_holidays(self) -> pd.DataFrame:
"""Fallback to basic hardcoded holidays if dynamic loading fails"""
try:
holidays_list = []
years = range(2020, 2035)
for year in years:
holidays_list.extend([
{'holiday': 'new_year', 'ds': f'{year}-01-01'},
{'holiday': 'epiphany', 'ds': f'{year}-01-06'},
{'holiday': 'labor_day', 'ds': f'{year}-05-01'},
{'holiday': 'assumption', 'ds': f'{year}-08-15'},
{'holiday': 'national_day', 'ds': f'{year}-10-12'},
{'holiday': 'all_saints', 'ds': f'{year}-11-01'},
{'holiday': 'constitution_day', 'ds': f'{year}-12-06'},
{'holiday': 'immaculate_conception', 'ds': f'{year}-12-08'},
{'holiday': 'christmas', 'ds': f'{year}-12-25'}
])
holidays_df = pd.DataFrame(holidays_list)
holidays_df['ds'] = pd.to_datetime(holidays_df['ds'])
return holidays_df
except Exception as e:
logger.error(f"Fallback holidays failed: {e}")
return pd.DataFrame()