401 lines
16 KiB
Python
401 lines
16 KiB
Python
"""
|
|
Shared Data Processor for Bakery Forecasting
|
|
Provides feature engineering capabilities for both training and prediction
|
|
"""
|
|
|
|
import pandas as pd
|
|
import numpy as np
|
|
from typing import Dict, List, Any, Optional
|
|
from datetime import datetime
|
|
import structlog
|
|
import holidays
|
|
|
|
from shared.ml.enhanced_features import AdvancedFeatureEngineer
|
|
|
|
logger = structlog.get_logger()
|
|
|
|
|
|
class EnhancedBakeryDataProcessor:
    """
    Shared data processor for bakery forecasting.

    Focuses on prediction feature preparation without training-specific
    dependencies.
    """

    def __init__(self, region: str = 'MD'):
        """
        Initialize the data processor.

        Args:
            region: Spanish region code for holidays (MD=Madrid, PV=Basque, etc.)
        """
        # The region selects the provincial holiday calendar used below.
        self.region = region
        self.spain_holidays = holidays.Spain(prov=region)
        # Normalization parameters captured while engineering features.
        self.scalers = {}
        # Engine for the advanced (lagged/rolling/cyclical) features.
        self.feature_engineer = AdvancedFeatureEngineer()
|
|
|
|
def get_scalers(self) -> Dict[str, Any]:
|
|
"""Return the scalers/normalization parameters for use during prediction"""
|
|
return self.scalers.copy()
|
|
|
|
@staticmethod
|
|
def _extract_numeric_from_dict(value: Any) -> Optional[float]:
|
|
"""
|
|
Robust extraction of numeric values from complex data structures.
|
|
"""
|
|
if isinstance(value, (int, float)) and not isinstance(value, bool):
|
|
return float(value)
|
|
|
|
if isinstance(value, dict):
|
|
for key in ['value', 'data', 'result', 'amount', 'count', 'number', 'val']:
|
|
if key in value:
|
|
extracted = value[key]
|
|
if isinstance(extracted, dict):
|
|
return EnhancedBakeryDataProcessor._extract_numeric_from_dict(extracted)
|
|
elif isinstance(extracted, (int, float)) and not isinstance(extracted, bool):
|
|
return float(extracted)
|
|
|
|
for v in value.values():
|
|
if isinstance(v, (int, float)) and not isinstance(v, bool):
|
|
return float(v)
|
|
elif isinstance(v, dict):
|
|
result = EnhancedBakeryDataProcessor._extract_numeric_from_dict(v)
|
|
if result is not None:
|
|
return result
|
|
|
|
if isinstance(value, str):
|
|
try:
|
|
return float(value)
|
|
except (ValueError, TypeError):
|
|
pass
|
|
|
|
return None
|
|
|
|
    async def prepare_prediction_features(self,
                                          future_dates: pd.DatetimeIndex,
                                          weather_forecast: Optional[pd.DataFrame] = None,
                                          traffic_forecast: Optional[pd.DataFrame] = None,
                                          poi_features: Optional[Dict[str, Any]] = None,
                                          historical_data: Optional[pd.DataFrame] = None) -> pd.DataFrame:
        """
        Create features for future predictions.

        The frame alternates between a 'ds' and a 'date' key column because the
        internal helpers expect 'date' while the output contract uses 'ds'.

        Args:
            future_dates: Future dates to predict
            weather_forecast: Weather forecast data, joined by date
            traffic_forecast: Traffic forecast data (optional, not commonly forecasted)
            poi_features: POI features (location-based, static scalars)
            historical_data: Historical data for creating lagged and rolling features

        Returns:
            DataFrame with features for prediction; on any error, a bare frame
            containing only the 'ds' column (best-effort fallback).
        """
        try:
            # Base frame: one row per requested future date, keyed by 'ds'.
            future_df = pd.DataFrame({'ds': future_dates})

            # Temporal helper works on 'date', so rename around the call.
            future_df = self._add_temporal_features(
                future_df.rename(columns={'ds': 'date'})
            ).rename(columns={'date': 'ds'})

            # Left-join weather forecast rows by date; unmatched days stay NaN
            # and are filled at the end.
            if weather_forecast is not None and not weather_forecast.empty:
                weather_features = weather_forecast.copy()
                if 'date' in weather_features.columns:
                    weather_features = weather_features.rename(columns={'date': 'ds'})

                future_df = future_df.merge(weather_features, on='ds', how='left')

            # Same left-join for traffic forecasts (optional input).
            if traffic_forecast is not None and not traffic_forecast.empty:
                traffic_features = traffic_forecast.copy()
                if 'date' in traffic_features.columns:
                    traffic_features = traffic_features.rename(columns={'date': 'ds'})

                future_df = future_df.merge(traffic_features, on='ds', how='left')

            # Derived weather/traffic/calendar features. NOTE: the key column is
            # renamed to 'date' here and stays 'date' until the rename below.
            future_df = self._engineer_features(future_df.rename(columns={'ds': 'date'}))

            # Lag/rolling features need history: stitch past + future together,
            # engineer on the combined frame, then keep only the future rows.
            if historical_data is not None and not historical_data.empty:
                combined_df = pd.concat([
                    historical_data.rename(columns={'ds': 'date'}),
                    future_df
                ], ignore_index=True).sort_values('date')

                combined_df = self._add_advanced_features(combined_df)
                # Select future rows back out by date membership.
                future_df = combined_df[combined_df['date'].isin(future_df['date'])].copy()
            else:
                logger.warning("No historical data provided, lagged features will be NaN")
                future_df = self._add_advanced_features(future_df)

            # Static location-based POI features broadcast to every row.
            if poi_features:
                future_df = self._add_poi_features(future_df, poi_features)

            # Restore the 'ds' output column name.
            future_df = future_df.rename(columns={'date': 'ds'})

            # Fill remaining NaNs with sensible defaults.
            future_df = self._handle_missing_values_future(future_df)

            return future_df

        except Exception as e:
            # Best-effort fallback: callers still get one row per date.
            logger.error("Error creating prediction features", error=str(e))
            return pd.DataFrame({'ds': future_dates})
|
|
|
|
def _add_temporal_features(self, df: pd.DataFrame) -> pd.DataFrame:
|
|
"""Add comprehensive temporal features"""
|
|
df = df.copy()
|
|
|
|
if 'date' not in df.columns:
|
|
raise ValueError("DataFrame must have a 'date' column")
|
|
|
|
df['date'] = pd.to_datetime(df['date'])
|
|
|
|
# Basic temporal features
|
|
df['day_of_week'] = df['date'].dt.dayofweek
|
|
df['day_of_month'] = df['date'].dt.day
|
|
df['month'] = df['date'].dt.month
|
|
df['quarter'] = df['date'].dt.quarter
|
|
df['week_of_year'] = df['date'].dt.isocalendar().week
|
|
|
|
# Bakery-specific features
|
|
df['is_weekend'] = df['day_of_week'].isin([5, 6]).astype(int)
|
|
df['is_monday'] = (df['day_of_week'] == 0).astype(int)
|
|
df['is_friday'] = (df['day_of_week'] == 4).astype(int)
|
|
|
|
# Season mapping
|
|
df['season'] = df['month'].apply(self._get_season)
|
|
df['is_summer'] = (df['season'] == 3).astype(int)
|
|
df['is_winter'] = (df['season'] == 1).astype(int)
|
|
|
|
# Holiday indicators
|
|
df['is_holiday'] = df['date'].apply(self._is_spanish_holiday).astype(int)
|
|
df['is_school_holiday'] = df['date'].apply(self._is_school_holiday).astype(int)
|
|
df['is_month_start'] = (df['day_of_month'] <= 3).astype(int)
|
|
df['is_month_end'] = (df['day_of_month'] >= 28).astype(int)
|
|
|
|
# Payday patterns
|
|
df['is_payday_period'] = ((df['day_of_month'] <= 5) | (df['day_of_month'] >= 25)).astype(int)
|
|
|
|
return df
|
|
|
|
def _engineer_features(self, df: pd.DataFrame) -> pd.DataFrame:
|
|
"""Engineer additional features"""
|
|
df = df.copy()
|
|
|
|
# Weather-based features
|
|
if 'temperature' in df.columns:
|
|
df['temperature'] = pd.to_numeric(df['temperature'], errors='coerce').fillna(15.0)
|
|
df['temp_squared'] = df['temperature'] ** 2
|
|
df['is_hot_day'] = (df['temperature'] > 25).astype(int)
|
|
df['is_cold_day'] = (df['temperature'] < 10).astype(int)
|
|
df['is_pleasant_day'] = ((df['temperature'] >= 18) & (df['temperature'] <= 25)).astype(int)
|
|
df['temp_category'] = pd.cut(df['temperature'],
|
|
bins=[-np.inf, 5, 15, 25, np.inf],
|
|
labels=[0, 1, 2, 3]).astype(int)
|
|
|
|
if 'precipitation' in df.columns:
|
|
df['precipitation'] = pd.to_numeric(df['precipitation'], errors='coerce').fillna(0.0)
|
|
df['is_rainy_day'] = (df['precipitation'] > 0.1).astype(int)
|
|
df['is_heavy_rain'] = (df['precipitation'] > 10).astype(int)
|
|
df['rain_intensity'] = pd.cut(df['precipitation'],
|
|
bins=[-0.1, 0, 2, 10, np.inf],
|
|
labels=[0, 1, 2, 3]).astype(int)
|
|
|
|
# Traffic-based features
|
|
if 'traffic_volume' in df.columns:
|
|
df['traffic_volume'] = pd.to_numeric(df['traffic_volume'], errors='coerce').fillna(100.0)
|
|
q75 = df['traffic_volume'].quantile(0.75)
|
|
q25 = df['traffic_volume'].quantile(0.25)
|
|
df['high_traffic'] = (df['traffic_volume'] > q75).astype(int)
|
|
df['low_traffic'] = (df['traffic_volume'] < q25).astype(int)
|
|
|
|
traffic_std = df['traffic_volume'].std()
|
|
traffic_mean = df['traffic_volume'].mean()
|
|
|
|
if traffic_std > 0 and not pd.isna(traffic_std):
|
|
df['traffic_normalized'] = (df['traffic_volume'] - traffic_mean) / traffic_std
|
|
self.scalers['traffic_mean'] = float(traffic_mean)
|
|
self.scalers['traffic_std'] = float(traffic_std)
|
|
else:
|
|
df['traffic_normalized'] = 0.0
|
|
self.scalers['traffic_mean'] = 100.0
|
|
self.scalers['traffic_std'] = 50.0
|
|
|
|
df['traffic_normalized'] = df['traffic_normalized'].fillna(0.0)
|
|
|
|
# Interaction features
|
|
if 'is_weekend' in df.columns and 'temperature' in df.columns:
|
|
df['weekend_temp_interaction'] = df['is_weekend'] * df['temperature']
|
|
df['weekend_pleasant_weather'] = df['is_weekend'] * df.get('is_pleasant_day', 0)
|
|
|
|
if 'is_rainy_day' in df.columns and 'traffic_volume' in df.columns:
|
|
df['rain_traffic_interaction'] = df['is_rainy_day'] * df['traffic_volume']
|
|
|
|
if 'is_holiday' in df.columns and 'temperature' in df.columns:
|
|
df['holiday_temp_interaction'] = df['is_holiday'] * df['temperature']
|
|
|
|
if 'season' in df.columns and 'temperature' in df.columns:
|
|
df['season_temp_interaction'] = df['season'] * df['temperature']
|
|
|
|
# Day-of-week specific features
|
|
if 'day_of_week' in df.columns:
|
|
df['is_working_day'] = (~df['day_of_week'].isin([5, 6])).astype(int)
|
|
df['is_peak_bakery_day'] = df['day_of_week'].isin([4, 5, 6]).astype(int)
|
|
|
|
# Month-specific features
|
|
if 'month' in df.columns:
|
|
df['is_high_demand_month'] = df['month'].isin([6, 7, 8, 12]).astype(int)
|
|
df['is_warm_season'] = df['month'].isin([4, 5, 6, 7, 8, 9]).astype(int)
|
|
|
|
# Special day: Payday
|
|
if 'is_payday_period' in df.columns:
|
|
df['is_payday'] = df['is_payday_period']
|
|
|
|
return df
|
|
|
|
    def _add_advanced_features(self, df: pd.DataFrame) -> pd.DataFrame:
        """Add advanced features (lags, rolling stats, cyclical encodings, trends)
        by delegating to the shared AdvancedFeatureEngineer.

        Expects a 'date' column (passed as date_column below). Returns a copy
        of the frame with the engineered columns appended and NaNs filled.
        """
        df = df.copy()

        logger.info("Adding advanced features (lagged, rolling, cyclical, trends)",
                    input_rows=len(df),
                    input_columns=len(df.columns))

        # NOTE(review): this replaces the engineer created in __init__ on every
        # call, discarding any state the previous instance held — presumably to
        # start from a clean feature registry per frame; confirm that no fitted
        # state is expected to survive between calls.
        self.feature_engineer = AdvancedFeatureEngineer()

        df = self.feature_engineer.create_all_features(
            df,
            date_column='date',
            include_lags=True,
            include_rolling=True,
            include_interactions=True,
            include_cyclical=True
        )

        # Fill gaps introduced by lag/rolling windows (forward, then backward).
        df = self.feature_engineer.fill_na_values(df, strategy='forward_backward')

        created_features = self.feature_engineer.get_feature_columns()
        logger.info(f"Added {len(created_features)} advanced features")

        return df
|
|
|
|
def _add_poi_features(self, df: pd.DataFrame, poi_features: Dict[str, Any]) -> pd.DataFrame:
|
|
"""Add POI features (static, location-based)"""
|
|
if not poi_features:
|
|
logger.warning("No POI features to add")
|
|
return df
|
|
|
|
logger.info(f"Adding {len(poi_features)} POI features to dataframe")
|
|
|
|
for feature_name, feature_value in poi_features.items():
|
|
if isinstance(feature_value, bool):
|
|
feature_value = 1 if feature_value else 0
|
|
df[feature_name] = feature_value
|
|
|
|
return df
|
|
|
|
def _handle_missing_values_future(self, df: pd.DataFrame) -> pd.DataFrame:
|
|
"""Handle missing values in future prediction data"""
|
|
numeric_columns = df.select_dtypes(include=[np.number]).columns
|
|
|
|
madrid_defaults = {
|
|
'temperature': 15.0,
|
|
'precipitation': 0.0,
|
|
'humidity': 60.0,
|
|
'wind_speed': 5.0,
|
|
'traffic_volume': 100.0,
|
|
'pedestrian_count': 50.0,
|
|
'pressure': 1013.0
|
|
}
|
|
|
|
for col in numeric_columns:
|
|
if df[col].isna().any():
|
|
default_value = 0
|
|
for key, value in madrid_defaults.items():
|
|
if key in col.lower():
|
|
default_value = value
|
|
break
|
|
|
|
df[col] = df[col].fillna(default_value)
|
|
|
|
return df
|
|
|
|
def _get_season(self, month: int) -> int:
|
|
"""Get season from month (1-4 for Winter, Spring, Summer, Autumn)"""
|
|
if month in [12, 1, 2]:
|
|
return 1 # Winter
|
|
elif month in [3, 4, 5]:
|
|
return 2 # Spring
|
|
elif month in [6, 7, 8]:
|
|
return 3 # Summer
|
|
else:
|
|
return 4 # Autumn
|
|
|
|
def _is_spanish_holiday(self, date: datetime) -> bool:
|
|
"""Check if a date is a Spanish holiday"""
|
|
try:
|
|
if isinstance(date, datetime):
|
|
date = date.date()
|
|
elif isinstance(date, pd.Timestamp):
|
|
date = date.date()
|
|
|
|
return date in self.spain_holidays
|
|
except Exception as e:
|
|
logger.warning(f"Error checking holiday status for {date}: {e}")
|
|
month_day = (date.month, date.day)
|
|
basic_holidays = [
|
|
(1, 1), (1, 6), (5, 1), (8, 15), (10, 12),
|
|
(11, 1), (12, 6), (12, 8), (12, 25)
|
|
]
|
|
return month_day in basic_holidays
|
|
|
|
def _is_school_holiday(self, date: datetime) -> bool:
|
|
"""Check if a date is during school holidays in Spain"""
|
|
try:
|
|
from datetime import timedelta
|
|
import holidays as hol
|
|
|
|
if isinstance(date, datetime):
|
|
check_date = date.date()
|
|
elif isinstance(date, pd.Timestamp):
|
|
check_date = date.date()
|
|
else:
|
|
check_date = date
|
|
|
|
month = check_date.month
|
|
day = check_date.day
|
|
|
|
# Summer holidays (July 1 - August 31)
|
|
if month in [7, 8]:
|
|
return True
|
|
|
|
# Christmas holidays (December 23 - January 7)
|
|
if (month == 12 and day >= 23) or (month == 1 and day <= 7):
|
|
return True
|
|
|
|
# Easter/Spring break (Semana Santa)
|
|
year = check_date.year
|
|
spain_hol = hol.Spain(years=year, prov=self.region)
|
|
|
|
for holiday_date, holiday_name in spain_hol.items():
|
|
if 'viernes santo' in holiday_name.lower() or 'easter' in holiday_name.lower():
|
|
easter_start = holiday_date - timedelta(days=7)
|
|
easter_end = holiday_date + timedelta(days=7)
|
|
if easter_start <= check_date <= easter_end:
|
|
return True
|
|
|
|
return False
|
|
|
|
except Exception as e:
|
|
logger.warning(f"Error checking school holiday for {date}: {e}")
|
|
month = date.month if hasattr(date, 'month') else date.month
|
|
day = date.day if hasattr(date, 'day') else date.day
|
|
return (month in [7, 8] or
|
|
(month == 12 and day >= 23) or
|
|
(month == 1 and day <= 7) or
|
|
(month == 4 and 1 <= day <= 15))
|