"""
Shared Data Processor for Bakery Forecasting
Provides feature engineering capabilities for both training and prediction
"""

import pandas as pd
import numpy as np
from typing import Dict, List, Any, Optional
from datetime import datetime, timedelta
import structlog
import holidays

from shared.ml.enhanced_features import AdvancedFeatureEngineer

logger = structlog.get_logger()


class EnhancedBakeryDataProcessor:
    """
    Shared data processor for bakery forecasting.
    Focuses on prediction feature preparation without training-specific dependencies.
    """

    # Keys probed first (in this order) when reducing a dict payload to one number.
    _NUMERIC_KEYS = ('value', 'data', 'result', 'amount', 'count', 'number', 'val')

    def __init__(self, region: str = 'MD'):
        """
        Initialize the data processor.

        Args:
            region: Spanish region code for holidays (MD=Madrid, PV=Basque, etc.)
        """
        self.scalers = {}
        self.feature_engineer = AdvancedFeatureEngineer()
        self.region = region
        self.spain_holidays = holidays.Spain(prov=region)
        # (region, year) -> list of (start, end) Easter-break windows; filled lazily.
        self._easter_window_cache = {}

    def get_scalers(self) -> Dict[str, Any]:
        """Return the scalers/normalization parameters for use during prediction"""
        return self.scalers.copy()

    @staticmethod
    def _extract_numeric_from_dict(value: Any) -> Optional[float]:
        """
        Robust extraction of numeric values from complex data structures.

        Resolution order:
            1. ints/floats (booleans excluded) are returned as ``float``.
            2. Strings are parsed with ``float()``; unparseable strings give ``None``.
            3. For dicts, the well-known keys in ``_NUMERIC_KEYS`` are tried in
               order; each candidate is resolved recursively, so nested dicts and
               numeric strings under those keys are accepted. If none yields a
               number, the first numeric value (or nested dict that resolves to
               one) among the dict's values is used.

        Args:
            value: Scalar, string or (possibly nested) dict to reduce.

        Returns:
            The extracted number as ``float``, or ``None`` if nothing numeric
            was found.
        """
        extract = EnhancedBakeryDataProcessor._extract_numeric_from_dict

        if isinstance(value, bool):
            # bool is an int subclass; never treat flags as quantities.
            return None
        if isinstance(value, (int, float)):
            return float(value)
        if isinstance(value, str):
            try:
                return float(value)
            except (ValueError, TypeError):
                return None
        if isinstance(value, dict):
            # Preferred keys first. A candidate that does not resolve to a
            # number must not stop the search, so keep going on None.
            for key in EnhancedBakeryDataProcessor._NUMERIC_KEYS:
                if key in value:
                    result = extract(value[key])
                    if result is not None:
                        return result
            # Fallback: first numeric-looking value. Strings are deliberately
            # not parsed here so identifiers such as {"id": "42"} are not
            # mistaken for measurements.
            for v in value.values():
                if isinstance(v, (int, float)) and not isinstance(v, bool):
                    return float(v)
                if isinstance(v, dict):
                    result = extract(v)
                    if result is not None:
                        return result
        return None

    async def prepare_prediction_features(self,
                                          future_dates: pd.DatetimeIndex,
                                          weather_forecast: pd.DataFrame = None,
                                          traffic_forecast: pd.DataFrame = None,
                                          poi_features: Dict[str, Any] = None,
                                          historical_data: pd.DataFrame = None) -> pd.DataFrame:
        """
        Create features for future predictions.

        Args:
            future_dates: Future dates to predict
            weather_forecast: Weather forecast data
            traffic_forecast: Traffic forecast data (optional, not commonly forecasted)
            poi_features: POI features (location-based, static)
            historical_data: Historical data for creating lagged and rolling features

        Returns:
            DataFrame with features for prediction, keyed by a ``ds`` column.
            If any step raises, the error is logged and a frame containing only
            the ``ds`` column is returned (best-effort behavior).
        """
        try:
            # Create base future dataframe
            future_df = pd.DataFrame({'ds': future_dates})

            # Add temporal features (helpers work on a 'date' column)
            future_df = self._add_temporal_features(
                future_df.rename(columns={'ds': 'date'})
            ).rename(columns={'date': 'ds'})

            # Add weather and traffic features
            future_df = self._merge_forecast(future_df, weather_forecast, 'weather')
            future_df = self._merge_forecast(future_df, traffic_forecast, 'traffic')

            # Engineer basic features
            future_df = self._engineer_features(future_df.rename(columns={'ds': 'date'}))

            # Add advanced features if historical data is provided
            if historical_data is not None and not historical_data.empty:
                history = historical_data.rename(columns={'ds': 'date'})
                if 'date' in history.columns:
                    # Align dtype with the future rows so sorting/filtering on
                    # 'date' compares like with like.
                    history['date'] = pd.to_datetime(history['date'])
                    # History rows that share a date with a prediction row would
                    # survive the isin() filter below and duplicate that date.
                    history = history[~history['date'].isin(future_df['date'])]

                combined_df = pd.concat(
                    [history, future_df], ignore_index=True
                ).sort_values('date')
                combined_df = self._add_advanced_features(combined_df)
                future_df = (
                    combined_df[combined_df['date'].isin(future_df['date'])]
                    .copy()
                    .reset_index(drop=True)
                )
            else:
                logger.warning("No historical data provided, lagged features will be NaN")
                future_df = self._add_advanced_features(future_df)

            # Add POI features (static, location-based)
            if poi_features:
                future_df = self._add_poi_features(future_df, poi_features)

            future_df = future_df.rename(columns={'date': 'ds'})

            # Handle missing values
            future_df = self._handle_missing_values_future(future_df)

            return future_df

        except Exception as e:
            logger.error("Error creating prediction features", error=str(e))
            return pd.DataFrame({'ds': future_dates})

    def _merge_forecast(self,
                        future_df: pd.DataFrame,
                        forecast: Optional[pd.DataFrame],
                        source: str) -> pd.DataFrame:
        """
        Left-merge an external forecast frame onto ``future_df`` by ``ds``.

        Args:
            future_df: Frame with a datetime ``ds`` column.
            forecast: Forecast frame keyed by ``date`` or ``ds``; may be None/empty.
            source: Label used in the log message (e.g. ``'weather'``).

        Returns:
            ``future_df`` unchanged when there is nothing to merge or the
            forecast has no date column; otherwise the merged frame.
        """
        if forecast is None or forecast.empty:
            return future_df

        features = forecast.copy()
        if 'date' in features.columns:
            features = features.rename(columns={'date': 'ds'})
        if 'ds' not in features.columns:
            # Without a key the merge would raise and every feature would be
            # lost to the caller's fallback; skip just this source instead.
            logger.warning("Forecast data has no date column, skipping merge", source=source)
            return future_df

        # merge() refuses datetime64-vs-object keys, so normalize strings and
        # date objects up front.
        features['ds'] = pd.to_datetime(features['ds'])
        return future_df.merge(features, on='ds', how='left')

    def _add_temporal_features(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Add comprehensive temporal features.

        Args:
            df: Frame with a ``date`` column (anything ``pd.to_datetime`` accepts).

        Returns:
            A copy of ``df`` with calendar, season, holiday and payday columns.

        Raises:
            ValueError: If ``df`` has no ``date`` column.
        """
        df = df.copy()

        if 'date' not in df.columns:
            raise ValueError("DataFrame must have a 'date' column")

        df['date'] = pd.to_datetime(df['date'])

        # Basic temporal features
        df['day_of_week'] = df['date'].dt.dayofweek
        df['day_of_month'] = df['date'].dt.day
        df['month'] = df['date'].dt.month
        df['quarter'] = df['date'].dt.quarter
        df['week_of_year'] = df['date'].dt.isocalendar().week

        # Bakery-specific features
        df['is_weekend'] = df['day_of_week'].isin([5, 6]).astype(int)
        df['is_monday'] = (df['day_of_week'] == 0).astype(int)
        df['is_friday'] = (df['day_of_week'] == 4).astype(int)

        # Season mapping (1=Winter, 2=Spring, 3=Summer, 4=Autumn)
        df['season'] = df['month'].apply(self._get_season)
        df['is_summer'] = (df['season'] == 3).astype(int)
        df['is_winter'] = (df['season'] == 1).astype(int)

        # Holiday indicators
        df['is_holiday'] = df['date'].apply(self._is_spanish_holiday).astype(int)
        df['is_school_holiday'] = df['date'].apply(self._is_school_holiday).astype(int)
        df['is_month_start'] = (df['day_of_month'] <= 3).astype(int)
        df['is_month_end'] = (df['day_of_month'] >= 28).astype(int)

        # Payday patterns
        df['is_payday_period'] = ((df['day_of_month'] <= 5) |
                                  (df['day_of_month'] >= 25)).astype(int)

        return df

    def _engineer_features(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Engineer additional features.

        Derives weather, traffic, interaction, weekday and month features from
        whichever source columns are present. When ``traffic_volume`` is
        present, the mean/std used for normalization are stored in
        ``self.scalers``.

        Args:
            df: Frame produced by ``_add_temporal_features``, optionally with
                ``temperature``, ``precipitation`` and ``traffic_volume``.

        Returns:
            A copy of ``df`` with the derived columns added.
        """
        df = df.copy()

        # Weather-based features
        if 'temperature' in df.columns:
            df['temperature'] = pd.to_numeric(df['temperature'], errors='coerce').fillna(15.0)
            df['temp_squared'] = df['temperature'] ** 2
            df['is_hot_day'] = (df['temperature'] > 25).astype(int)
            df['is_cold_day'] = (df['temperature'] < 10).astype(int)
            df['is_pleasant_day'] = ((df['temperature'] >= 18) &
                                     (df['temperature'] <= 25)).astype(int)
            df['temp_category'] = pd.cut(df['temperature'],
                                         bins=[-np.inf, 5, 15, 25, np.inf],
                                         labels=[0, 1, 2, 3]).astype(int)

        if 'precipitation' in df.columns:
            df['precipitation'] = pd.to_numeric(df['precipitation'], errors='coerce').fillna(0.0)
            df['is_rainy_day'] = (df['precipitation'] > 0.1).astype(int)
            df['is_heavy_rain'] = (df['precipitation'] > 10).astype(int)
            # The lowest bin is open-ended: a stray negative reading would
            # otherwise fall outside every bin, become NaN and make
            # astype(int) raise.
            df['rain_intensity'] = pd.cut(df['precipitation'],
                                          bins=[-np.inf, 0, 2, 10, np.inf],
                                          labels=[0, 1, 2, 3]).astype(int)

        # Traffic-based features
        if 'traffic_volume' in df.columns:
            df['traffic_volume'] = pd.to_numeric(df['traffic_volume'], errors='coerce').fillna(100.0)
            q75 = df['traffic_volume'].quantile(0.75)
            q25 = df['traffic_volume'].quantile(0.25)

            df['high_traffic'] = (df['traffic_volume'] > q75).astype(int)
            df['low_traffic'] = (df['traffic_volume'] < q25).astype(int)

            traffic_std = df['traffic_volume'].std()
            traffic_mean = df['traffic_volume'].mean()

            if traffic_std > 0 and not pd.isna(traffic_std):
                df['traffic_normalized'] = (df['traffic_volume'] - traffic_mean) / traffic_std
                self.scalers['traffic_mean'] = float(traffic_mean)
                self.scalers['traffic_std'] = float(traffic_std)
            else:
                # Constant or single-row series: std is 0/NaN, so z-scores are
                # undefined; fall back to neutral values.
                df['traffic_normalized'] = 0.0
                self.scalers['traffic_mean'] = 100.0
                self.scalers['traffic_std'] = 50.0

            df['traffic_normalized'] = df['traffic_normalized'].fillna(0.0)

        # Interaction features
        if 'is_weekend' in df.columns and 'temperature' in df.columns:
            df['weekend_temp_interaction'] = df['is_weekend'] * df['temperature']
            df['weekend_pleasant_weather'] = df['is_weekend'] * df.get('is_pleasant_day', 0)

        if 'is_rainy_day' in df.columns and 'traffic_volume' in df.columns:
            df['rain_traffic_interaction'] = df['is_rainy_day'] * df['traffic_volume']

        if 'is_holiday' in df.columns and 'temperature' in df.columns:
            df['holiday_temp_interaction'] = df['is_holiday'] * df['temperature']

        if 'season' in df.columns and 'temperature' in df.columns:
            df['season_temp_interaction'] = df['season'] * df['temperature']

        # Day-of-week specific features
        if 'day_of_week' in df.columns:
            df['is_working_day'] = (~df['day_of_week'].isin([5, 6])).astype(int)
            df['is_peak_bakery_day'] = df['day_of_week'].isin([4, 5, 6]).astype(int)

        # Month-specific features
        if 'month' in df.columns:
            df['is_high_demand_month'] = df['month'].isin([6, 7, 8, 12]).astype(int)
            df['is_warm_season'] = df['month'].isin([4, 5, 6, 7, 8, 9]).astype(int)

        # Special day: Payday
        if 'is_payday_period' in df.columns:
            df['is_payday'] = df['is_payday_period']

        return df

    def _add_advanced_features(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Add advanced features using AdvancedFeatureEngineer.

        A fresh ``AdvancedFeatureEngineer`` is created on every call and stored
        on ``self.feature_engineer``.

        Args:
            df: Frame with a ``date`` column.

        Returns:
            A new frame with the engineer's features added and NaNs filled
            using its ``'forward_backward'`` strategy.
        """
        df = df.copy()

        logger.info("Adding advanced features (lagged, rolling, cyclical, trends)",
                    input_rows=len(df),
                    input_columns=len(df.columns))

        self.feature_engineer = AdvancedFeatureEngineer()

        df = self.feature_engineer.create_all_features(
            df,
            date_column='date',
            include_lags=True,
            include_rolling=True,
            include_interactions=True,
            include_cyclical=True
        )

        df = self.feature_engineer.fill_na_values(df, strategy='forward_backward')

        created_features = self.feature_engineer.get_feature_columns()
        logger.info(f"Added {len(created_features)} advanced features")

        return df

    def _add_poi_features(self, df: pd.DataFrame, poi_features: Dict[str, Any]) -> pd.DataFrame:
        """
        Add POI features (static, location-based).

        Each feature is broadcast to every row. Booleans are stored as 0/1.
        Dict-valued features are reduced to a single number with
        ``_extract_numeric_from_dict`` (0.0 if none is found).

        Args:
            df: Frame to extend; it is not modified.
            poi_features: Mapping of column name to value.

        Returns:
            ``df`` itself when ``poi_features`` is empty, otherwise a copy
            with one column per POI feature.
        """
        if not poi_features:
            logger.warning("No POI features to add")
            return df

        logger.info(f"Adding {len(poi_features)} POI features to dataframe")

        # Work on a copy, like the sibling feature methods do.
        df = df.copy()

        for feature_name, feature_value in poi_features.items():
            if isinstance(feature_value, bool):
                feature_value = 1 if feature_value else 0
            elif isinstance(feature_value, dict):
                # Assigning a dict to a column aligns its keys against the row
                # index and yields NaN almost everywhere; reduce to a scalar.
                extracted = self._extract_numeric_from_dict(feature_value)
                if extracted is None:
                    logger.warning("POI feature has no numeric value, defaulting to 0",
                                   feature=feature_name)
                    extracted = 0.0
                feature_value = extracted
            df[feature_name] = feature_value

        return df

    def _handle_missing_values_future(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Handle missing values in future prediction data.

        Numeric columns containing NaN are filled with a default chosen by
        substring match of the column name against the table below; columns
        matching no key are filled with 0. Non-numeric columns are untouched.

        Args:
            df: Frame to clean; modified in place and returned.

        Returns:
            The same frame with numeric NaNs filled.
        """
        numeric_columns = df.select_dtypes(include=[np.number]).columns

        madrid_defaults = {
            'temperature': 15.0,
            'precipitation': 0.0,
            'humidity': 60.0,
            'wind_speed': 5.0,
            'traffic_volume': 100.0,
            'pedestrian_count': 50.0,
            'pressure': 1013.0
        }

        for col in numeric_columns:
            if not df[col].isna().any():
                continue

            # Column labels are not guaranteed to be strings.
            col_name = str(col).lower()
            default_value = next(
                (value for key, value in madrid_defaults.items() if key in col_name),
                0,
            )
            df[col] = df[col].fillna(default_value)

        return df

    def _get_season(self, month: int) -> int:
        """Get season from month (1-4 for Winter, Spring, Summer, Autumn)"""
        if month in [12, 1, 2]:
            return 1  # Winter
        elif month in [3, 4, 5]:
            return 2  # Spring
        elif month in [6, 7, 8]:
            return 3  # Summer
        else:
            return 4  # Autumn

    def _is_spanish_holiday(self, date: datetime) -> bool:
        """
        Check if a date is a Spanish holiday.

        Looks the date up in ``self.spain_holidays``. If that lookup raises,
        a warning is logged and a fixed (month, day) list is used instead.
        """
        try:
            # pd.Timestamp subclasses datetime, so this covers both.
            if isinstance(date, datetime):
                date = date.date()
            return date in self.spain_holidays
        except Exception as e:
            logger.warning(f"Error checking holiday status for {date}: {e}")
            month_day = (date.month, date.day)
            basic_holidays = [
                (1, 1), (1, 6), (5, 1), (8, 15), (10, 12),
                (11, 1), (12, 6), (12, 8), (12, 25)
            ]
            return month_day in basic_holidays

    def _easter_windows(self, year: int) -> list:
        """Return cached (start, end) date windows of +/-7 days around each
        holiday whose name contains 'viernes santo' or 'easter' for ``year``."""
        cache = getattr(self, '_easter_window_cache', None)
        if cache is None:
            cache = self._easter_window_cache = {}

        key = (self.region, year)
        if key not in cache:
            spain_hol = holidays.Spain(years=year, prov=self.region)
            week = timedelta(days=7)
            cache[key] = [
                (holiday_date - week, holiday_date + week)
                for holiday_date, holiday_name in spain_hol.items()
                if 'viernes santo' in holiday_name.lower()
                or 'easter' in holiday_name.lower()
            ]
        return cache[key]

    def _is_school_holiday(self, date: datetime) -> bool:
        """
        Check if a date is during school holidays in Spain.

        Counts July-August, December 23 - January 7, and +/-7 days around
        Good Friday/Easter as listed by the ``holidays`` calendar for
        ``self.region``. If anything raises, a warning is logged and a
        calendar-free approximation (April 1-15 for Easter) is used.
        """
        try:
            # pd.Timestamp subclasses datetime, so this covers both.
            check_date = date.date() if isinstance(date, datetime) else date

            month = check_date.month
            day = check_date.day

            # Summer holidays (July 1 - August 31)
            if month in [7, 8]:
                return True

            # Christmas holidays (December 23 - January 7)
            if (month == 12 and day >= 23) or (month == 1 and day <= 7):
                return True

            # Easter/Spring break (Semana Santa). Windows are cached per year
            # because this method runs once per row.
            return any(
                start <= check_date <= end
                for start, end in self._easter_windows(check_date.year)
            )

        except Exception as e:
            logger.warning(f"Error checking school holiday for {date}: {e}")
            month = date.month
            day = date.day
            return (month in [7, 8] or
                    (month == 12 and day >= 23) or
                    (month == 1 and day <= 7) or
                    (month == 4 and 1 <= day <= 15))