# services/training/app/ml/prophet_manager.py
"""
Simplified Prophet Manager with built-in hyperparameter optimization.

Drop-in replacement for the existing BakeryProphetManager; optimization is always enabled.
"""
from typing import Dict, List, Any, Optional, Tuple
import pandas as pd
import numpy as np
from prophet import Prophet
import logging
from datetime import datetime, timedelta
import uuid
import os
import joblib
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
from sklearn.model_selection import TimeSeriesSplit
import json
from pathlib import Path
import math
import warnings
import shutil
import errno

warnings.filterwarnings('ignore')

from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import text

from app.models.training import TrainedModel
from shared.database.base import create_database_manager
from app.repositories import ModelRepository

# Simple optimization import
import optuna
optuna.logging.set_verbosity(optuna.logging.WARNING)

from app.core.config import settings
from app.core import constants as const
from app.utils.ml_datetime import prepare_prophet_datetime
from app.utils.file_utils import ChecksummedFile, calculate_file_checksum
from app.utils.distributed_lock import get_training_lock, LockAcquisitionError

logger = logging.getLogger(__name__)
def check_disk_space(path='/tmp', min_free_gb=1.0):
    """
    Check whether there is enough disk space available.

    Args:
        path: Path to check disk space for
        min_free_gb: Minimum required free space in GB

    Returns:
        tuple: (bool: has_space, float: free_gb, float: total_gb, float: used_percent)
    """
    try:
        stat = shutil.disk_usage(path)
        total_gb = stat.total / (1024 ** 3)
        free_gb = stat.free / (1024 ** 3)
        used_gb = stat.used / (1024 ** 3)
        used_percent = (stat.used / stat.total) * 100

        has_space = free_gb >= min_free_gb

        logger.info(f"Disk space check for {path}: "
                    f"total={total_gb:.2f}GB, free={free_gb:.2f}GB, "
                    f"used={used_gb:.2f}GB ({used_percent:.1f}%)")

        if used_percent > 85:
            logger.warning(f"Disk usage is high: {used_percent:.1f}% - this may cause issues")

        return has_space, free_gb, total_gb, used_percent
    except Exception as e:
        logger.error(f"Failed to check disk space: {e}")
        return True, 0, 0, 0  # Assume OK if we can't check
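
# Illustrative use of check_disk_space (a sketch; the path and threshold are examples):
#
#     has_space, free_gb, total_gb, used_percent = check_disk_space('/tmp', min_free_gb=0.5)
#     if not has_space:
#         raise RuntimeError(f"Only {free_gb:.2f}GB free ({used_percent:.1f}% used)")
#
# Note that on failure the helper returns (True, 0, 0, 0), i.e. it assumes space is
# available rather than blocking training on an unreadable filesystem.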

class BakeryProphetManager:
    """
    Simplified Prophet Manager with built-in hyperparameter optimization.

    Drop-in replacement for the existing manager; optimization runs automatically.
    """

    def __init__(self, database_manager=None):
        self.models = {}  # In-memory model storage
        self.model_metadata = {}  # Store model metadata
        self.database_manager = database_manager or create_database_manager(settings.DATABASE_URL, "training-service")
        self.db_session = None  # Will be set when session is available

        # Ensure the model storage directory exists
        os.makedirs(settings.MODEL_STORAGE_PATH, exist_ok=True)
    async def train_bakery_model(self,
                                 tenant_id: str,
                                 inventory_product_id: str,
                                 df: pd.DataFrame,
                                 job_id: str,
                                 product_category: 'ProductCategory' = None,
                                 category_hyperparameters: Dict[str, Any] = None,
                                 session=None) -> Dict[str, Any]:
        """
        Train a Prophet model with automatic hyperparameter optimization and distributed locking.

        Args:
            tenant_id: Tenant identifier
            inventory_product_id: Product identifier
            df: Training data DataFrame
            job_id: Training job identifier
            product_category: Optional product category for category-specific settings
            category_hyperparameters: Optional category-specific Prophet hyperparameters
            session: Optional database session (uses parent session if provided to avoid nested sessions)
        """
        # Check disk space before starting training
        has_space, free_gb, total_gb, used_percent = check_disk_space('/tmp', min_free_gb=0.5)
        if not has_space:
            error_msg = f"Insufficient disk space: {free_gb:.2f}GB free ({used_percent:.1f}% used). Need at least 0.5GB free."
            logger.error(error_msg)
            raise RuntimeError(error_msg)

        # Acquire a distributed lock to prevent concurrent training of the same product
        lock = get_training_lock(tenant_id, inventory_product_id, use_advisory=True)

        # Use the provided session, or create a new one if none was given
        use_parent_session = session is not None

        async def _train_with_lock(db_session):
            """Inner function to perform training with the lock held"""
            async with lock.acquire(db_session):
                logger.info(f"Training optimized bakery model for {inventory_product_id} (lock acquired)")

                # Validate input data
                await self._validate_training_data(df, inventory_product_id)

                # Prepare data for Prophet
                prophet_data = await self._prepare_prophet_data(df)

                # Get regressor columns
                regressor_columns = self._extract_regressor_columns(prophet_data)

                # Use category-specific hyperparameters if provided, otherwise optimize
                if category_hyperparameters:
                    logger.info(f"Using category-specific hyperparameters for {inventory_product_id} (category: {product_category.value if product_category else 'unknown'})")
                    best_params = category_hyperparameters.copy()
                    use_optimized = False  # Not optimized, but category-specific
                else:
                    # Automatically optimize hyperparameters
                    logger.info(f"Optimizing hyperparameters for {inventory_product_id}...")
                    try:
                        best_params = await self._optimize_hyperparameters(prophet_data, inventory_product_id, regressor_columns)
                        use_optimized = True
                    except Exception as opt_error:
                        logger.warning(f"Hyperparameter optimization failed for {inventory_product_id}: {opt_error}")
                        logger.warning("Falling back to default Prophet parameters")
                        # Use conservative default parameters
                        best_params = {
                            'changepoint_prior_scale': 0.05,
                            'seasonality_prior_scale': 10.0,
                            'holidays_prior_scale': 10.0,
                            'changepoint_range': 0.8,
                            'seasonality_mode': 'additive',
                            'daily_seasonality': False,
                            'weekly_seasonality': True,
                            'yearly_seasonality': len(prophet_data) > 365,
                            'uncertainty_samples': 0  # Disable uncertainty sampling to avoid cmdstan
                        }
                        use_optimized = False
                # Create optimized Prophet model
                model = self._create_optimized_prophet_model(best_params, regressor_columns)

                # Add regressors to the model
                for regressor in regressor_columns:
                    if regressor in prophet_data.columns:
                        model.add_regressor(regressor)

                # Point cmdstan at a writable tmp directory
                tmpdir = os.environ.get('TMPDIR', '/tmp/cmdstan')
                os.makedirs(tmpdir, mode=0o777, exist_ok=True)
                os.environ['TMPDIR'] = tmpdir

                # Verify the tmp directory is writable
                test_file = os.path.join(tmpdir, f'test_write_{inventory_product_id}.tmp')
                try:
                    with open(test_file, 'w') as f:
                        f.write('test')
                    os.remove(test_file)
                    logger.debug(f"Verified {tmpdir} is writable")
                except Exception as e:
                    logger.error(f"TMPDIR {tmpdir} is not writable: {e}")
                    raise RuntimeError(f"Cannot write to {tmpdir}: {e}")

                # Fit the model with enhanced error handling
                try:
                    logger.info(f"Starting Prophet model fit for {inventory_product_id}")
                    # ✅ FIX: Run blocking model.fit() in a thread pool to avoid blocking the event loop
                    import asyncio
                    await asyncio.to_thread(model.fit, prophet_data)
                    logger.info(f"Prophet model fit completed successfully for {inventory_product_id}")
                except Exception as fit_error:
                    error_details = {
                        'error_type': type(fit_error).__name__,
                        'error_message': str(fit_error),
                        'errno': getattr(fit_error, 'errno', None),
                        'tmpdir': tmpdir,
                        'disk_space': check_disk_space(tmpdir, 0)
                    }
                    logger.error(f"Prophet model fit failed for {inventory_product_id}: {error_details}")
                    raise RuntimeError(f"Prophet training failed: {error_details['error_message']}") from fit_error

                # Calculate enhanced training metrics first
                training_metrics = await self._calculate_training_metrics(model, prophet_data, best_params)

                # Store model and metrics - generate a proper UUID for model_id
                model_id = str(uuid.uuid4())

                # ✅ FIX: Pass the session to _store_model to avoid a nested session
                model_path = await self._store_model(
                    tenant_id, inventory_product_id, model, model_id, prophet_data, regressor_columns, best_params, training_metrics, db_session
                )
                # Return the same format as before, but with optimization info.
                # Ensure hyperparameters are JSON-serializable.
                def _serialize_hyperparameters(params):
                    """Helper to ensure hyperparameters are JSON serializable"""
                    if not params:
                        return {}
                    safe_params = {}
                    for k, v in params.items():
                        try:
                            if isinstance(v, (int, float, str, bool, type(None))):
                                safe_params[k] = v
                            elif hasattr(v, 'item'):  # numpy scalars
                                safe_params[k] = v.item()
                            elif isinstance(v, (list, tuple)):
                                safe_params[k] = [x.item() if hasattr(x, 'item') else x for x in v]
                            else:
                                safe_params[k] = float(v) if isinstance(v, (np.integer, np.floating)) else str(v)
                        except Exception:
                            safe_params[k] = str(v)  # fallback to string conversion
                    return safe_params

                model_info = {
                    "model_id": model_id,
                    "model_path": model_path,
                    "type": "prophet_optimized",
                    "training_samples": len(prophet_data),
                    "features": regressor_columns,
                    "hyperparameters": _serialize_hyperparameters(best_params),
                    "training_metrics": training_metrics,
                    "product_category": product_category.value if product_category else "unknown",
                    "trained_at": datetime.now().isoformat(),
                    "data_period": {
                        "start_date": pd.Timestamp(prophet_data['ds'].min()).isoformat(),
                        "end_date": pd.Timestamp(prophet_data['ds'].max()).isoformat(),
                        "total_days": len(prophet_data)
                    }
                }

                logger.info(f"Optimized model trained successfully for {inventory_product_id}. "
                            f"MAPE: {training_metrics.get('optimized_mape', 'N/A')}%")

                return model_info
2025-11-05 16:13:32 +01:00
try :
# ✅ FIX: Use parent session if provided, otherwise create new one
# This prevents nested session issues and database deadlocks
if use_parent_session :
logger . debug ( f " Using parent session for training { inventory_product_id } " )
return await _train_with_lock ( session )
else :
logger . debug ( f " Creating new session for training { inventory_product_id } " )
async with self . database_manager . get_session ( ) as new_session :
return await _train_with_lock ( new_session )
2025-10-09 14:11:02 +02:00
except LockAcquisitionError as e :
logger . warning ( f " Could not acquire lock for { inventory_product_id } : { e } " )
raise RuntimeError ( f " Training already in progress for product { inventory_product_id } " )
2025-07-19 16:59:37 +02:00
except Exception as e :
2025-08-14 16:47:34 +02:00
logger . error ( f " Failed to train optimized bakery model for { inventory_product_id } : { str ( e ) } " )
2025-07-19 16:59:37 +02:00
raise
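
    # Illustrative usage (a sketch, not executed here): the manager is typically driven
    # from an async training job with a daily frame containing 'ds', 'y', and optional
    # numeric regressor columns. Names like `daily_sales_df` and `db_session` are
    # assumptions for the example.
    #
    #     manager = BakeryProphetManager()
    #     model_info = await manager.train_bakery_model(
    #         tenant_id="tenant-123",
    #         inventory_product_id="croissant-classic",
    #         df=daily_sales_df,        # columns: ds (datetime), y (units sold), regressors...
    #         job_id="job-456",
    #         session=db_session,       # optional: reuse the caller's AsyncSession
    #     )
    #     print(model_info["training_metrics"]["mape"])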
    async def _optimize_hyperparameters(self,
                                        df: pd.DataFrame,
                                        inventory_product_id: str,
                                        regressor_columns: List[str]) -> Dict[str, Any]:
        """
        Automatically optimize Prophet hyperparameters using Bayesian optimization.
        Simplified - no configuration needed.
        """
        # Determine the product category automatically
        product_category = self._classify_product(inventory_product_id, df)

        # Set optimization parameters based on category
        n_trials = {
            'high_volume': const.OPTUNA_TRIALS_HIGH_VOLUME,
            'medium_volume': const.OPTUNA_TRIALS_MEDIUM_VOLUME,
            'low_volume': const.OPTUNA_TRIALS_LOW_VOLUME,
            'intermittent': const.OPTUNA_TRIALS_INTERMITTENT
        }.get(product_category, const.OPTUNA_TRIALS_MEDIUM_VOLUME)

        logger.info(f"Product {inventory_product_id} classified as {product_category}, using {n_trials} trials")
        # Check data quality and adjust strategy
        total_sales = df['y'].sum()
        zero_ratio = (df['y'] == 0).sum() / len(df)
        mean_sales = df['y'].mean()
        non_zero_days = len(df[df['y'] > 0])

        logger.info(f"Data analysis for {inventory_product_id}: total_sales={total_sales:.1f}, "
                    f"zero_ratio={zero_ratio:.2f}, mean_sales={mean_sales:.2f}, non_zero_days={non_zero_days}")

        # Adjust strategy based on data characteristics
        if zero_ratio > const.MAX_ZERO_RATIO_INTERMITTENT or non_zero_days < const.MIN_NON_ZERO_DAYS:
            logger.warning(f"Very sparse data for {inventory_product_id}, using minimal optimization")
            return {
                'changepoint_prior_scale': 0.001,
                'seasonality_prior_scale': 0.01,
                'holidays_prior_scale': 0.01,
                'changepoint_range': 0.8,
                'seasonality_mode': 'additive',
                'daily_seasonality': False,
                'weekly_seasonality': True,
                'yearly_seasonality': False,
                'uncertainty_samples': const.UNCERTAINTY_SAMPLES_SPARSE_MIN
            }
        elif zero_ratio > const.MODERATE_SPARSITY_THRESHOLD:
            logger.info(f"Moderate sparsity for {inventory_product_id}, using conservative optimization")
            return {
                'changepoint_prior_scale': 0.01,
                'seasonality_prior_scale': 0.1,
                'holidays_prior_scale': 0.1,
                'changepoint_range': 0.8,
                'seasonality_mode': 'additive',
                'daily_seasonality': False,
                'weekly_seasonality': True,
                'yearly_seasonality': len(df) > const.DATA_QUALITY_DAY_THRESHOLD_HIGH,
                'uncertainty_samples': const.UNCERTAINTY_SAMPLES_SPARSE_MAX
            }

        # Use a unique seed for each product to avoid identical results
        product_seed = hash(str(inventory_product_id)) % 10000
        def objective(trial):
            try:
                # Sample hyperparameters with product-specific ranges
                if product_category == 'high_volume':
                    # More conservative for high volume (less overfitting)
                    changepoint_scale_range = (0.001, 0.1)
                    seasonality_scale_range = (1.0, 10.0)
                elif product_category == 'intermittent':
                    # Very conservative for intermittent demand
                    changepoint_scale_range = (0.001, 0.05)
                    seasonality_scale_range = (0.01, 1.0)
                else:
                    # Default ranges
                    changepoint_scale_range = (0.001, 0.5)
                    seasonality_scale_range = (0.01, 10.0)

                # Determine the appropriate uncertainty samples range for the product category
                if product_category == 'high_volume':
                    uncertainty_range = (const.UNCERTAINTY_SAMPLES_HIGH_MIN, const.UNCERTAINTY_SAMPLES_HIGH_MAX)
                elif product_category == 'medium_volume':
                    uncertainty_range = (const.UNCERTAINTY_SAMPLES_MEDIUM_MIN, const.UNCERTAINTY_SAMPLES_MEDIUM_MAX)
                elif product_category == 'low_volume':
                    uncertainty_range = (const.UNCERTAINTY_SAMPLES_LOW_MIN, const.UNCERTAINTY_SAMPLES_LOW_MAX)
                else:  # intermittent
                    uncertainty_range = (const.UNCERTAINTY_SAMPLES_SPARSE_MIN, const.UNCERTAINTY_SAMPLES_SPARSE_MAX)

                params = {
                    'changepoint_prior_scale': trial.suggest_float(
                        'changepoint_prior_scale',
                        changepoint_scale_range[0],
                        changepoint_scale_range[1],
                        log=True
                    ),
                    'seasonality_prior_scale': trial.suggest_float(
                        'seasonality_prior_scale',
                        seasonality_scale_range[0],
                        seasonality_scale_range[1],
                        log=True
                    ),
                    'holidays_prior_scale': trial.suggest_float('holidays_prior_scale', 0.01, 10.0, log=True),
                    'changepoint_range': trial.suggest_float('changepoint_range', 0.8, 0.95),
                    'seasonality_mode': 'additive' if product_category == 'high_volume' else trial.suggest_categorical('seasonality_mode', ['additive', 'multiplicative']),
                    'daily_seasonality': trial.suggest_categorical('daily_seasonality', [True, False]),
                    'weekly_seasonality': True,  # Always keep weekly seasonality
                    'yearly_seasonality': trial.suggest_categorical('yearly_seasonality', [True, False]),
                    'uncertainty_samples': int(trial.suggest_int('uncertainty_samples', int(uncertainty_range[0]), int(uncertainty_range[1])))  # ✅ FIX: explicit int casting for all values
                }
                # Simple 2-fold cross-validation for speed
                tscv = TimeSeriesSplit(n_splits=2)
                cv_scores = []

                for train_idx, val_idx in tscv.split(df):
                    train_data = df.iloc[train_idx].copy()
                    val_data = df.iloc[val_idx].copy()

                    if len(val_data) < 7:  # Need at least a week
                        continue

                    try:
                        # Create and train a model with adaptive uncertainty sampling
                        uncertainty_samples = int(params.get('uncertainty_samples', 200))  # ✅ FIX: explicit int casting to prevent type errors

                        # Point cmdstan at a writable tmp directory
                        tmpdir = os.environ.get('TMPDIR', '/tmp/cmdstan')
                        os.makedirs(tmpdir, mode=0o777, exist_ok=True)
                        os.environ['TMPDIR'] = tmpdir

                        model = Prophet(**{k: v for k, v in params.items() if k != 'uncertainty_samples'},
                                        interval_width=0.8, uncertainty_samples=uncertainty_samples)

                        for regressor in regressor_columns:
                            if regressor in train_data.columns:
                                model.add_regressor(regressor)

                        with warnings.catch_warnings():
                            warnings.simplefilter("ignore")
                            try:
                                model.fit(train_data)
                            except OSError as e:
                                # Log errno for "Operation not permitted" errors
                                if e.errno == errno.EPERM:
                                    logger.error(f"Permission denied during Prophet fit (errno={e.errno}): {e}")
                                    logger.error(f"TMPDIR: {tmpdir}, exists: {os.path.exists(tmpdir)}, "
                                                 f"writable: {os.access(tmpdir, os.W_OK)}")
                                raise

                        # Predict on the validation window directly (slicing an in-sample
                        # forecast built with periods=0 produced an empty series, so no fold ever scored)
                        val_features = val_data[['ds'] + [r for r in regressor_columns if r in val_data.columns]]
                        forecast = model.predict(val_features)
                        val_predictions = forecast['yhat']
                        val_actual = val_data['y'].values

                        # Calculate MAPE with improved handling for low values
                        if len(val_predictions) > 0 and len(val_actual) > 0:
                            # Use MAE for very low sales values to avoid MAPE issues
                            if val_actual.mean() < 1:
                                mae = np.mean(np.abs(val_actual - val_predictions.values))
                                # Convert MAE to a percentage-like metric
                                mape_like = (mae / max(val_actual.mean(), 0.1)) * 100
                            else:
                                non_zero_mask = val_actual > 0.1  # Use a threshold instead of zero
                                if np.sum(non_zero_mask) > 0:
                                    mape = np.mean(np.abs((val_actual[non_zero_mask] - val_predictions.values[non_zero_mask]) / val_actual[non_zero_mask])) * 100
                                    mape_like = min(mape, 200)  # Cap at 200%
                                else:
                                    mape_like = 100

                            if not np.isnan(mape_like) and not np.isinf(mape_like):
                                cv_scores.append(mape_like)

                    except Exception as fold_error:
                        logger.debug(f"Fold failed for {inventory_product_id} trial {trial.number}: {str(fold_error)}")
                        continue

                return np.mean(cv_scores) if len(cv_scores) > 0 else 100.0

            except Exception as trial_error:
                logger.debug(f"Trial {trial.number} failed for {inventory_product_id}: {str(trial_error)}")
                return 100.0
        # Run optimization with a product-specific seed
        study = optuna.create_study(
            direction='minimize',
            sampler=optuna.samplers.TPESampler(seed=product_seed)
        )

        # ✅ FIX: Run blocking study.optimize() in a thread pool to avoid blocking the event loop
        import asyncio
        await asyncio.to_thread(
            study.optimize,
            objective,
            n_trials=n_trials,
            timeout=const.OPTUNA_TIMEOUT_SECONDS,
            show_progress_bar=False
        )

        # Return the best parameters
        best_params = study.best_params
        best_score = study.best_value

        logger.info(f"Optimization completed for {inventory_product_id}. Best score: {best_score:.2f}%. "
                    f"Parameters: {best_params}")

        # ✅ FIX: Log the uncertainty sampling configuration for debugging confidence intervals (explicit int cast)
        uncertainty_samples = int(best_params.get('uncertainty_samples', 500))
        logger.info(f"Prophet model will use {uncertainty_samples} uncertainty samples for {inventory_product_id} "
                    f"(category: {product_category}, zero_ratio: {zero_ratio:.2f})")

        return best_params
    def _classify_product(self, inventory_product_id: str, sales_data: pd.DataFrame) -> str:
        """Automatically classify a product for the optimization strategy - improved for bakery data"""
        product_lower = str(inventory_product_id).lower()

        # Calculate sales statistics
        total_sales = sales_data['y'].sum()
        mean_sales = sales_data['y'].mean()
        zero_ratio = (sales_data['y'] == 0).sum() / len(sales_data)
        non_zero_days = len(sales_data[sales_data['y'] > 0])

        logger.info(f"Product classification for {inventory_product_id}: total_sales={total_sales:.1f}, "
                    f"mean_sales={mean_sales:.2f}, zero_ratio={zero_ratio:.2f}, non_zero_days={non_zero_days}")

        # Improved classification logic for bakery products:
        # consider both volume and consistency.

        # Check for truly intermittent demand (high zero ratio)
        if zero_ratio > 0.8 or non_zero_days < 30:
            return 'intermittent'

        # High-volume products (consistent daily sales)
        if any(pattern in product_lower for pattern in ['cafe', 'pan', 'bread', 'coffee']):
            # Even if absolute volume is low, these are core products
            return 'high_volume' if zero_ratio < 0.3 else 'medium_volume'

        # Volume-based classification for other products
        if mean_sales >= 10 and zero_ratio < 0.4:
            return 'high_volume'
        elif mean_sales >= 5 and zero_ratio < 0.6:
            return 'medium_volume'
        elif mean_sales >= 2 and zero_ratio < 0.7:
            return 'low_volume'
        else:
            return 'intermittent'
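
    # Illustrative classification outcomes (a sketch based on the thresholds above;
    # the product names and statistics are hypothetical):
    #
    #     "pan_integral",   mean_sales=4.2, zero_ratio=0.10 -> 'high_volume' (name pattern + consistency)
    #     "tarta_limon",    mean_sales=6.0, zero_ratio=0.45 -> 'medium_volume'
    #     "pan_sin_gluten", mean_sales=0.8, zero_ratio=0.85 -> 'intermittent' (sparsity check wins)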
    def _create_optimized_prophet_model(self, optimized_params: Dict[str, Any], regressor_columns: List[str]) -> Prophet:
        """Create a Prophet model with optimized parameters and adaptive uncertainty sampling"""
        holidays = self._get_spanish_holidays()

        # Determine uncertainty samples based on data characteristics, with explicit int casting
        uncertainty_samples = int(optimized_params.get('uncertainty_samples', 500)) if optimized_params.get('uncertainty_samples') is not None else 500

        # If uncertainty_samples is 0, we're in fallback mode (no cmdstan)
        if uncertainty_samples == 0:
            logger.info("Creating Prophet model without uncertainty sampling (fallback mode)")
            model = Prophet(
                holidays=holidays if not holidays.empty else None,
                daily_seasonality=optimized_params.get('daily_seasonality', True),
                weekly_seasonality=optimized_params.get('weekly_seasonality', True),
                yearly_seasonality=optimized_params.get('yearly_seasonality', True),
                seasonality_mode=optimized_params.get('seasonality_mode', 'additive'),
                changepoint_prior_scale=float(optimized_params.get('changepoint_prior_scale', 0.05)),
                seasonality_prior_scale=float(optimized_params.get('seasonality_prior_scale', 10.0)),
                holidays_prior_scale=float(optimized_params.get('holidays_prior_scale', 10.0)),
                changepoint_range=float(optimized_params.get('changepoint_range', 0.8)),
                interval_width=0.8,
                mcmc_samples=0,
                uncertainty_samples=1  # Minimum value to avoid errors
            )
        else:
            model = Prophet(
                holidays=holidays if not holidays.empty else None,
                daily_seasonality=optimized_params.get('daily_seasonality', True),
                weekly_seasonality=optimized_params.get('weekly_seasonality', True),
                yearly_seasonality=optimized_params.get('yearly_seasonality', True),
                seasonality_mode=optimized_params.get('seasonality_mode', 'additive'),
                changepoint_prior_scale=float(optimized_params.get('changepoint_prior_scale', 0.05)),
                seasonality_prior_scale=float(optimized_params.get('seasonality_prior_scale', 10.0)),
                holidays_prior_scale=float(optimized_params.get('holidays_prior_scale', 10.0)),
                changepoint_range=float(optimized_params.get('changepoint_range', 0.8)),
                interval_width=0.8,
                mcmc_samples=0,
                uncertainty_samples=uncertainty_samples
            )

        return model
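
    # A representative `optimized_params` dict as produced by _optimize_hyperparameters
    # (values are illustrative only, not recommendations):
    #
    #     {
    #         'changepoint_prior_scale': 0.02,
    #         'seasonality_prior_scale': 2.5,
    #         'holidays_prior_scale': 1.0,
    #         'changepoint_range': 0.85,
    #         'seasonality_mode': 'additive',
    #         'daily_seasonality': False,
    #         'weekly_seasonality': True,
    #         'yearly_seasonality': True,
    #         'uncertainty_samples': 300,
    #     }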
    # All the existing methods remain the same, just with enhanced metrics

    async def _calculate_training_metrics(self,
                                          model: Prophet,
                                          training_data: pd.DataFrame,
                                          optimized_params: Dict[str, Any] = None) -> Dict[str, float]:
        """Calculate training metrics with optimization info and improved MAPE handling"""
        try:
            # Generate in-sample predictions
            forecast = model.predict(training_data[['ds'] + [col for col in training_data.columns if col not in ['ds', 'y']]])

            # Calculate metrics
            y_true = training_data['y'].values
            y_pred = forecast['yhat'].values

            # Basic metrics
            mae = mean_absolute_error(y_true, y_pred)
            mse = mean_squared_error(y_true, y_pred)
            rmse = np.sqrt(mse)

            # Improved MAPE calculation for bakery data
            mean_actual = y_true.mean()
            median_actual = np.median(y_true[y_true > 0]) if np.any(y_true > 0) else 1.0

            # Use different strategies based on sales volume
            if mean_actual < 2.0:
                # For very low volume products, use normalized MAE
                normalized_mae = mae / max(median_actual, 1.0)
                mape = min(normalized_mae * 100, 200)  # Cap at 200%
                logger.info(f"Using normalized MAE for low-volume product (mean={mean_actual:.2f})")
            elif mean_actual < 5.0:
                # For low-medium volume, use a modified MAPE with a higher threshold
                threshold = 1.0
                valid_mask = y_true >= threshold
                if np.sum(valid_mask) == 0:
                    mape = 150.0  # High but not extreme
                else:
                    mape_values = np.abs((y_true[valid_mask] - y_pred[valid_mask]) / y_true[valid_mask])
                    mape = np.median(mape_values) * 100  # Use median instead of mean to reduce outlier impact
                    mape = min(mape, 150)  # Cap at a reasonable level
            else:
                # Standard MAPE for higher-volume products
                threshold = 0.5
                valid_mask = y_true > threshold
                if np.sum(valid_mask) == 0:
                    mape = 100.0
                else:
                    mape_values = np.abs((y_true[valid_mask] - y_pred[valid_mask]) / y_true[valid_mask])
                    mape = np.mean(mape_values) * 100

            # Cap MAPE at a reasonable maximum
            if math.isinf(mape) or math.isnan(mape) or mape > 200:
                mape = min(200.0, (mae / max(mean_actual, 1.0)) * 100)
            # R-squared
            ss_res = np.sum((y_true - y_pred) ** 2)
            ss_tot = np.sum((y_true - np.mean(y_true)) ** 2)
            r2 = 1 - (ss_res / ss_tot) if ss_tot != 0 else 0.0

            # Calculate a realistic improvement estimate based on actual product performance.
            # Use more granular categories and realistic baselines.
            total_sales = training_data['y'].sum()
            zero_ratio = (training_data['y'] == 0).sum() / len(training_data)
            mean_sales = training_data['y'].mean()
            non_zero_days = len(training_data[training_data['y'] > 0])

            # More nuanced categorization
            if zero_ratio > 0.8 or non_zero_days < 30:
                category = 'very_sparse'
                baseline_mape = 80.0
            elif zero_ratio > 0.6:
                category = 'sparse'
                baseline_mape = 60.0
            elif mean_sales >= 10 and zero_ratio < 0.3:
                category = 'high_volume'
                baseline_mape = 25.0
            elif mean_sales >= 5 and zero_ratio < 0.5:
                category = 'medium_volume'
                baseline_mape = 35.0
            else:
                category = 'low_volume'
                baseline_mape = 45.0

            # Calculate improvement - be conservative
            if mape < baseline_mape * 0.8:  # Only claim improvement if it is significant
                improvement_pct = (baseline_mape - mape) / baseline_mape * 100
            else:
                improvement_pct = 0  # No meaningful improvement

            # Quality score based on data characteristics
            quality_score = max(0.1, min(1.0, (1 - zero_ratio) * (non_zero_days / len(training_data))))

            # Enhanced metrics with optimization info
            metrics = {
                "mae": round(mae, 2),
                "mse": round(mse, 2),
                "rmse": round(rmse, 2),
                "mape": round(mape, 2),
                "r2": round(r2, 3),
                "optimized": True,
                "optimized_mape": round(mape, 2),
                "baseline_mape_estimate": round(baseline_mape, 2),
                "improvement_estimated": round(improvement_pct, 1),
                "product_category": category,
                "data_quality_score": round(quality_score, 2),
                "mean_sales_volume": round(mean_sales, 2),
                "sales_consistency": round(non_zero_days / len(training_data), 2),
                "total_demand": round(total_sales, 1)
            }

            logger.info(f"Training metrics calculated: MAPE={mape:.1f}%, "
                        f"Category={category}, Improvement={improvement_pct:.1f}%")

            return metrics

        except Exception as e:
            logger.error(f"Error calculating training metrics: {str(e)}")
            return {
                "mae": 0.0, "mse": 0.0, "rmse": 0.0, "mape": 100.0, "r2": 0.0,
                "optimized": False, "improvement_estimated": 0.0
            }
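
    # A representative metrics dict returned by _calculate_training_metrics
    # (values are illustrative only):
    #
    #     {"mae": 1.82, "mse": 6.10, "rmse": 2.47, "mape": 22.4, "r2": 0.71,
    #      "optimized": True, "optimized_mape": 22.4, "baseline_mape_estimate": 35.0,
    #      "improvement_estimated": 36.0, "product_category": "medium_volume",
    #      "data_quality_score": 0.85, "mean_sales_volume": 7.3,
    #      "sales_consistency": 0.92, "total_demand": 2650.0}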
    async def _store_model(self,
                           tenant_id: str,
                           inventory_product_id: str,
                           model: Prophet,
                           model_id: str,
                           training_data: pd.DataFrame,
                           regressor_columns: List[str],
                           optimized_params: Dict[str, Any] = None,
                           training_metrics: Dict[str, Any] = None,
                           session=None) -> str:
        """Store model with database integration"""
        # Create the model directory
        model_dir = Path(settings.MODEL_STORAGE_PATH) / tenant_id
        model_dir.mkdir(parents=True, exist_ok=True)

        # Store the model file
        model_path = model_dir / f"{model_id}.pkl"
        joblib.dump(model, model_path)
        # Calculate a checksum for model file integrity
        checksummed_file = ChecksummedFile(str(model_path))
        model_checksum = checksummed_file.calculate_and_save_checksum()

        # Enhanced metadata with checksum
        metadata = {
            "model_id": model_id,
            "tenant_id": tenant_id,
            "inventory_product_id": inventory_product_id,
            "regressor_columns": regressor_columns,
            "training_samples": len(training_data),
            "data_period": {
                "start_date": pd.Timestamp(training_data['ds'].min()).isoformat(),
                "end_date": pd.Timestamp(training_data['ds'].max()).isoformat()
            },
            "optimized": True,
            "optimized_parameters": optimized_params or {},
            "created_at": datetime.now().isoformat(),
            "model_type": "prophet_optimized",
            "file_path": str(model_path),
            "checksum": model_checksum,
            "checksum_algorithm": "sha256"
        }

        metadata_path = model_path.with_suffix('.json')
        with open(metadata_path, 'w') as f:
            json.dump(metadata, f, indent=2, default=str)

        # Store in memory
        model_key = f"{tenant_id}:{inventory_product_id}"
        self.models[model_key] = model
        self.model_metadata[model_key] = metadata
        # 🆕 NEW: Store in the database using a session (parent or new)
        use_parent_session = session is not None

        async def _store_in_db(db_session):
            """Inner function to store the model in the database"""
            # Deactivate previous models for this product
            await self._deactivate_previous_models_with_session(db_session, tenant_id, inventory_product_id)

            # Helper to ensure hyperparameters are JSON serializable
            def _serialize_hyperparameters(params):
                if not params:
                    return {}
                safe_params = {}
                for k, v in params.items():
                    try:
                        if isinstance(v, (int, float, str, bool, type(None))):
                            safe_params[k] = v
                        elif hasattr(v, 'item'):  # numpy scalars
                            safe_params[k] = v.item()
                        elif isinstance(v, (list, tuple)):
                            safe_params[k] = [x.item() if hasattr(x, 'item') else x for x in v]
                        else:
                            safe_params[k] = float(v) if isinstance(v, (np.integer, np.floating)) else str(v)
                    except Exception:
                        safe_params[k] = str(v)  # fallback to string conversion
                return safe_params

            # Create a new database record
            db_model = TrainedModel(
                id=model_id,
                tenant_id=tenant_id,
                inventory_product_id=inventory_product_id,
                model_type="prophet_optimized",
                job_id=model_id.split('_')[0],  # Extract job_id from model_id
                model_path=str(model_path),
                metadata_path=str(metadata_path),
                hyperparameters=_serialize_hyperparameters(optimized_params or {}),
                features_used=[str(f) for f in regressor_columns] if regressor_columns else [],
                is_active=True,
                is_production=True,  # New models are production-ready
                training_start_date=pd.Timestamp(training_data['ds'].min()).to_pydatetime().replace(tzinfo=None),
                training_end_date=pd.Timestamp(training_data['ds'].max()).to_pydatetime().replace(tzinfo=None),
                training_samples=len(training_data)
            )

            # Add training metrics if available
            if training_metrics:
                db_model.mape = float(training_metrics.get('mape')) if training_metrics.get('mape') is not None else None
                db_model.mae = float(training_metrics.get('mae')) if training_metrics.get('mae') is not None else None
                db_model.rmse = float(training_metrics.get('rmse')) if training_metrics.get('rmse') is not None else None
                db_model.r2_score = float(training_metrics.get('r2')) if training_metrics.get('r2') is not None else None
                db_model.data_quality_score = float(training_metrics.get('data_quality_score')) if training_metrics.get('data_quality_score') is not None else None

            db_session.add(db_model)

            # Only commit when using a new session; parent-session commits are handled
            # by the calling method to prevent conflicts
            if not use_parent_session:
                await db_session.commit()
                logger.info(f"Model {model_id} stored in database successfully")
            else:
                logger.debug(f"Added model {model_id} to parent session (commit deferred)")
        try:
            # ✅ FIX: Use the parent session if provided, otherwise create a new one.
            # The parent session handles commits, so child sessions shouldn't commit.
            if use_parent_session:
                logger.debug(f"Using parent session for storing model {model_id}")
                await _store_in_db(session)
            else:
                logger.debug(f"Creating new session for storing model {model_id}")
                async with self.database_manager.get_session() as new_session:
                    # _store_in_db commits the dedicated session itself
                    await _store_in_db(new_session)

        except Exception as e:
            logger.error(f"Failed to store model in database: {str(e)}")
            # Continue execution - file storage succeeded

        logger.info(f"Optimized model stored at: {model_path}")
        return str(model_path)
    async def _deactivate_previous_models_with_session(self, db_session, tenant_id: str, inventory_product_id: str):
        """Deactivate previous models for the same product using the provided session"""
        try:
            # ✅ FIX: Wrap the SQL string with text() for SQLAlchemy 2.0
            query = text("""
                UPDATE trained_models
                SET is_active = false, is_production = false
                WHERE tenant_id = :tenant_id AND inventory_product_id = :inventory_product_id
            """)
            await db_session.execute(query, {
                "tenant_id": tenant_id,
                "inventory_product_id": inventory_product_id
            })
            # Note: don't commit here; let the calling method handle the transaction
            logger.info(f"Successfully deactivated previous models for {inventory_product_id}")
        except Exception as e:
            logger.error(f"Failed to deactivate previous models: {str(e)}")
            raise
    async def generate_forecast(self,
                                model_path: str,
                                future_dates: pd.DataFrame,
                                regressor_columns: List[str]) -> pd.DataFrame:
        """Generate a forecast using a stored model, with checksum verification"""
        try:
            # Verify model file integrity before loading
            checksummed_file = ChecksummedFile(model_path)
            if not checksummed_file.load_and_verify_checksum():
                logger.warning(f"Checksum verification failed for model: {model_path}")
                # Still load the model, but log a warning.
                # In production you might want to raise an exception instead.

            model = joblib.load(model_path)

            for regressor in regressor_columns:
                if regressor not in future_dates.columns:
                    logger.warning(f"Missing regressor {regressor}, filling with 0")
                    future_dates[regressor] = 0

            forecast = model.predict(future_dates)
            return forecast

        except Exception as e:
            logger.error(f"Failed to generate forecast: {str(e)}")
            raise
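
    # Illustrative call (a sketch; `model_info` comes from train_bakery_model, and the
    # horizon and regressor name `is_weekend` are assumptions for the example):
    #
    #     dates = pd.date_range("2025-01-01", periods=14, freq="D")
    #     future_dates = pd.DataFrame({
    #         "ds": dates,
    #         "is_weekend": [int(d.weekday() >= 5) for d in dates],
    #     })
    #     forecast = await manager.generate_forecast(
    #         model_path=model_info["model_path"],
    #         future_dates=future_dates,
    #         regressor_columns=model_info["features"],
    #     )
    #     print(forecast[["ds", "yhat", "yhat_lower", "yhat_upper"]].tail())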
    async def _validate_training_data(self, df: pd.DataFrame, inventory_product_id: str):
        """Validate training data quality (unchanged)"""
        if df.empty:
            raise ValueError(f"No training data available for {inventory_product_id}")

        if len(df) < settings.MIN_TRAINING_DATA_DAYS:
            raise ValueError(
                f"Insufficient training data for {inventory_product_id}: "
                f"{len(df)} days, minimum required: {settings.MIN_TRAINING_DATA_DAYS}"
            )

        required_columns = ['ds', 'y']
        missing_columns = [col for col in required_columns if col not in df.columns]
        if missing_columns:
            raise ValueError(f"Missing required columns: {missing_columns}")

        if df['ds'].isna().any():
            raise ValueError("Invalid dates found in training data")

        if df['y'].isna().all():
            raise ValueError("No valid target values found")
    async def _prepare_prophet_data(self, df: pd.DataFrame) -> pd.DataFrame:
        """Prepare data for Prophet training with timezone handling"""
        prophet_data = df.copy()

        if 'ds' not in prophet_data.columns:
            raise ValueError("Missing 'ds' column in training data")
        if 'y' not in prophet_data.columns:
            raise ValueError("Missing 'y' column in training data")

        # Use the timezone utility to prepare Prophet-compatible datetimes
        prophet_data = prepare_prophet_datetime(prophet_data, 'ds')

        # Sort by date and clean the data
        prophet_data = prophet_data.sort_values('ds').reset_index(drop=True)
        prophet_data['y'] = pd.to_numeric(prophet_data['y'], errors='coerce')
        prophet_data = prophet_data.dropna(subset=['y'])

        # Remove any duplicate dates (keep the last occurrence)
        prophet_data = prophet_data.drop_duplicates(subset=['ds'], keep='last')

        # Ensure y values are non-negative
        prophet_data['y'] = prophet_data['y'].clip(lower=0)

        logger.info(f"Prepared Prophet data: {len(prophet_data)} rows, date range: {pd.Timestamp(prophet_data['ds'].min())} to {pd.Timestamp(prophet_data['ds'].max())}")

        return prophet_data
    def _extract_regressor_columns(self, df: pd.DataFrame) -> List[str]:
        """Extract regressor columns (unchanged)"""
        excluded_columns = ['ds', 'y']
        regressor_columns = []

        for col in df.columns:
            if col not in excluded_columns and df[col].dtype in ['int64', 'float64']:
                regressor_columns.append(col)

        logger.info(f"Identified regressor columns: {regressor_columns}")
        return regressor_columns
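
    # Example (illustrative column names): a frame with columns
    # ['ds', 'y', 'is_weekend', 'temperature', 'notes'] would yield
    # ['is_weekend', 'temperature'] as regressors, provided both are int64/float64;
    # the object-typed 'notes' column is ignored.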
    def _get_spanish_holidays(self, region: str = None) -> pd.DataFrame:
        """
        Get Spanish holidays dynamically using the holidays library.
        Supports national and regional holidays, including dynamic Easter calculation.

        Args:
            region: Region code (e.g., 'MD' for Madrid, 'PV' for the Basque Country)

        Returns:
            DataFrame with holiday dates and names
        """
        try:
            import holidays

            holidays_list = []
            years = range(2020, 2035)  # Extended range for better coverage

            # Get Spanish holidays for each year
            for year in years:
                # National (and, if requested, regional) holidays
                spain_holidays = holidays.Spain(years=year, prov=region)
                for date, name in spain_holidays.items():
                    holidays_list.append({
                        'holiday': self._normalize_holiday_name(name),
                        'ds': pd.Timestamp(date),
                        'lower_window': 0,
                        'upper_window': 0  # Can be adjusted for multi-day holidays
                    })

            if holidays_list:
                holidays_df = pd.DataFrame(holidays_list)
                # Remove duplicates (some holidays may repeat)
                holidays_df = holidays_df.drop_duplicates(subset=['ds', 'holiday'])
                holidays_df = holidays_df.sort_values('ds').reset_index(drop=True)
                logger.info(f"Loaded {len(holidays_df)} Spanish holidays dynamically "
                            f"(region={region or 'National'}, years={min(years)}-{max(years)})")
                return holidays_df
            else:
                return pd.DataFrame()

        except Exception as e:
            logger.warning(f"Could not load Spanish holidays dynamically: {str(e)}")
            # Fall back to minimal hardcoded holidays
            return self._get_fallback_holidays()
    def _normalize_holiday_name(self, name: str) -> str:
        """Normalize a holiday name to a consistent format for Prophet"""
        # Convert to lowercase and replace spaces with underscores
        normalized = name.lower().replace(' ', '_').replace("'", '')
        # Remove special characters
        normalized = ''.join(c for c in normalized if c.isalnum() or c == '_')
        return normalized
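
    # Example (illustrative): "Año Nuevo" -> "año_nuevo",
    # "Día de la Constitución" -> "día_de_la_constitución"; accented letters count as
    # alphanumeric in Python, so they are preserved.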
    def _get_fallback_holidays(self) -> pd.DataFrame:
        """Fall back to basic hardcoded holidays if dynamic loading fails"""
        try:
            holidays_list = []
            years = range(2020, 2035)

            for year in years:
                holidays_list.extend([
                    {'holiday': 'new_year', 'ds': f'{year}-01-01'},
                    {'holiday': 'epiphany', 'ds': f'{year}-01-06'},
                    {'holiday': 'labor_day', 'ds': f'{year}-05-01'},
                    {'holiday': 'assumption', 'ds': f'{year}-08-15'},
                    {'holiday': 'national_day', 'ds': f'{year}-10-12'},
                    {'holiday': 'all_saints', 'ds': f'{year}-11-01'},
                    {'holiday': 'constitution_day', 'ds': f'{year}-12-06'},
                    {'holiday': 'immaculate_conception', 'ds': f'{year}-12-08'},
                    {'holiday': 'christmas', 'ds': f'{year}-12-25'}
                ])

            holidays_df = pd.DataFrame(holidays_list)
            holidays_df['ds'] = pd.to_datetime(holidays_df['ds'])
            return holidays_df

        except Exception as e:
            logger.error(f"Fallback holidays failed: {e}")
            return pd.DataFrame()