# services/training/app/ml/prophet_manager.py """ Simplified Prophet Manager with Built-in Hyperparameter Optimization Direct replacement for existing BakeryProphetManager - optimization always enabled. """ from typing import Dict, List, Any, Optional, Tuple import pandas as pd import numpy as np from prophet import Prophet import logging from datetime import datetime, timedelta import uuid import os import joblib from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score from sklearn.model_selection import TimeSeriesSplit import json from pathlib import Path import math import warnings import shutil import errno warnings.filterwarnings('ignore') from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy import text from app.models.training import TrainedModel from shared.database.base import create_database_manager from app.repositories import ModelRepository # Simple optimization import import optuna optuna.logging.set_verbosity(optuna.logging.WARNING) from app.core.config import settings from app.core import constants as const from app.utils.ml_datetime import prepare_prophet_datetime from app.utils.file_utils import ChecksummedFile, calculate_file_checksum from app.utils.distributed_lock import get_training_lock, LockAcquisitionError logger = logging.getLogger(__name__) def check_disk_space(path='/tmp', min_free_gb=1.0): """ Check if there's enough disk space available. Args: path: Path to check disk space for min_free_gb: Minimum required free space in GB Returns: tuple: (bool: has_space, float: free_gb, float: total_gb, float: used_percent) """ try: stat = shutil.disk_usage(path) total_gb = stat.total / (1024**3) free_gb = stat.free / (1024**3) used_gb = stat.used / (1024**3) used_percent = (stat.used / stat.total) * 100 has_space = free_gb >= min_free_gb logger.info(f"Disk space check for {path}: " f"total={total_gb:.2f}GB, free={free_gb:.2f}GB, " f"used={used_gb:.2f}GB ({used_percent:.1f}%)") if used_percent > 85: logger.warning(f"Disk usage is high: {used_percent:.1f}% - this may cause issues") return has_space, free_gb, total_gb, used_percent except Exception as e: logger.error(f"Failed to check disk space: {e}") return True, 0, 0, 0 # Assume OK if we can't check class BakeryProphetManager: """ Simplified Prophet Manager with built-in hyperparameter optimization. Drop-in replacement for the existing manager - optimization runs automatically. """ def __init__(self, database_manager=None): self.models = {} # In-memory model storage self.model_metadata = {} # Store model metadata self.database_manager = database_manager or create_database_manager(settings.DATABASE_URL, "training-service") self.db_session = None # Will be set when session is available # Ensure model storage directory exists os.makedirs(settings.MODEL_STORAGE_PATH, exist_ok=True) async def train_bakery_model(self, tenant_id: str, inventory_product_id: str, df: pd.DataFrame, job_id: str, product_category: 'ProductCategory' = None, category_hyperparameters: Dict[str, Any] = None, session = None) -> Dict[str, Any]: """ Train a Prophet model with automatic hyperparameter optimization and distributed locking. Args: tenant_id: Tenant identifier inventory_product_id: Product identifier df: Training data DataFrame job_id: Training job identifier product_category: Optional product category for category-specific settings category_hyperparameters: Optional category-specific Prophet hyperparameters session: Optional database session (uses parent session if provided to avoid nested sessions) """ # Check disk space before starting training has_space, free_gb, total_gb, used_percent = check_disk_space('/tmp', min_free_gb=0.5) if not has_space: error_msg = f"Insufficient disk space: {free_gb:.2f}GB free ({used_percent:.1f}% used). Need at least 0.5GB free." logger.error(error_msg) raise RuntimeError(error_msg) # Acquire distributed lock to prevent concurrent training of same product lock = get_training_lock(tenant_id, inventory_product_id, use_advisory=True) # Use provided session or create new one if not provided use_parent_session = session is not None async def _train_with_lock(db_session): """Inner function to perform training with lock""" async with lock.acquire(db_session): logger.info(f"Training optimized bakery model for {inventory_product_id} (lock acquired)") # Validate input data await self._validate_training_data(df, inventory_product_id) # Prepare data for Prophet prophet_data = await self._prepare_prophet_data(df) # Get regressor columns regressor_columns = self._extract_regressor_columns(prophet_data) # Use category-specific hyperparameters if provided, otherwise optimize if category_hyperparameters: logger.info(f"Using category-specific hyperparameters for {inventory_product_id} (category: {product_category.value if product_category else 'unknown'})") best_params = category_hyperparameters.copy() use_optimized = False # Not optimized, but category-specific else: # Automatically optimize hyperparameters logger.info(f"Optimizing hyperparameters for {inventory_product_id}...") try: best_params = await self._optimize_hyperparameters(prophet_data, inventory_product_id, regressor_columns) use_optimized = True except Exception as opt_error: logger.warning(f"Hyperparameter optimization failed for {inventory_product_id}: {opt_error}") logger.warning("Falling back to default Prophet parameters") # Use conservative default parameters best_params = { 'changepoint_prior_scale': 0.05, 'seasonality_prior_scale': 10.0, 'holidays_prior_scale': 10.0, 'changepoint_range': 0.8, 'seasonality_mode': 'additive', 'daily_seasonality': False, 'weekly_seasonality': True, 'yearly_seasonality': len(prophet_data) > 365, 'uncertainty_samples': 0 # Disable uncertainty sampling to avoid cmdstan } use_optimized = False # Create optimized Prophet model model = self._create_optimized_prophet_model(best_params, regressor_columns) # Add regressors to model for regressor in regressor_columns: if regressor in prophet_data.columns: model.add_regressor(regressor) # Set environment variable for cmdstan tmp directory import os tmpdir = os.environ.get('TMPDIR', '/tmp/cmdstan') os.makedirs(tmpdir, mode=0o777, exist_ok=True) os.environ['TMPDIR'] = tmpdir # Verify tmp directory is writable test_file = os.path.join(tmpdir, f'test_write_{inventory_product_id}.tmp') try: with open(test_file, 'w') as f: f.write('test') os.remove(test_file) logger.debug(f"Verified {tmpdir} is writable") except Exception as e: logger.error(f"TMPDIR {tmpdir} is not writable: {e}") raise RuntimeError(f"Cannot write to {tmpdir}: {e}") # Fit the model with enhanced error handling try: logger.info(f"Starting Prophet model fit for {inventory_product_id}") # ✅ FIX: Run blocking model.fit() in thread pool to avoid blocking event loop import asyncio await asyncio.to_thread(model.fit, prophet_data) logger.info(f"Prophet model fit completed successfully for {inventory_product_id}") except Exception as fit_error: error_details = { 'error_type': type(fit_error).__name__, 'error_message': str(fit_error), 'errno': getattr(fit_error, 'errno', None), 'tmpdir': tmpdir, 'disk_space': check_disk_space(tmpdir, 0) } logger.error(f"Prophet model fit failed for {inventory_product_id}: {error_details}") raise RuntimeError(f"Prophet training failed: {error_details['error_message']}") from fit_error # Calculate enhanced training metrics first training_metrics = await self._calculate_training_metrics(model, prophet_data, best_params) # Store model and metrics - Generate proper UUID for model_id model_id = str(uuid.uuid4()) # ✅ FIX: Pass session to _store_model to avoid nested session model_path = await self._store_model( tenant_id, inventory_product_id, model, model_id, prophet_data, regressor_columns, best_params, training_metrics, db_session ) # Return same format as before, but with optimization info # Ensure hyperparameters are JSON-serializable def _serialize_hyperparameters(params): """Helper to ensure hyperparameters are JSON serializable""" if not params: return {} safe_params = {} for k, v in params.items(): try: if isinstance(v, (int, float, str, bool, type(None))): safe_params[k] = v elif hasattr(v, 'item'): # numpy scalars safe_params[k] = v.item() elif isinstance(v, (list, tuple)): safe_params[k] = [x.item() if hasattr(x, 'item') else x for x in v] else: safe_params[k] = float(v) if isinstance(v, (np.integer, np.floating)) else str(v) except: safe_params[k] = str(v) # fallback to string conversion return safe_params model_info = { "model_id": model_id, "model_path": model_path, "type": "prophet_optimized", "training_samples": len(prophet_data), "features": regressor_columns, "hyperparameters": _serialize_hyperparameters(best_params), "training_metrics": training_metrics, "product_category": product_category.value if product_category else "unknown", "trained_at": datetime.now().isoformat(), "data_period": { "start_date": pd.Timestamp(prophet_data['ds'].min()).isoformat(), "end_date": pd.Timestamp(prophet_data['ds'].max()).isoformat(), "total_days": len(prophet_data) } } logger.info(f"Optimized model trained successfully for {inventory_product_id}. " f"MAPE: {training_metrics.get('optimized_mape', 'N/A')}%") return model_info try: # ✅ FIX: Use parent session if provided, otherwise create new one # This prevents nested session issues and database deadlocks if use_parent_session: logger.debug(f"Using parent session for training {inventory_product_id}") return await _train_with_lock(session) else: logger.debug(f"Creating new session for training {inventory_product_id}") async with self.database_manager.get_session() as new_session: return await _train_with_lock(new_session) except LockAcquisitionError as e: logger.warning(f"Could not acquire lock for {inventory_product_id}: {e}") raise RuntimeError(f"Training already in progress for product {inventory_product_id}") except Exception as e: logger.error(f"Failed to train optimized bakery model for {inventory_product_id}: {str(e)}") raise async def _optimize_hyperparameters(self, df: pd.DataFrame, inventory_product_id: str, regressor_columns: List[str]) -> Dict[str, Any]: """ Automatically optimize Prophet hyperparameters using Bayesian optimization. Simplified - no configuration needed. """ # Determine product category automatically product_category = self._classify_product(inventory_product_id, df) # Set optimization parameters based on category n_trials = { 'high_volume': const.OPTUNA_TRIALS_HIGH_VOLUME, 'medium_volume': const.OPTUNA_TRIALS_MEDIUM_VOLUME, 'low_volume': const.OPTUNA_TRIALS_LOW_VOLUME, 'intermittent': const.OPTUNA_TRIALS_INTERMITTENT }.get(product_category, const.OPTUNA_TRIALS_MEDIUM_VOLUME) logger.info(f"Product {inventory_product_id} classified as {product_category}, using {n_trials} trials") # Check data quality and adjust strategy total_sales = df['y'].sum() zero_ratio = (df['y'] == 0).sum() / len(df) mean_sales = df['y'].mean() non_zero_days = len(df[df['y'] > 0]) logger.info(f"Data analysis for {inventory_product_id}: total_sales={total_sales:.1f}, " f"zero_ratio={zero_ratio:.2f}, mean_sales={mean_sales:.2f}, non_zero_days={non_zero_days}") # Adjust strategy based on data characteristics if zero_ratio > const.MAX_ZERO_RATIO_INTERMITTENT or non_zero_days < const.MIN_NON_ZERO_DAYS: logger.warning(f"Very sparse data for {inventory_product_id}, using minimal optimization") return { 'changepoint_prior_scale': 0.001, 'seasonality_prior_scale': 0.01, 'holidays_prior_scale': 0.01, 'changepoint_range': 0.8, 'seasonality_mode': 'additive', 'daily_seasonality': False, 'weekly_seasonality': True, 'yearly_seasonality': False, 'uncertainty_samples': const.UNCERTAINTY_SAMPLES_SPARSE_MIN } elif zero_ratio > const.MODERATE_SPARSITY_THRESHOLD: logger.info(f"Moderate sparsity for {inventory_product_id}, using conservative optimization") return { 'changepoint_prior_scale': 0.01, 'seasonality_prior_scale': 0.1, 'holidays_prior_scale': 0.1, 'changepoint_range': 0.8, 'seasonality_mode': 'additive', 'daily_seasonality': False, 'weekly_seasonality': True, 'yearly_seasonality': len(df) > const.DATA_QUALITY_DAY_THRESHOLD_HIGH, 'uncertainty_samples': const.UNCERTAINTY_SAMPLES_SPARSE_MAX } # Use unique seed for each product to avoid identical results product_seed = hash(str(inventory_product_id)) % 10000 def objective(trial): try: # Sample hyperparameters with product-specific ranges if product_category == 'high_volume': # More conservative for high volume (less overfitting) changepoint_scale_range = (0.001, 0.1) seasonality_scale_range = (1.0, 10.0) elif product_category == 'intermittent': # Very conservative for intermittent changepoint_scale_range = (0.001, 0.05) seasonality_scale_range = (0.01, 1.0) else: # Default ranges changepoint_scale_range = (0.001, 0.5) seasonality_scale_range = (0.01, 10.0) # Determine appropriate uncertainty samples range based on product category if product_category == 'high_volume': uncertainty_range = (const.UNCERTAINTY_SAMPLES_HIGH_MIN, const.UNCERTAINTY_SAMPLES_HIGH_MAX) elif product_category == 'medium_volume': uncertainty_range = (const.UNCERTAINTY_SAMPLES_MEDIUM_MIN, const.UNCERTAINTY_SAMPLES_MEDIUM_MAX) elif product_category == 'low_volume': uncertainty_range = (const.UNCERTAINTY_SAMPLES_LOW_MIN, const.UNCERTAINTY_SAMPLES_LOW_MAX) else: # intermittent uncertainty_range = (const.UNCERTAINTY_SAMPLES_SPARSE_MIN, const.UNCERTAINTY_SAMPLES_SPARSE_MAX) params = { 'changepoint_prior_scale': trial.suggest_float( 'changepoint_prior_scale', changepoint_scale_range[0], changepoint_scale_range[1], log=True ), 'seasonality_prior_scale': trial.suggest_float( 'seasonality_prior_scale', seasonality_scale_range[0], seasonality_scale_range[1], log=True ), 'holidays_prior_scale': trial.suggest_float('holidays_prior_scale', 0.01, 10.0, log=True), 'changepoint_range': trial.suggest_float('changepoint_range', 0.8, 0.95), 'seasonality_mode': 'additive' if product_category == 'high_volume' else trial.suggest_categorical('seasonality_mode', ['additive', 'multiplicative']), 'daily_seasonality': trial.suggest_categorical('daily_seasonality', [True, False]), 'weekly_seasonality': True, # Always keep weekly 'yearly_seasonality': trial.suggest_categorical('yearly_seasonality', [True, False]), 'uncertainty_samples': int(trial.suggest_int('uncertainty_samples', int(uncertainty_range[0]), int(uncertainty_range[1]))) # ✅ FIX: Explicit int casting for all values } # Simple 2-fold cross-validation for speed tscv = TimeSeriesSplit(n_splits=2) cv_scores = [] for train_idx, val_idx in tscv.split(df): train_data = df.iloc[train_idx].copy() val_data = df.iloc[val_idx].copy() if len(val_data) < 7: # Need at least a week continue try: # Create and train model with adaptive uncertainty sampling uncertainty_samples = int(params.get('uncertainty_samples', 200)) # ✅ FIX: Explicit int casting to prevent type errors # Set environment variable for cmdstan tmp directory import os tmpdir = os.environ.get('TMPDIR', '/tmp/cmdstan') os.makedirs(tmpdir, mode=0o777, exist_ok=True) os.environ['TMPDIR'] = tmpdir model = Prophet(**{k: v for k, v in params.items() if k != 'uncertainty_samples'}, interval_width=0.8, uncertainty_samples=uncertainty_samples) for regressor in regressor_columns: if regressor in train_data.columns: model.add_regressor(regressor) with warnings.catch_warnings(): warnings.simplefilter("ignore") try: model.fit(train_data) except OSError as e: # Log errno for "Operation not permitted" errors if e.errno == errno.EPERM: logger.error(f"Permission denied during Prophet fit (errno={e.errno}): {e}") logger.error(f"TMPDIR: {tmpdir}, exists: {os.path.exists(tmpdir)}, " f"writable: {os.access(tmpdir, os.W_OK)}") raise # Predict on validation set future_df = model.make_future_dataframe(periods=0) for regressor in regressor_columns: if regressor in df.columns: future_df[regressor] = df[regressor].values[:len(future_df)] forecast = model.predict(future_df) val_predictions = forecast['yhat'].iloc[train_idx[-1]+1:train_idx[-1]+1+len(val_data)] val_actual = val_data['y'].values # Calculate MAPE with improved handling for low values if len(val_predictions) > 0 and len(val_actual) > 0: # Use MAE for very low sales values to avoid MAPE issues if val_actual.mean() < 1: mae = np.mean(np.abs(val_actual - val_predictions.values)) # Convert MAE to percentage-like metric mape_like = (mae / max(val_actual.mean(), 0.1)) * 100 else: non_zero_mask = val_actual > 0.1 # Use threshold instead of zero if np.sum(non_zero_mask) > 0: mape = np.mean(np.abs((val_actual[non_zero_mask] - val_predictions.values[non_zero_mask]) / val_actual[non_zero_mask])) * 100 mape_like = min(mape, 200) # Cap at 200% else: mape_like = 100 if not np.isnan(mape_like) and not np.isinf(mape_like): cv_scores.append(mape_like) except Exception as fold_error: logger.debug(f"Fold failed for {inventory_product_id} trial {trial.number}: {str(fold_error)}") continue return np.mean(cv_scores) if len(cv_scores) > 0 else 100.0 except Exception as trial_error: logger.debug(f"Trial {trial.number} failed for {inventory_product_id}: {str(trial_error)}") return 100.0 # Run optimization with product-specific seed study = optuna.create_study( direction='minimize', sampler=optuna.samplers.TPESampler(seed=product_seed) ) # ✅ FIX: Run blocking study.optimize() in thread pool to avoid blocking event loop import asyncio await asyncio.to_thread( study.optimize, objective, n_trials=n_trials, timeout=const.OPTUNA_TIMEOUT_SECONDS, show_progress_bar=False ) # Return best parameters best_params = study.best_params best_score = study.best_value logger.info(f"Optimization completed for {inventory_product_id}. Best score: {best_score:.2f}%. " f"Parameters: {best_params}") # ✅ FIX: Log uncertainty sampling configuration for debugging confidence intervals with explicit int casting uncertainty_samples = int(best_params.get('uncertainty_samples', 500)) logger.info(f"Prophet model will use {uncertainty_samples} uncertainty samples for {inventory_product_id} " f"(category: {product_category}, zero_ratio: {zero_ratio:.2f})") return best_params def _classify_product(self, inventory_product_id: str, sales_data: pd.DataFrame) -> str: """Automatically classify product for optimization strategy - improved for bakery data""" product_lower = str(inventory_product_id).lower() # Calculate sales statistics total_sales = sales_data['y'].sum() mean_sales = sales_data['y'].mean() zero_ratio = (sales_data['y'] == 0).sum() / len(sales_data) non_zero_days = len(sales_data[sales_data['y'] > 0]) logger.info(f"Product classification for {inventory_product_id}: total_sales={total_sales:.1f}, " f"mean_sales={mean_sales:.2f}, zero_ratio={zero_ratio:.2f}, non_zero_days={non_zero_days}") # Improved classification logic for bakery products # Consider both volume and consistency # Check for truly intermittent demand (high zero ratio) if zero_ratio > 0.8 or non_zero_days < 30: return 'intermittent' # High volume products (consistent daily sales) if any(pattern in product_lower for pattern in ['cafe', 'pan', 'bread', 'coffee']): # Even if absolute volume is low, these are core products return 'high_volume' if zero_ratio < 0.3 else 'medium_volume' # Volume-based classification for other products if mean_sales >= 10 and zero_ratio < 0.4: return 'high_volume' elif mean_sales >= 5 and zero_ratio < 0.6: return 'medium_volume' elif mean_sales >= 2 and zero_ratio < 0.7: return 'low_volume' else: return 'intermittent' def _create_optimized_prophet_model(self, optimized_params: Dict[str, Any], regressor_columns: List[str]) -> Prophet: """Create Prophet model with optimized parameters and adaptive uncertainty sampling""" holidays = self._get_spanish_holidays() # Determine uncertainty samples based on data characteristics with explicit int casting uncertainty_samples = int(optimized_params.get('uncertainty_samples', 500)) if optimized_params.get('uncertainty_samples') is not None else 500 # If uncertainty_samples is 0, we're in fallback mode (no cmdstan) if uncertainty_samples == 0: logger.info("Creating Prophet model without uncertainty sampling (fallback mode)") model = Prophet( holidays=holidays if not holidays.empty else None, daily_seasonality=optimized_params.get('daily_seasonality', True), weekly_seasonality=optimized_params.get('weekly_seasonality', True), yearly_seasonality=optimized_params.get('yearly_seasonality', True), seasonality_mode=optimized_params.get('seasonality_mode', 'additive'), changepoint_prior_scale=float(optimized_params.get('changepoint_prior_scale', 0.05)), seasonality_prior_scale=float(optimized_params.get('seasonality_prior_scale', 10.0)), holidays_prior_scale=float(optimized_params.get('holidays_prior_scale', 10.0)), changepoint_range=float(optimized_params.get('changepoint_range', 0.8)), interval_width=0.8, mcmc_samples=0, uncertainty_samples=1 # Minimum value to avoid errors ) else: model = Prophet( holidays=holidays if not holidays.empty else None, daily_seasonality=optimized_params.get('daily_seasonality', True), weekly_seasonality=optimized_params.get('weekly_seasonality', True), yearly_seasonality=optimized_params.get('yearly_seasonality', True), seasonality_mode=optimized_params.get('seasonality_mode', 'additive'), changepoint_prior_scale=float(optimized_params.get('changepoint_prior_scale', 0.05)), seasonality_prior_scale=float(optimized_params.get('seasonality_prior_scale', 10.0)), holidays_prior_scale=float(optimized_params.get('holidays_prior_scale', 10.0)), changepoint_range=float(optimized_params.get('changepoint_range', 0.8)), interval_width=0.8, mcmc_samples=0, uncertainty_samples=uncertainty_samples ) return model # All the existing methods remain the same, just with enhanced metrics async def _calculate_training_metrics(self, model: Prophet, training_data: pd.DataFrame, optimized_params: Dict[str, Any] = None) -> Dict[str, float]: """Calculate training metrics with optimization info and improved MAPE handling""" try: # Generate in-sample predictions forecast = model.predict(training_data[['ds'] + [col for col in training_data.columns if col not in ['ds', 'y']]]) # Calculate metrics y_true = training_data['y'].values y_pred = forecast['yhat'].values # Basic metrics mae = mean_absolute_error(y_true, y_pred) mse = mean_squared_error(y_true, y_pred) rmse = np.sqrt(mse) # Improved MAPE calculation for bakery data mean_actual = y_true.mean() median_actual = np.median(y_true[y_true > 0]) if np.any(y_true > 0) else 1.0 # Use different strategies based on sales volume if mean_actual < 2.0: # For very low volume products, use normalized MAE normalized_mae = mae / max(median_actual, 1.0) mape = min(normalized_mae * 100, 200) # Cap at 200% logger.info(f"Using normalized MAE for low-volume product (mean={mean_actual:.2f})") elif mean_actual < 5.0: # For low-medium volume, use modified MAPE with higher threshold threshold = 1.0 valid_mask = y_true >= threshold if np.sum(valid_mask) == 0: mape = 150.0 # High but not extreme else: mape_values = np.abs((y_true[valid_mask] - y_pred[valid_mask]) / y_true[valid_mask]) mape = np.median(mape_values) * 100 # Use median instead of mean to reduce outlier impact mape = min(mape, 150) # Cap at reasonable level else: # Standard MAPE for higher volume products threshold = 0.5 valid_mask = y_true > threshold if np.sum(valid_mask) == 0: mape = 100.0 else: mape_values = np.abs((y_true[valid_mask] - y_pred[valid_mask]) / y_true[valid_mask]) mape = np.mean(mape_values) * 100 # Cap MAPE at reasonable maximum if math.isinf(mape) or math.isnan(mape) or mape > 200: mape = min(200.0, (mae / max(mean_actual, 1.0)) * 100) # R-squared ss_res = np.sum((y_true - y_pred) ** 2) ss_tot = np.sum((y_true - np.mean(y_true)) ** 2) r2 = 1 - (ss_res / ss_tot) if ss_tot != 0 else 0.0 # Calculate realistic improvement estimate based on actual product performance # Use more granular categories and realistic baselines total_sales = training_data['y'].sum() zero_ratio = (training_data['y'] == 0).sum() / len(training_data) mean_sales = training_data['y'].mean() non_zero_days = len(training_data[training_data['y'] > 0]) # More nuanced categorization if zero_ratio > 0.8 or non_zero_days < 30: category = 'very_sparse' baseline_mape = 80.0 elif zero_ratio > 0.6: category = 'sparse' baseline_mape = 60.0 elif mean_sales >= 10 and zero_ratio < 0.3: category = 'high_volume' baseline_mape = 25.0 elif mean_sales >= 5 and zero_ratio < 0.5: category = 'medium_volume' baseline_mape = 35.0 else: category = 'low_volume' baseline_mape = 45.0 # Calculate improvement - be more conservative if mape < baseline_mape * 0.8: # Only claim improvement if significant improvement_pct = (baseline_mape - mape) / baseline_mape * 100 else: improvement_pct = 0 # No meaningful improvement # Quality score based on data characteristics quality_score = max(0.1, min(1.0, (1 - zero_ratio) * (non_zero_days / len(training_data)))) # Enhanced metrics with optimization info metrics = { "mae": round(mae, 2), "mse": round(mse, 2), "rmse": round(rmse, 2), "mape": round(mape, 2), "r2": round(r2, 3), "optimized": True, "optimized_mape": round(mape, 2), "baseline_mape_estimate": round(baseline_mape, 2), "improvement_estimated": round(improvement_pct, 1), "product_category": category, "data_quality_score": round(quality_score, 2), "mean_sales_volume": round(mean_sales, 2), "sales_consistency": round(non_zero_days / len(training_data), 2), "total_demand": round(total_sales, 1) } logger.info(f"Training metrics calculated: MAPE={mape:.1f}%, " f"Category={category}, Improvement={improvement_pct:.1f}%") return metrics except Exception as e: logger.error(f"Error calculating training metrics: {str(e)}") return { "mae": 0.0, "mse": 0.0, "rmse": 0.0, "mape": 100.0, "r2": 0.0, "optimized": False, "improvement_estimated": 0.0 } async def _store_model(self, tenant_id: str, inventory_product_id: str, model: Prophet, model_id: str, training_data: pd.DataFrame, regressor_columns: List[str], optimized_params: Dict[str, Any] = None, training_metrics: Dict[str, Any] = None, session = None) -> str: """Store model with database integration""" # Create model directory model_dir = Path(settings.MODEL_STORAGE_PATH) / tenant_id model_dir.mkdir(parents=True, exist_ok=True) # Store model file model_path = model_dir / f"{model_id}.pkl" joblib.dump(model, model_path) # Calculate checksum for model file integrity checksummed_file = ChecksummedFile(str(model_path)) model_checksum = checksummed_file.calculate_and_save_checksum() # Enhanced metadata with checksum metadata = { "model_id": model_id, "tenant_id": tenant_id, "inventory_product_id": inventory_product_id, "regressor_columns": regressor_columns, "training_samples": len(training_data), "data_period": { "start_date": pd.Timestamp(training_data['ds'].min()).isoformat(), "end_date": pd.Timestamp(training_data['ds'].max()).isoformat() }, "optimized": True, "optimized_parameters": optimized_params or {}, "created_at": datetime.now().isoformat(), "model_type": "prophet_optimized", "file_path": str(model_path), "checksum": model_checksum, "checksum_algorithm": "sha256" } metadata_path = model_path.with_suffix('.json') with open(metadata_path, 'w') as f: json.dump(metadata, f, indent=2, default=str) # Store in memory model_key = f"{tenant_id}:{inventory_product_id}" self.models[model_key] = model self.model_metadata[model_key] = metadata # 🆕 NEW: Store in database using session (parent or new) use_parent_session = session is not None async def _store_in_db(db_session): """Inner function to store model in database""" # Deactivate previous models for this product await self._deactivate_previous_models_with_session(db_session, tenant_id, inventory_product_id) # Helper to ensure hyperparameters are JSON serializable def _serialize_hyperparameters(params): if not params: return {} safe_params = {} for k, v in params.items(): try: if isinstance(v, (int, float, str, bool, type(None))): safe_params[k] = v elif hasattr(v, 'item'): # numpy scalars safe_params[k] = v.item() elif isinstance(v, (list, tuple)): safe_params[k] = [x.item() if hasattr(x, 'item') else x for x in v] else: safe_params[k] = float(v) if isinstance(v, (np.integer, np.floating)) else str(v) except: safe_params[k] = str(v) # fallback to string conversion return safe_params # Create new database record db_model = TrainedModel( id=model_id, tenant_id=tenant_id, inventory_product_id=inventory_product_id, model_type="prophet_optimized", job_id=model_id.split('_')[0], # Extract job_id from model_id model_path=str(model_path), metadata_path=str(metadata_path), hyperparameters=_serialize_hyperparameters(optimized_params or {}), features_used=[str(f) for f in regressor_columns] if regressor_columns else [], is_active=True, is_production=True, # New models are production-ready training_start_date=pd.Timestamp(training_data['ds'].min()).to_pydatetime().replace(tzinfo=None), training_end_date=pd.Timestamp(training_data['ds'].max()).to_pydatetime().replace(tzinfo=None), training_samples=len(training_data) ) # Add training metrics if available if training_metrics: db_model.mape = float(training_metrics.get('mape')) if training_metrics.get('mape') is not None else None db_model.mae = float(training_metrics.get('mae')) if training_metrics.get('mae') is not None else None db_model.rmse = float(training_metrics.get('rmse')) if training_metrics.get('rmse') is not None else None db_model.r2_score = float(training_metrics.get('r2')) if training_metrics.get('r2') is not None else None db_model.data_quality_score = float(training_metrics.get('data_quality_score')) if training_metrics.get('data_quality_score') is not None else None db_session.add(db_model) # ✅ FIX: Don't commit here - let the outer scope handle commits # This prevents double commit when using a new dedicated session # Parent sessions will commit when parent completes; new sessions commit in outer scope logger.debug(f"Added model {model_id} to session (commit handled by caller)") try: # ✅ FIX: Use parent session if provided, otherwise create new one # The parent session handles commits, so child sessions shouldn't commit if use_parent_session: logger.debug(f"Using parent session for storing model {model_id}") await _store_in_db(session) else: logger.debug(f"Creating new session for storing model {model_id}") async with self.database_manager.get_session() as new_session: await _store_in_db(new_session) # Commit only when using our own session (not parent) await new_session.commit() # This is safe since new_session is dedicated logger.info(f"Model {model_id} stored in database successfully") except Exception as e: logger.error(f"Failed to store model in database: {str(e)}") # Continue execution - file storage succeeded logger.info(f"Optimized model stored at: {model_path}") return str(model_path) async def _deactivate_previous_models_with_session(self, db_session, tenant_id: str, inventory_product_id: str): """Deactivate previous models for the same product using provided session""" try: # ✅ FIX: Wrap SQL string with text() for SQLAlchemy 2.0 query = text(""" UPDATE trained_models SET is_active = false, is_production = false WHERE tenant_id = :tenant_id AND inventory_product_id = :inventory_product_id """) await db_session.execute(query, { "tenant_id": tenant_id, "inventory_product_id": inventory_product_id }) # Note: Don't commit here, let the calling method handle the transaction logger.info(f"Successfully deactivated previous models for {inventory_product_id}") except Exception as e: logger.error(f"Failed to deactivate previous models: {str(e)}") raise async def generate_forecast(self, model_path: str, future_dates: pd.DataFrame, regressor_columns: List[str]) -> pd.DataFrame: """Generate forecast using stored model with checksum verification""" try: # Verify model file integrity before loading checksummed_file = ChecksummedFile(model_path) if not checksummed_file.load_and_verify_checksum(): logger.warning(f"Checksum verification failed for model: {model_path}") # Still load the model but log warning # In production, you might want to raise an exception instead model = joblib.load(model_path) for regressor in regressor_columns: if regressor not in future_dates.columns: logger.warning(f"Missing regressor {regressor}, filling with median") future_dates[regressor] = 0 forecast = model.predict(future_dates) return forecast except Exception as e: logger.error(f"Failed to generate forecast: {str(e)}") raise async def _validate_training_data(self, df: pd.DataFrame, inventory_product_id: str): """Validate training data quality (unchanged)""" if df.empty: raise ValueError(f"No training data available for {inventory_product_id}") if len(df) < settings.MIN_TRAINING_DATA_DAYS: raise ValueError( f"Insufficient training data for {inventory_product_id}: " f"{len(df)} days, minimum required: {settings.MIN_TRAINING_DATA_DAYS}" ) required_columns = ['ds', 'y'] missing_columns = [col for col in required_columns if col not in df.columns] if missing_columns: raise ValueError(f"Missing required columns: {missing_columns}") if df['ds'].isna().any(): raise ValueError("Invalid dates found in training data") if df['y'].isna().all(): raise ValueError("No valid target values found") async def _prepare_prophet_data(self, df: pd.DataFrame) -> pd.DataFrame: """Prepare data for Prophet training with timezone handling""" prophet_data = df.copy() if 'ds' not in prophet_data.columns: raise ValueError("Missing 'ds' column in training data") if 'y' not in prophet_data.columns: raise ValueError("Missing 'y' column in training data") # Use timezone utility to prepare Prophet-compatible datetime prophet_data = prepare_prophet_datetime(prophet_data, 'ds') # Sort by date and clean data prophet_data = prophet_data.sort_values('ds').reset_index(drop=True) prophet_data['y'] = pd.to_numeric(prophet_data['y'], errors='coerce') prophet_data = prophet_data.dropna(subset=['y']) # Remove any duplicate dates (keep last occurrence) prophet_data = prophet_data.drop_duplicates(subset=['ds'], keep='last') # Ensure y values are non-negative prophet_data['y'] = prophet_data['y'].clip(lower=0) logger.info(f"Prepared Prophet data: {len(prophet_data)} rows, date range: {pd.Timestamp(prophet_data['ds'].min())} to {pd.Timestamp(prophet_data['ds'].max())}") return prophet_data def _extract_regressor_columns(self, df: pd.DataFrame) -> List[str]: """Extract regressor columns (unchanged)""" excluded_columns = ['ds', 'y'] regressor_columns = [] for col in df.columns: if col not in excluded_columns and df[col].dtype in ['int64', 'float64']: regressor_columns.append(col) logger.info(f"Identified regressor columns: {regressor_columns}") return regressor_columns def _get_spanish_holidays(self, region: str = None) -> pd.DataFrame: """ Get Spanish holidays dynamically using holidays library. Supports national and regional holidays, including dynamic Easter calculation. Args: region: Region code (e.g., 'MD' for Madrid, 'PV' for Basque Country) Returns: DataFrame with holiday dates and names """ try: import holidays holidays_list = [] years = range(2020, 2035) # Extended range for better coverage # Get Spanish holidays for each year for year in years: # National holidays spain_holidays = holidays.Spain(years=year, prov=region) for date, name in spain_holidays.items(): holidays_list.append({ 'holiday': self._normalize_holiday_name(name), 'ds': pd.Timestamp(date), 'lower_window': 0, 'upper_window': 0 # Can be adjusted for multi-day holidays }) if holidays_list: holidays_df = pd.DataFrame(holidays_list) # Remove duplicates (some holidays may repeat) holidays_df = holidays_df.drop_duplicates(subset=['ds', 'holiday']) holidays_df = holidays_df.sort_values('ds').reset_index(drop=True) # ✅ FIX: Don't pass keyword args to logger - use f-string instead logger.info(f"Loaded {len(holidays_df)} Spanish holidays dynamically (region={region or 'National'}, years={min(years)}-{max(years)})") return holidays_df else: return pd.DataFrame() except Exception as e: logger.warning(f"Could not load Spanish holidays dynamically: {str(e)}") # Fallback to minimal hardcoded holidays return self._get_fallback_holidays() def _normalize_holiday_name(self, name: str) -> str: """Normalize holiday name to a consistent format for Prophet""" # Convert to lowercase and replace spaces with underscores normalized = name.lower().replace(' ', '_').replace("'", '') # Remove special characters normalized = ''.join(c for c in normalized if c.isalnum() or c == '_') return normalized def _get_fallback_holidays(self) -> pd.DataFrame: """Fallback to basic hardcoded holidays if dynamic loading fails""" try: holidays_list = [] years = range(2020, 2035) for year in years: holidays_list.extend([ {'holiday': 'new_year', 'ds': f'{year}-01-01'}, {'holiday': 'epiphany', 'ds': f'{year}-01-06'}, {'holiday': 'labor_day', 'ds': f'{year}-05-01'}, {'holiday': 'assumption', 'ds': f'{year}-08-15'}, {'holiday': 'national_day', 'ds': f'{year}-10-12'}, {'holiday': 'all_saints', 'ds': f'{year}-11-01'}, {'holiday': 'constitution_day', 'ds': f'{year}-12-06'}, {'holiday': 'immaculate_conception', 'ds': f'{year}-12-08'}, {'holiday': 'christmas', 'ds': f'{year}-12-25'} ]) holidays_df = pd.DataFrame(holidays_list) holidays_df['ds'] = pd.to_datetime(holidays_df['ds']) return holidays_df except Exception as e: logger.error(f"Fallback holidays failed: {e}") return pd.DataFrame()