""" Shared Feature Calculator for Training and Prediction Services This module provides unified feature calculation logic to ensure consistency between model training and inference (prediction), preventing train/serve skew. Key principles: - Same lag calculation logic in training and prediction - Same rolling window statistics in training and prediction - Same trend feature calculations in training and prediction - Graceful handling of sparse/missing data with consistent fallbacks """ import pandas as pd import numpy as np from typing import Dict, List, Optional, Union, Tuple from datetime import datetime import structlog logger = structlog.get_logger() class HistoricalFeatureCalculator: """ Unified historical feature calculator for both training and prediction. This class ensures that features are calculated identically whether during model training or during inference, preventing train/serve skew. """ def __init__(self): """Initialize the feature calculator.""" self.feature_columns = [] def calculate_lag_features( self, sales_data: Union[pd.Series, pd.DataFrame], lag_days: List[int] = None, mode: str = 'training' ) -> Union[pd.DataFrame, Dict[str, float]]: """ Calculate lagged sales features consistently for training and prediction. Args: sales_data: Sales data as Series (prediction) or DataFrame (training) with 'quantity' column lag_days: List of lag periods (default: [1, 7, 14]) mode: 'training' returns DataFrame with lag columns, 'prediction' returns dict of features Returns: DataFrame with lag columns (training mode) or dict of lag features (prediction mode) """ if lag_days is None: lag_days = [1, 7, 14] if mode == 'training': return self._calculate_lag_features_training(sales_data, lag_days) else: return self._calculate_lag_features_prediction(sales_data, lag_days) def _calculate_lag_features_training( self, df: pd.DataFrame, lag_days: List[int] ) -> pd.DataFrame: """ Calculate lag features for training (operates on DataFrame). 

    def _calculate_lag_features_training(
        self,
        df: pd.DataFrame,
        lag_days: List[int]
    ) -> pd.DataFrame:
        """
        Calculate lag features for training (operates on DataFrame).

        Args:
            df: DataFrame with 'quantity' column
            lag_days: List of lag periods

        Returns:
            DataFrame with added lag columns
        """
        df = df.copy()

        # Calculate overall statistics for fallback (consistent with prediction)
        overall_mean = float(df['quantity'].mean()) if len(df) > 0 else 0.0

        for lag in lag_days:
            col_name = f'lag_{lag}_day'

            # Use pandas shift
            df[col_name] = df['quantity'].shift(lag)

            # Fill NaN values using the same logic as prediction mode.
            # For missing lags, cascade: smaller lag -> nearest observed value -> mean.
            if lag == 1:
                # For lag_1, fill with the earliest observed quantity, else the mean
                df[col_name] = df[col_name].fillna(
                    df['quantity'].iloc[0] if len(df) > 0 else overall_mean
                )
            elif lag == 7:
                # For lag_7, fill from lag_1 if available, else earliest value, else mean
                mask = df[col_name].isna()
                if 'lag_1_day' in df.columns:
                    df.loc[mask, col_name] = df.loc[mask, 'lag_1_day']
                else:
                    df.loc[mask, col_name] = df['quantity'].iloc[0] if len(df) > 0 else overall_mean
            elif lag == 14:
                # For lag_14, fill from lag_7 if available, else lag_1, else earliest value, else mean
                mask = df[col_name].isna()
                if 'lag_7_day' in df.columns:
                    df.loc[mask, col_name] = df.loc[mask, 'lag_7_day']
                elif 'lag_1_day' in df.columns:
                    df.loc[mask, col_name] = df.loc[mask, 'lag_1_day']
                else:
                    df.loc[mask, col_name] = df['quantity'].iloc[0] if len(df) > 0 else overall_mean

            # Fill any remaining NaN with mean
            df[col_name] = df[col_name].fillna(overall_mean)

            self.feature_columns.append(col_name)

        logger.debug(f"Added {len(lag_days)} lagged features (training mode)", lags=lag_days)
        return df

    def _calculate_lag_features_prediction(
        self,
        historical_sales: pd.Series,
        lag_days: List[int]
    ) -> Dict[str, float]:
        """
        Calculate lag features for prediction (operates on Series, returns dict).

        Args:
            historical_sales: Series of sales quantities indexed by date
            lag_days: List of lag periods

        Returns:
            Dictionary of lag features
        """
        features = {}

        if len(historical_sales) == 0:
            # Return default values if no data
            for lag in lag_days:
                features[f'lag_{lag}_day'] = 0.0
            return features

        # Calculate overall statistics for fallback
        overall_mean = float(historical_sales.mean())

        # Calculate lag_1_day
        if 1 in lag_days:
            if len(historical_sales) >= 1:
                features['lag_1_day'] = float(historical_sales.iloc[-1])
            else:
                features['lag_1_day'] = overall_mean

        # Calculate lag_7_day
        if 7 in lag_days:
            if len(historical_sales) >= 7:
                features['lag_7_day'] = float(historical_sales.iloc[-7])
            else:
                # Fallback to last value if insufficient data
                features['lag_7_day'] = float(historical_sales.iloc[-1]) if len(historical_sales) > 0 else overall_mean

        # Calculate lag_14_day
        if 14 in lag_days:
            if len(historical_sales) >= 14:
                features['lag_14_day'] = float(historical_sales.iloc[-14])
            else:
                # Cascading fallback: lag_7 -> lag_1 -> last value -> mean
                if len(historical_sales) >= 7:
                    features['lag_14_day'] = float(historical_sales.iloc[-7])
                else:
                    features['lag_14_day'] = float(historical_sales.iloc[-1]) if len(historical_sales) > 0 else overall_mean

        logger.debug("Calculated lag features (prediction mode)", features=features)
        return features
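
    # Illustrative sketch of the cascading lag fallback (hypothetical values):
    # for a 3-day history [5.0, 7.0, 6.0], prediction mode yields
    #   lag_1_day  -> 6.0   (last observed value)
    #   lag_7_day  -> 6.0   (fewer than 7 days, falls back to the last value)
    #   lag_14_day -> 6.0   (fewer than 7 days, falls back to the last value)
    # so sparse histories degrade to the most recent observation rather than NaN.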

    def calculate_rolling_features(
        self,
        sales_data: Union[pd.Series, pd.DataFrame],
        windows: Optional[List[int]] = None,
        statistics: Optional[List[str]] = None,
        mode: str = 'training'
    ) -> Union[pd.DataFrame, Dict[str, float]]:
        """
        Calculate rolling window statistics consistently for training and prediction.

        Args:
            sales_data: Sales data as Series (prediction) or DataFrame (training)
                with 'quantity' column
            windows: List of window sizes in days (default: [7, 14, 30])
            statistics: List of statistics to calculate (default: ['mean', 'std', 'max', 'min'])
            mode: 'training' returns DataFrame, 'prediction' returns dict

        Returns:
            DataFrame with rolling columns (training mode) or dict of rolling
            features (prediction mode)
        """
        if windows is None:
            windows = [7, 14, 30]
        if statistics is None:
            statistics = ['mean', 'std', 'max', 'min']

        if mode == 'training':
            return self._calculate_rolling_features_training(sales_data, windows, statistics)
        else:
            return self._calculate_rolling_features_prediction(sales_data, windows, statistics)

    def _calculate_rolling_features_training(
        self,
        df: pd.DataFrame,
        windows: List[int],
        statistics: List[str]
    ) -> pd.DataFrame:
        """
        Calculate rolling features for training (operates on DataFrame).

        Args:
            df: DataFrame with 'quantity' column
            windows: List of window sizes
            statistics: List of statistics to calculate

        Returns:
            DataFrame with added rolling columns
        """
        df = df.copy()

        # Calculate overall statistics for fallback
        overall_mean = float(df['quantity'].mean()) if len(df) > 0 else 0.0
        overall_std = float(df['quantity'].std()) if len(df) > 1 else 0.0
        overall_max = float(df['quantity'].max()) if len(df) > 0 else 0.0
        overall_min = float(df['quantity'].min()) if len(df) > 0 else 0.0

        fallback_values = {
            'mean': overall_mean,
            'std': overall_std,
            'max': overall_max,
            'min': overall_min
        }

        for window in windows:
            for stat in statistics:
                col_name = f'rolling_{stat}_{window}d'

                # Calculate rolling statistic with the full window required
                # (min_periods=window matches prediction behavior)
                if stat == 'mean':
                    df[col_name] = df['quantity'].rolling(window=window, min_periods=window).mean()
                elif stat == 'std':
                    df[col_name] = df['quantity'].rolling(window=window, min_periods=window).std()
                elif stat == 'max':
                    df[col_name] = df['quantity'].rolling(window=window, min_periods=window).max()
                elif stat == 'min':
                    df[col_name] = df['quantity'].rolling(window=window, min_periods=window).min()

                # Fill NaN values using cascading fallback (consistent with prediction):
                # use the next-smaller window if available, otherwise overall statistics
                mask = df[col_name].isna()
                if window == 14 and f'rolling_{stat}_7d' in df.columns:
                    # Use 7-day window for 14-day NaN
                    df.loc[mask, col_name] = df.loc[mask, f'rolling_{stat}_7d']
                elif window == 30 and f'rolling_{stat}_14d' in df.columns:
                    # Use 14-day window for 30-day NaN
                    df.loc[mask, col_name] = df.loc[mask, f'rolling_{stat}_14d']
                elif window == 30 and f'rolling_{stat}_7d' in df.columns:
                    # Use 7-day window for 30-day NaN if 14-day is not available
                    df.loc[mask, col_name] = df.loc[mask, f'rolling_{stat}_7d']

                # Fill any remaining NaN with overall statistics
                df[col_name] = df[col_name].fillna(fallback_values[stat])

                self.feature_columns.append(col_name)

        logger.debug("Added rolling features (training mode)", windows=windows, statistics=statistics)
        return df
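
    # Illustrative note (hypothetical 10-row training frame): with
    # min_periods=window, rolling_*_7d is defined only from the 7th row onward,
    # and rolling_*_14d / rolling_*_30d start entirely NaN. The cascade above
    # then fills 14d from the already-filled 7d column, 30d from 14d, and any
    # values still missing from the overall statistic, so no NaN reaches the model.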

    def _calculate_rolling_features_prediction(
        self,
        historical_sales: pd.Series,
        windows: List[int],
        statistics: List[str]
    ) -> Dict[str, float]:
        """
        Calculate rolling features for prediction (operates on Series, returns dict).

        Args:
            historical_sales: Series of sales quantities indexed by date
            windows: List of window sizes
            statistics: List of statistics to calculate

        Returns:
            Dictionary of rolling features
        """
        features = {}

        if len(historical_sales) == 0:
            # Return default values if no data
            for window in windows:
                for stat in statistics:
                    features[f'rolling_{stat}_{window}d'] = 0.0
            return features

        # Calculate overall statistics for fallback
        overall_mean = float(historical_sales.mean())
        overall_std = float(historical_sales.std()) if len(historical_sales) > 1 else 0.0
        overall_max = float(historical_sales.max())
        overall_min = float(historical_sales.min())

        fallback_values = {
            'mean': overall_mean,
            'std': overall_std,
            'max': overall_max,
            'min': overall_min
        }

        # Calculate for each window
        for window in windows:
            if len(historical_sales) >= window:
                # Have enough data for a full window
                window_data = historical_sales.iloc[-window:]

                for stat in statistics:
                    col_name = f'rolling_{stat}_{window}d'
                    if stat == 'mean':
                        features[col_name] = float(window_data.mean())
                    elif stat == 'std':
                        features[col_name] = float(window_data.std()) if len(window_data) > 1 else 0.0
                    elif stat == 'max':
                        features[col_name] = float(window_data.max())
                    elif stat == 'min':
                        features[col_name] = float(window_data.min())
            else:
                # Insufficient data - use cascading fallback
                for stat in statistics:
                    col_name = f'rolling_{stat}_{window}d'

                    # Try to use a smaller window if available
                    if window == 14 and f'rolling_{stat}_7d' in features:
                        features[col_name] = features[f'rolling_{stat}_7d']
                    elif window == 30 and f'rolling_{stat}_14d' in features:
                        features[col_name] = features[f'rolling_{stat}_14d']
                    elif window == 30 and f'rolling_{stat}_7d' in features:
                        features[col_name] = features[f'rolling_{stat}_7d']
                    else:
                        # Use overall statistics
                        features[col_name] = fallback_values[stat]

        logger.debug("Calculated rolling features (prediction mode)", num_features=len(features))
        return features
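
    # Illustrative sketch (hypothetical 10-day history): window 7 is computed
    # from the last 7 observations, window 14 falls back to the 7-day values,
    # and window 30 falls back to the (already filled) 14-day values, so the
    # prediction-side cascade mirrors the training-side fill order above.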

    def calculate_trend_features(
        self,
        sales_data: Union[pd.Series, pd.DataFrame],
        reference_date: Optional[datetime] = None,
        lag_features: Optional[Dict[str, float]] = None,
        rolling_features: Optional[Dict[str, float]] = None,
        mode: str = 'training',
        date_column: str = 'date'
    ) -> Union[pd.DataFrame, Dict[str, float]]:
        """
        Calculate trend-based features consistently for training and prediction.

        Args:
            sales_data: Sales data as Series (prediction) or DataFrame (training)
            reference_date: Reference date for calculations (prediction mode)
            lag_features: Pre-calculated lag features (prediction mode)
            rolling_features: Pre-calculated rolling features (prediction mode)
            mode: 'training' returns DataFrame, 'prediction' returns dict
            date_column: Name of the date column (training mode)

        Returns:
            DataFrame with trend columns (training mode) or dict of trend
            features (prediction mode)
        """
        if mode == 'training':
            return self._calculate_trend_features_training(sales_data, date_column)
        else:
            return self._calculate_trend_features_prediction(
                sales_data, reference_date, lag_features, rolling_features
            )

    def _calculate_trend_features_training(
        self,
        df: pd.DataFrame,
        date_column: str = 'date'
    ) -> pd.DataFrame:
        """
        Calculate trend features for training (operates on DataFrame).

        Args:
            df: DataFrame with date and lag/rolling features
            date_column: Name of date column

        Returns:
            DataFrame with added trend columns
        """
        df = df.copy()

        # Days since start
        df['days_since_start'] = (df[date_column] - df[date_column].min()).dt.days

        # Momentum (difference between lag_1 and lag_7)
        if 'lag_1_day' in df.columns and 'lag_7_day' in df.columns:
            df['momentum_1_7'] = df['lag_1_day'] - df['lag_7_day']
        else:
            df['momentum_1_7'] = 0.0
        self.feature_columns.append('momentum_1_7')

        # Trend (difference between 7-day and 30-day rolling means)
        if 'rolling_mean_7d' in df.columns and 'rolling_mean_30d' in df.columns:
            df['trend_7_30'] = df['rolling_mean_7d'] - df['rolling_mean_30d']
        else:
            df['trend_7_30'] = 0.0
        self.feature_columns.append('trend_7_30')

        # Velocity (rate of change over a week)
        if 'lag_1_day' in df.columns and 'lag_7_day' in df.columns:
            df['velocity_week'] = (df['lag_1_day'] - df['lag_7_day']) / 7.0
        else:
            df['velocity_week'] = 0.0
        self.feature_columns.append('velocity_week')

        self.feature_columns.append('days_since_start')

        logger.debug("Added trend features (training mode)")
        return df

    def _calculate_trend_features_prediction(
        self,
        historical_sales: pd.Series,
        reference_date: datetime,
        lag_features: Dict[str, float],
        rolling_features: Dict[str, float]
    ) -> Dict[str, float]:
        """
        Calculate trend features for prediction (operates on Series, returns dict).

        Args:
            historical_sales: Series of sales quantities indexed by date
            reference_date: The date we're forecasting for
            lag_features: Pre-calculated lag features
            rolling_features: Pre-calculated rolling features

        Returns:
            Dictionary of trend features
        """
        features = {}

        if len(historical_sales) == 0:
            return {
                'days_since_start': 0,
                'momentum_1_7': 0.0,
                'trend_7_30': 0.0,
                'velocity_week': 0.0
            }

        # Days since first sale
        features['days_since_start'] = (reference_date - historical_sales.index[0]).days

        # Momentum (difference between lag_1 and lag_7)
        if 'lag_1_day' in lag_features and 'lag_7_day' in lag_features:
            if len(historical_sales) >= 7:
                features['momentum_1_7'] = lag_features['lag_1_day'] - lag_features['lag_7_day']
            else:
                features['momentum_1_7'] = 0.0  # Insufficient data
        else:
            features['momentum_1_7'] = 0.0

        # Trend (difference between 7-day and 30-day rolling means)
        if 'rolling_mean_7d' in rolling_features and 'rolling_mean_30d' in rolling_features:
            if len(historical_sales) >= 30:
                features['trend_7_30'] = rolling_features['rolling_mean_7d'] - rolling_features['rolling_mean_30d']
            else:
                features['trend_7_30'] = 0.0  # Insufficient data
        else:
            features['trend_7_30'] = 0.0

        # Velocity (rate of change over a week)
        if 'lag_1_day' in lag_features and 'lag_7_day' in lag_features:
            if len(historical_sales) >= 7:
                recent_value = lag_features['lag_1_day']
                past_value = lag_features['lag_7_day']
                features['velocity_week'] = float((recent_value - past_value) / 7.0)
            else:
                features['velocity_week'] = 0.0  # Insufficient data
        else:
            features['velocity_week'] = 0.0

        logger.debug("Calculated trend features (prediction mode)", features=features)
        return features
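
    # Worked note on the trend formulas (hypothetical numbers): with
    # lag_1_day = 10 and lag_7_day = 3, momentum_1_7 = 10 - 3 = 7 and
    # velocity_week = 7 / 7.0 = 1.0 units/day; trend_7_30 is simply
    # rolling_mean_7d - rolling_mean_30d, positive when recent demand runs
    # above the monthly baseline.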

    def calculate_data_freshness_metrics(
        self,
        historical_sales: pd.Series,
        forecast_date: datetime
    ) -> Dict[str, Union[int, float]]:
        """
        Calculate data freshness and availability metrics.

        This is used by the prediction service to assess data quality and adjust
        confidence. Not used in training mode.

        Args:
            historical_sales: Series of sales quantities indexed by date
            forecast_date: The date we're forecasting for

        Returns:
            Dictionary with freshness metrics
        """
        if len(historical_sales) == 0:
            return {
                'days_since_last_sale': 999,  # Very large number indicating no data
                'historical_data_availability_score': 0.0
            }

        last_available_date = historical_sales.index.max()
        days_since_last_sale = (forecast_date - last_available_date).days

        # Calculate data availability score (0-1 scale, 1 being recent data)
        max_considered_days = 180  # Treat data older than 6 months as very stale
        availability_score = max(0.0, 1.0 - (days_since_last_sale / max_considered_days))

        return {
            'days_since_last_sale': days_since_last_sale,
            'historical_data_availability_score': availability_score
        }

    def calculate_all_features(
        self,
        sales_data: Union[pd.Series, pd.DataFrame],
        reference_date: Optional[datetime] = None,
        mode: str = 'training',
        date_column: str = 'date'
    ) -> Union[pd.DataFrame, Dict[str, float]]:
        """
        Calculate all historical features in one call.

        Args:
            sales_data: Sales data as Series (prediction) or DataFrame (training)
                with 'quantity' and date columns
            reference_date: Reference date for predictions (prediction mode only)
            mode: 'training' or 'prediction'
            date_column: Name of date column (training mode only)

        Returns:
            DataFrame with all features (training) or dict of all features (prediction)
        """
        if mode == 'training':
            df = sales_data.copy()

            # Calculate lag features
            df = self.calculate_lag_features(df, mode='training')

            # Calculate rolling features
            df = self.calculate_rolling_features(df, mode='training')

            # Calculate trend features
            df = self.calculate_trend_features(df, mode='training', date_column=date_column)

            logger.info("Calculated all features (training mode)", feature_count=len(self.feature_columns))
            return df

        else:  # prediction mode
            if reference_date is None:
                raise ValueError("reference_date is required for prediction mode")

            features = {}

            # Calculate lag features
            lag_features = self.calculate_lag_features(sales_data, mode='prediction')
            features.update(lag_features)

            # Calculate rolling features
            rolling_features = self.calculate_rolling_features(sales_data, mode='prediction')
            features.update(rolling_features)

            # Calculate trend features
            trend_features = self.calculate_trend_features(
                sales_data,
                reference_date=reference_date,
                lag_features=lag_features,
                rolling_features=rolling_features,
                mode='prediction'
            )
            features.update(trend_features)

            # Calculate data freshness metrics
            freshness_metrics = self.calculate_data_freshness_metrics(sales_data, reference_date)
            features.update(freshness_metrics)

            logger.info("Calculated all features (prediction mode)", feature_count=len(features))
            return features
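

# A minimal usage sketch (not part of the original module's public surface):
# it assumes a synthetic 60-day daily history and simply prints a few of the
# generated features in both modes, which is handy as a quick consistency
# check between the training-path and prediction-path calculations.
if __name__ == "__main__":
    dates = pd.date_range("2024-01-01", periods=60, freq="D")
    rng = np.random.default_rng(42)
    quantities = rng.poisson(lam=20, size=len(dates)).astype(float)

    calc = HistoricalFeatureCalculator()

    # Training mode: DataFrame in, DataFrame with feature columns out.
    train_df = pd.DataFrame({"date": dates, "quantity": quantities})
    train_features = calc.calculate_all_features(train_df, mode="training")
    print(train_features[["date", "lag_1_day", "rolling_mean_7d", "trend_7_30"]].tail())

    # Prediction mode: date-indexed Series in, flat feature dict out.
    history = pd.Series(quantities, index=dates)
    pred_features = calc.calculate_all_features(
        history,
        reference_date=datetime(2024, 3, 2),
        mode="prediction",
    )
    print({name: round(value, 2) for name, value in pred_features.items()})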