Files
bakery-ia/shared/ml/data_processor.py
2025-12-13 23:57:54 +01:00

401 lines
16 KiB
Python
Executable File

"""
Shared Data Processor for Bakery Forecasting
Provides feature engineering capabilities for both training and prediction
"""
import pandas as pd
import numpy as np
from typing import Dict, List, Any, Optional
from datetime import datetime
import structlog
import holidays
from shared.ml.enhanced_features import AdvancedFeatureEngineer
# Module-level structlog logger; the processor's methods use it for progress,
# warning and error messages.
logger = structlog.get_logger()
class EnhancedBakeryDataProcessor:
    """
    Shared data processor for bakery forecasting.

    Focuses on prediction feature preparation without training-specific
    dependencies. It builds calendar, weather, traffic and POI features for a
    set of dates and delegates lag/rolling/cyclical features to
    ``AdvancedFeatureEngineer``.
    """

    # Dictionary keys probed first, in this order, when a number is wrapped in a dict.
    _NUMERIC_KEYS = ('value', 'data', 'result', 'amount', 'count', 'number', 'val')

    # Fill values for missing numeric inputs. A key applies to every column whose
    # lower-cased name contains it, so derived columns (e.g. lags) are covered too.
    _MADRID_DEFAULTS = {
        'temperature': 15.0,
        'precipitation': 0.0,
        'humidity': 60.0,
        'wind_speed': 5.0,
        'traffic_volume': 100.0,
        'pedestrian_count': 50.0,
        'pressure': 1013.0,
    }

    # (month, day) pairs consulted only when the `holidays` lookup itself fails.
    _BASIC_HOLIDAYS = (
        (1, 1), (1, 6), (5, 1), (8, 15), (10, 12),
        (11, 1), (12, 6), (12, 8), (12, 25),
    )

    def __init__(self, region: str = 'MD'):
        """
        Initialize the data processor.

        Args:
            region: Spanish region code for holidays (MD=Madrid, PV=Basque, etc.)
        """
        self.scalers = {}
        self.feature_engineer = AdvancedFeatureEngineer()
        self.region = region
        # NOTE(review): recent `holidays` releases prefer `subdiv=` over `prov=`;
        # confirm against the pinned library version before changing.
        self.spain_holidays = holidays.Spain(prov=region)
        # (region, year) -> list of (start, end) date windows around Easter holidays.
        self._holy_week_cache = {}

    def get_scalers(self) -> Dict[str, Any]:
        """Return a shallow copy of the scalers/normalization parameters for use during prediction."""
        return self.scalers.copy()

    @staticmethod
    def _extract_numeric_from_dict(value: Any) -> Optional[float]:
        """
        Robust extraction of a numeric value from a scalar, string or nested dict.

        Lookup order for dicts: the well-known keys in ``_NUMERIC_KEYS`` first,
        then every value in insertion order. Nested dicts are searched
        recursively; a nested dict that holds no number no longer aborts the
        search, the remaining keys are still tried.

        Args:
            value: A number (Python or NumPy scalar), a numeric string, or a dict
                containing one of those somewhere inside it.

        Returns:
            The value as ``float``, or ``None`` when nothing numeric is found.
            Booleans are never treated as numbers.
        """
        numeric_types = (int, float, np.integer, np.floating)

        def is_number(candidate: Any) -> bool:
            # bool is a subclass of int, so it must be excluded explicitly.
            return isinstance(candidate, numeric_types) and not isinstance(candidate, bool)

        if is_number(value):
            return float(value)

        if isinstance(value, dict):
            extract = EnhancedBakeryDataProcessor._extract_numeric_from_dict

            # Pass 1: conventional payload keys, in priority order.
            for key in EnhancedBakeryDataProcessor._NUMERIC_KEYS:
                if key not in value:
                    continue
                candidate = value[key]
                if is_number(candidate):
                    return float(candidate)
                if isinstance(candidate, dict):
                    nested = extract(candidate)
                    if nested is not None:
                        return nested

            # Pass 2: any value at all, first hit wins.
            for candidate in value.values():
                if is_number(candidate):
                    return float(candidate)
                if isinstance(candidate, dict):
                    nested = extract(candidate)
                    if nested is not None:
                        return nested
            return None

        if isinstance(value, str):
            try:
                return float(value)
            except (ValueError, TypeError):
                return None

        return None

    @staticmethod
    def _prepare_forecast_frame(forecast: pd.DataFrame) -> pd.DataFrame:
        """Copy a forecast frame, expose its date column as ``ds`` and coerce it to datetime."""
        frame = forecast.copy()
        if 'date' in frame.columns:
            frame = frame.rename(columns={'date': 'ds'})
        if 'ds' in frame.columns:
            # The left side of the merge is datetime64; a string/object key would
            # make the merge raise and push the caller into its bare fallback.
            frame['ds'] = pd.to_datetime(frame['ds'])
        return frame

    async def prepare_prediction_features(self,
                                          future_dates: pd.DatetimeIndex,
                                          weather_forecast: Optional[pd.DataFrame] = None,
                                          traffic_forecast: Optional[pd.DataFrame] = None,
                                          poi_features: Optional[Dict[str, Any]] = None,
                                          historical_data: Optional[pd.DataFrame] = None) -> pd.DataFrame:
        """
        Create features for future predictions.

        Args:
            future_dates: Future dates to predict
            weather_forecast: Weather forecast data, keyed by a 'date' or 'ds' column
            traffic_forecast: Traffic forecast data (optional, not commonly forecasted),
                keyed by a 'date' or 'ds' column
            poi_features: POI features (location-based, static)
            historical_data: Historical data for creating lagged and rolling features

        Returns:
            DataFrame with features for prediction, with the date column named
            'ds'. If any step raises, the error is logged and a frame holding
            only the 'ds' column is returned instead (best-effort fallback).
        """
        try:
            # Base frame with calendar features; helpers work on a 'date' column.
            future_df = pd.DataFrame({'ds': future_dates})
            future_df = self._add_temporal_features(
                future_df.rename(columns={'ds': 'date'})
            ).rename(columns={'date': 'ds'})

            # Left-join weather first, then traffic, so every requested date is kept.
            for forecast in (weather_forecast, traffic_forecast):
                if forecast is None or forecast.empty:
                    continue
                future_df = future_df.merge(
                    self._prepare_forecast_frame(forecast), on='ds', how='left'
                )

            # Engineer basic features
            future_df = self._engineer_features(future_df.rename(columns={'ds': 'date'}))

            # Add advanced features; history supplies the past for lags and rolling windows.
            if historical_data is not None and not historical_data.empty:
                history = historical_data.rename(columns={'ds': 'date'})
                if 'date' in history.columns:
                    # Keep the combined 'date' column homogeneous so it can be sorted.
                    history = history.assign(date=pd.to_datetime(history['date']))
                combined_df = pd.concat(
                    [history, future_df], ignore_index=True
                ).sort_values('date')
                combined_df = self._add_advanced_features(combined_df)
                # NOTE(review): historical rows that share a date with a requested
                # future date are selected by this filter as well.
                future_df = combined_df[combined_df['date'].isin(future_df['date'])].copy()
            else:
                logger.warning("No historical data provided, lagged features will be NaN")
                future_df = self._add_advanced_features(future_df)

            # Add POI features (static, location-based)
            if poi_features:
                future_df = self._add_poi_features(future_df, poi_features)

            future_df = future_df.rename(columns={'date': 'ds'})

            # Handle missing values
            future_df = self._handle_missing_values_future(future_df)
            return future_df
        except Exception as e:
            # Deliberate best-effort: callers always get a frame with the dates.
            logger.error("Error creating prediction features", error=str(e))
            return pd.DataFrame({'ds': future_dates})

    def _add_temporal_features(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Add comprehensive temporal features derived from the 'date' column.

        Args:
            df: Frame with a 'date' column parseable by ``pd.to_datetime``.

        Returns:
            A copy of ``df`` with calendar, season, holiday and payday columns added.

        Raises:
            ValueError: If ``df`` has no 'date' column.
        """
        df = df.copy()
        if 'date' not in df.columns:
            raise ValueError("DataFrame must have a 'date' column")
        df['date'] = pd.to_datetime(df['date'])

        # Basic temporal features
        df['day_of_week'] = df['date'].dt.dayofweek  # Monday=0 ... Sunday=6
        df['day_of_month'] = df['date'].dt.day
        df['month'] = df['date'].dt.month
        df['quarter'] = df['date'].dt.quarter
        df['week_of_year'] = df['date'].dt.isocalendar().week

        # Bakery-specific features
        df['is_weekend'] = df['day_of_week'].isin([5, 6]).astype(int)
        df['is_monday'] = (df['day_of_week'] == 0).astype(int)
        df['is_friday'] = (df['day_of_week'] == 4).astype(int)

        # Season mapping (1=Winter, 2=Spring, 3=Summer, 4=Autumn)
        df['season'] = df['month'].apply(self._get_season)
        df['is_summer'] = (df['season'] == 3).astype(int)
        df['is_winter'] = (df['season'] == 1).astype(int)

        # Holiday indicators
        df['is_holiday'] = df['date'].apply(self._is_spanish_holiday).astype(int)
        df['is_school_holiday'] = df['date'].apply(self._is_school_holiday).astype(int)

        # Fixed day-of-month thresholds, independent of the month's actual length.
        df['is_month_start'] = (df['day_of_month'] <= 3).astype(int)
        df['is_month_end'] = (df['day_of_month'] >= 28).astype(int)

        # Payday patterns
        df['is_payday_period'] = ((df['day_of_month'] <= 5) | (df['day_of_month'] >= 25)).astype(int)
        return df

    def _engineer_features(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Engineer additional weather, traffic, interaction and calendar features.

        Each group is added only when its source column is present. Traffic
        normalization parameters are stored in ``self.scalers`` as
        'traffic_mean' and 'traffic_std'.

        Args:
            df: Frame that may contain 'temperature', 'precipitation',
                'traffic_volume' and the columns produced by
                ``_add_temporal_features``.

        Returns:
            A copy of ``df`` with the derived columns added.
        """
        df = df.copy()

        # Weather-based features
        if 'temperature' in df.columns:
            df['temperature'] = pd.to_numeric(df['temperature'], errors='coerce').fillna(15.0)
            df['temp_squared'] = df['temperature'] ** 2
            df['is_hot_day'] = (df['temperature'] > 25).astype(int)
            df['is_cold_day'] = (df['temperature'] < 10).astype(int)
            df['is_pleasant_day'] = ((df['temperature'] >= 18) & (df['temperature'] <= 25)).astype(int)
            df['temp_category'] = pd.cut(df['temperature'],
                                         bins=[-np.inf, 5, 15, 25, np.inf],
                                         labels=[0, 1, 2, 3]).astype(int)

        if 'precipitation' in df.columns:
            df['precipitation'] = pd.to_numeric(df['precipitation'], errors='coerce').fillna(0.0)
            df['is_rainy_day'] = (df['precipitation'] > 0.1).astype(int)
            df['is_heavy_rain'] = (df['precipitation'] > 10).astype(int)
            # The lowest bin is open-ended: a negative reading (e.g. a sensor
            # sentinel) counts as "no rain" instead of becoming a NaN category
            # that cannot be cast to int.
            df['rain_intensity'] = pd.cut(df['precipitation'],
                                          bins=[-np.inf, 0, 2, 10, np.inf],
                                          labels=[0, 1, 2, 3]).astype(int)

        # Traffic-based features
        if 'traffic_volume' in df.columns:
            df['traffic_volume'] = pd.to_numeric(df['traffic_volume'], errors='coerce').fillna(100.0)
            q75 = df['traffic_volume'].quantile(0.75)
            q25 = df['traffic_volume'].quantile(0.25)
            df['high_traffic'] = (df['traffic_volume'] > q75).astype(int)
            df['low_traffic'] = (df['traffic_volume'] < q25).astype(int)

            # NOTE(review): mean/std come from the frame passed in, not from
            # training data; confirm this is intended at prediction time.
            traffic_std = df['traffic_volume'].std()
            traffic_mean = df['traffic_volume'].mean()
            if traffic_std > 0 and not pd.isna(traffic_std):
                df['traffic_normalized'] = (df['traffic_volume'] - traffic_mean) / traffic_std
                self.scalers['traffic_mean'] = float(traffic_mean)
                self.scalers['traffic_std'] = float(traffic_std)
            else:
                # Constant or single-row traffic: no spread to normalize by.
                df['traffic_normalized'] = 0.0
                self.scalers['traffic_mean'] = 100.0
                self.scalers['traffic_std'] = 50.0
            df['traffic_normalized'] = df['traffic_normalized'].fillna(0.0)

        # Interaction features
        if 'is_weekend' in df.columns and 'temperature' in df.columns:
            df['weekend_temp_interaction'] = df['is_weekend'] * df['temperature']
            df['weekend_pleasant_weather'] = df['is_weekend'] * df.get('is_pleasant_day', 0)
        if 'is_rainy_day' in df.columns and 'traffic_volume' in df.columns:
            df['rain_traffic_interaction'] = df['is_rainy_day'] * df['traffic_volume']
        if 'is_holiday' in df.columns and 'temperature' in df.columns:
            df['holiday_temp_interaction'] = df['is_holiday'] * df['temperature']
        if 'season' in df.columns and 'temperature' in df.columns:
            df['season_temp_interaction'] = df['season'] * df['temperature']

        # Day-of-week specific features
        if 'day_of_week' in df.columns:
            df['is_working_day'] = (~df['day_of_week'].isin([5, 6])).astype(int)
            df['is_peak_bakery_day'] = df['day_of_week'].isin([4, 5, 6]).astype(int)

        # Month-specific features
        if 'month' in df.columns:
            df['is_high_demand_month'] = df['month'].isin([6, 7, 8, 12]).astype(int)
            df['is_warm_season'] = df['month'].isin([4, 5, 6, 7, 8, 9]).astype(int)

        # Special day: Payday (alias of the payday-period flag)
        if 'is_payday_period' in df.columns:
            df['is_payday'] = df['is_payday_period']
        return df

    def _add_advanced_features(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Add advanced features using AdvancedFeatureEngineer.

        A fresh engineer instance replaces ``self.feature_engineer`` on every
        call before the features are created.

        Args:
            df: Frame with a 'date' column.

        Returns:
            The frame returned by the engineer after its 'forward_backward'
            NA fill.
        """
        df = df.copy()
        logger.info("Adding advanced features (lagged, rolling, cyclical, trends)",
                    input_rows=len(df),
                    input_columns=len(df.columns))
        self.feature_engineer = AdvancedFeatureEngineer()
        df = self.feature_engineer.create_all_features(
            df,
            date_column='date',
            include_lags=True,
            include_rolling=True,
            include_interactions=True,
            include_cyclical=True
        )
        df = self.feature_engineer.fill_na_values(df, strategy='forward_backward')
        created_features = self.feature_engineer.get_feature_columns()
        logger.info(f"Added {len(created_features)} advanced features")
        return df

    def _add_poi_features(self, df: pd.DataFrame, poi_features: Dict[str, Any]) -> pd.DataFrame:
        """
        Add POI features (static, location-based) as constant columns.

        Args:
            df: Frame to extend; it is not modified.
            poi_features: Mapping of column name to value. Booleans are stored as 0/1.

        Returns:
            A copy of ``df`` with one column per POI feature, or ``df`` itself
            when there is nothing to add.
        """
        if not poi_features:
            logger.warning("No POI features to add")
            return df
        logger.info(f"Adding {len(poi_features)} POI features to dataframe")
        # Work on a copy, like the other helpers, so the caller's frame is untouched.
        df = df.copy()
        for feature_name, feature_value in poi_features.items():
            if isinstance(feature_value, bool):
                feature_value = 1 if feature_value else 0
            df[feature_name] = feature_value
        return df

    def _handle_missing_values_future(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Handle missing values in future prediction data.

        Numeric columns with NaNs are filled with the first ``_MADRID_DEFAULTS``
        entry whose key occurs in the lower-cased column name, or 0 when none
        matches. Non-numeric columns are left as they are.

        Args:
            df: Frame to clean; it is not modified.

        Returns:
            A copy of ``df`` with NaNs in numeric columns filled.
        """
        df = df.copy()
        numeric_columns = df.select_dtypes(include=[np.number]).columns
        for col in numeric_columns:
            if not df[col].isna().any():
                continue
            # str() keeps non-string column labels (e.g. integers) from raising.
            col_name = str(col).lower()
            default_value = next(
                (fill for key, fill in self._MADRID_DEFAULTS.items() if key in col_name),
                0,
            )
            df[col] = df[col].fillna(default_value)
        return df

    def _get_season(self, month: int) -> int:
        """Get season from month (1-4 for Winter, Spring, Summer, Autumn)."""
        if month in [12, 1, 2]:
            return 1  # Winter
        elif month in [3, 4, 5]:
            return 2  # Spring
        elif month in [6, 7, 8]:
            return 3  # Summer
        else:
            return 4  # Autumn

    def _is_spanish_holiday(self, date: datetime) -> bool:
        """
        Check if a date is a Spanish holiday.

        Args:
            date: A ``datetime``, ``pd.Timestamp`` or ``datetime.date``.

        Returns:
            True when the date is in ``self.spain_holidays``. If that lookup
            raises, a warning is logged and the fixed (month, day) list in
            ``_BASIC_HOLIDAYS`` decides instead.
        """
        try:
            # pd.Timestamp subclasses datetime, so one check covers both types.
            if isinstance(date, datetime):
                date = date.date()
            return date in self.spain_holidays
        except Exception as e:
            logger.warning(f"Error checking holiday status for {date}: {e}")
            return (date.month, date.day) in self._BASIC_HOLIDAYS

    def _holy_week_windows(self, year: int) -> List[Any]:
        """
        Return the (start, end) date windows around the Easter holidays of ``year``.

        Every holiday of the region whose name contains 'viernes santo' or
        'easter' yields one window of seven days on either side. Results are
        cached per (region, year) so the calendar is built once, not per row.
        """
        # Local import: the module only imports `datetime` from this package.
        from datetime import timedelta

        cache = getattr(self, '_holy_week_cache', None)
        if cache is None:
            # Instances created without __init__ (e.g. unpickled) get the cache lazily.
            cache = self._holy_week_cache = {}

        cache_key = (self.region, year)
        if cache_key not in cache:
            week = timedelta(days=7)
            calendar = holidays.Spain(years=year, prov=self.region)
            cache[cache_key] = [
                (anchor - week, anchor + week)
                for anchor, holiday_name in calendar.items()
                if 'viernes santo' in holiday_name.lower() or 'easter' in holiday_name.lower()
            ]
        return cache[cache_key]

    def _is_school_holiday(self, date: datetime) -> bool:
        """
        Check if a date is during school holidays in Spain.

        Covered periods: July and August, December 23 to January 7, and the
        windows from ``_holy_week_windows`` (Semana Santa).

        Args:
            date: A ``datetime``, ``pd.Timestamp`` or ``datetime.date``.

        Returns:
            True when the date falls in one of the periods. If the check raises,
            a warning is logged and a fixed approximation is used in which
            April 1-15 stands in for the Easter break.
        """
        try:
            # pd.Timestamp subclasses datetime, so one check covers both types.
            check_date = date.date() if isinstance(date, datetime) else date
            month = check_date.month
            day = check_date.day

            # Summer holidays (July 1 - August 31)
            if month in [7, 8]:
                return True

            # Christmas holidays (December 23 - January 7)
            if (month == 12 and day >= 23) or (month == 1 and day <= 7):
                return True

            # Easter/Spring break (Semana Santa)
            return any(
                start <= check_date <= end
                for start, end in self._holy_week_windows(check_date.year)
            )
        except Exception as e:
            logger.warning(f"Error checking school holiday for {date}: {e}")
            month = date.month
            day = date.day
            return (month in [7, 8] or
                    (month == 12 and day >= 23) or
                    (month == 1 and day <= 7) or
                    (month == 4 and 1 <= day <= 15))