Files
bakery-ia/services/training/app/ml/data_processor.py
2025-07-19 16:59:37 +02:00

493 lines
20 KiB
Python

# services/training/app/ml/data_processor.py
"""
Data Processor for Training Service
Handles data preparation and feature engineering for ML training
"""
import pandas as pd
import numpy as np
from typing import Dict, List, Any, Optional, Tuple
from datetime import datetime, timedelta
import logging
from sklearn.preprocessing import StandardScaler
from sklearn.impute import SimpleImputer
logger = logging.getLogger(__name__)
class BakeryDataProcessor:
"""
Enhanced data processor for bakery forecasting training service.
Handles data cleaning, feature engineering, and preparation for ML models.
"""
def __init__(self):
self.scalers = {} # Store scalers for each feature
self.imputers = {} # Store imputers for missing value handling
async def prepare_training_data(self,
sales_data: pd.DataFrame,
weather_data: pd.DataFrame,
traffic_data: pd.DataFrame,
product_name: str) -> pd.DataFrame:
"""
Prepare comprehensive training data for a specific product.
Args:
sales_data: Historical sales data for the product
weather_data: Weather data
traffic_data: Traffic data
product_name: Product name for logging
Returns:
DataFrame ready for Prophet training with 'ds' and 'y' columns plus features
"""
try:
logger.info(f"Preparing training data for product: {product_name}")
# Convert and validate sales data
sales_clean = await self._process_sales_data(sales_data, product_name)
# Aggregate to daily level
daily_sales = await self._aggregate_daily_sales(sales_clean)
# Add temporal features
daily_sales = self._add_temporal_features(daily_sales)
# Merge external data sources
daily_sales = self._merge_weather_features(daily_sales, weather_data)
daily_sales = self._merge_traffic_features(daily_sales, traffic_data)
# Engineer additional features
daily_sales = self._engineer_features(daily_sales)
# Handle missing values
daily_sales = self._handle_missing_values(daily_sales)
# Prepare for Prophet (rename columns and validate)
prophet_data = self._prepare_prophet_format(daily_sales)
logger.info(f"Prepared {len(prophet_data)} data points for {product_name}")
return prophet_data
except Exception as e:
logger.error(f"Error preparing training data for {product_name}: {str(e)}")
raise
async def prepare_prediction_features(self,
future_dates: pd.DatetimeIndex,
weather_forecast: pd.DataFrame = None,
traffic_forecast: pd.DataFrame = None) -> pd.DataFrame:
"""
Create features for future predictions.
Args:
future_dates: Future dates to predict
weather_forecast: Weather forecast data
traffic_forecast: Traffic forecast data
Returns:
DataFrame with features for prediction
"""
try:
# Create base future dataframe
future_df = pd.DataFrame({'ds': future_dates})
# Add temporal features
future_df = self._add_temporal_features(
future_df.rename(columns={'ds': 'date'})
).rename(columns={'date': 'ds'})
# Add weather features
if weather_forecast is not None and not weather_forecast.empty:
weather_features = weather_forecast.copy()
if 'date' in weather_features.columns:
weather_features = weather_features.rename(columns={'date': 'ds'})
future_df = future_df.merge(weather_features, on='ds', how='left')
# Add traffic features
if traffic_forecast is not None and not traffic_forecast.empty:
traffic_features = traffic_forecast.copy()
if 'date' in traffic_features.columns:
traffic_features = traffic_features.rename(columns={'date': 'ds'})
future_df = future_df.merge(traffic_features, on='ds', how='left')
# Engineer additional features
future_df = self._engineer_features(future_df.rename(columns={'ds': 'date'}))
future_df = future_df.rename(columns={'date': 'ds'})
# Handle missing values in future data
numeric_columns = future_df.select_dtypes(include=[np.number]).columns
for col in numeric_columns:
if future_df[col].isna().any():
# Use reasonable defaults for Madrid
if col == 'temperature':
future_df[col] = future_df[col].fillna(15.0) # Default Madrid temp
elif col == 'precipitation':
future_df[col] = future_df[col].fillna(0.0) # Default no rain
elif col == 'humidity':
future_df[col] = future_df[col].fillna(60.0) # Default humidity
elif col == 'traffic_volume':
future_df[col] = future_df[col].fillna(100.0) # Default traffic
else:
future_df[col] = future_df[col].fillna(future_df[col].median())
return future_df
except Exception as e:
logger.error(f"Error creating prediction features: {e}")
# Return minimal features if error
return pd.DataFrame({'ds': future_dates})
async def _process_sales_data(self, sales_data: pd.DataFrame, product_name: str) -> pd.DataFrame:
"""Process and clean sales data"""
sales_clean = sales_data.copy()
# Ensure date column exists and is datetime
if 'date' not in sales_clean.columns:
raise ValueError("Sales data must have a 'date' column")
sales_clean['date'] = pd.to_datetime(sales_clean['date'])
# Ensure quantity column exists and is numeric
if 'quantity' not in sales_clean.columns:
raise ValueError("Sales data must have a 'quantity' column")
sales_clean['quantity'] = pd.to_numeric(sales_clean['quantity'], errors='coerce')
# Remove rows with invalid quantities
sales_clean = sales_clean.dropna(subset=['quantity'])
sales_clean = sales_clean[sales_clean['quantity'] >= 0] # No negative sales
# Filter for the specific product if product_name column exists
if 'product_name' in sales_clean.columns:
sales_clean = sales_clean[sales_clean['product_name'] == product_name]
return sales_clean
async def _aggregate_daily_sales(self, sales_data: pd.DataFrame) -> pd.DataFrame:
"""Aggregate sales to daily level"""
daily_sales = sales_data.groupby('date').agg({
'quantity': 'sum'
}).reset_index()
# Ensure we have data for all dates in the range
date_range = pd.date_range(
start=daily_sales['date'].min(),
end=daily_sales['date'].max(),
freq='D'
)
full_date_df = pd.DataFrame({'date': date_range})
daily_sales = full_date_df.merge(daily_sales, on='date', how='left')
daily_sales['quantity'] = daily_sales['quantity'].fillna(0) # Fill missing days with 0 sales
return daily_sales
def _add_temporal_features(self, df: pd.DataFrame) -> pd.DataFrame:
"""Add temporal features like day of week, month, etc."""
df = df.copy()
# Ensure we have a date column
if 'date' not in df.columns:
raise ValueError("DataFrame must have a 'date' column")
df['date'] = pd.to_datetime(df['date'])
# Day of week (0=Monday, 6=Sunday)
df['day_of_week'] = df['date'].dt.dayofweek
df['is_weekend'] = df['day_of_week'].isin([5, 6]).astype(int)
# Month and season
df['month'] = df['date'].dt.month
df['season'] = df['month'].apply(self._get_season)
# Week of year
df['week_of_year'] = df['date'].dt.isocalendar().week
# Quarter
df['quarter'] = df['date'].dt.quarter
# Holiday indicators (basic Spanish holidays)
df['is_holiday'] = df['date'].apply(self._is_spanish_holiday).astype(int)
# School calendar effects (approximate)
df['is_school_holiday'] = df['date'].apply(self._is_school_holiday).astype(int)
return df
def _merge_weather_features(self,
daily_sales: pd.DataFrame,
weather_data: pd.DataFrame) -> pd.DataFrame:
"""Merge weather features with sales data"""
if weather_data.empty:
# Add default weather columns with neutral values
daily_sales['temperature'] = 15.0 # Mild temperature
daily_sales['precipitation'] = 0.0 # No rain
daily_sales['humidity'] = 60.0 # Moderate humidity
daily_sales['wind_speed'] = 5.0 # Light wind
return daily_sales
try:
weather_clean = weather_data.copy()
# Ensure weather data has date column
if 'date' not in weather_clean.columns and 'ds' in weather_clean.columns:
weather_clean = weather_clean.rename(columns={'ds': 'date'})
weather_clean['date'] = pd.to_datetime(weather_clean['date'])
# Select relevant weather features
weather_features = ['date']
# Add available weather columns with default names
weather_mapping = {
'temperature': ['temperature', 'temp', 'temperatura'],
'precipitation': ['precipitation', 'rain', 'lluvia', 'precipitacion'],
'humidity': ['humidity', 'humedad'],
'wind_speed': ['wind_speed', 'viento', 'wind']
}
for standard_name, possible_names in weather_mapping.items():
for possible_name in possible_names:
if possible_name in weather_clean.columns:
weather_clean[standard_name] = weather_clean[possible_name]
weather_features.append(standard_name)
break
# Keep only the features we found
weather_clean = weather_clean[weather_features].copy()
# Merge with sales data
merged = daily_sales.merge(weather_clean, on='date', how='left')
# Fill missing weather values with reasonable defaults
if 'temperature' in merged.columns:
merged['temperature'] = merged['temperature'].fillna(15.0)
if 'precipitation' in merged.columns:
merged['precipitation'] = merged['precipitation'].fillna(0.0)
if 'humidity' in merged.columns:
merged['humidity'] = merged['humidity'].fillna(60.0)
if 'wind_speed' in merged.columns:
merged['wind_speed'] = merged['wind_speed'].fillna(5.0)
return merged
except Exception as e:
logger.warning(f"Error merging weather data: {e}")
# Add default weather columns if merge fails
daily_sales['temperature'] = 15.0
daily_sales['precipitation'] = 0.0
daily_sales['humidity'] = 60.0
daily_sales['wind_speed'] = 5.0
return daily_sales
def _merge_traffic_features(self,
daily_sales: pd.DataFrame,
traffic_data: pd.DataFrame) -> pd.DataFrame:
"""Merge traffic features with sales data"""
if traffic_data.empty:
# Add default traffic column
daily_sales['traffic_volume'] = 100.0 # Neutral traffic level
return daily_sales
try:
traffic_clean = traffic_data.copy()
# Ensure traffic data has date column
if 'date' not in traffic_clean.columns and 'ds' in traffic_clean.columns:
traffic_clean = traffic_clean.rename(columns={'ds': 'date'})
traffic_clean['date'] = pd.to_datetime(traffic_clean['date'])
# Select relevant traffic features
traffic_features = ['date']
# Map traffic column names
traffic_mapping = {
'traffic_volume': ['traffic_volume', 'traffic_intensity', 'trafico', 'intensidad'],
'pedestrian_count': ['pedestrian_count', 'peatones'],
'occupancy_rate': ['occupancy_rate', 'ocupacion']
}
for standard_name, possible_names in traffic_mapping.items():
for possible_name in possible_names:
if possible_name in traffic_clean.columns:
traffic_clean[standard_name] = traffic_clean[possible_name]
traffic_features.append(standard_name)
break
# Keep only the features we found
traffic_clean = traffic_clean[traffic_features].copy()
# Merge with sales data
merged = daily_sales.merge(traffic_clean, on='date', how='left')
# Fill missing traffic values
if 'traffic_volume' in merged.columns:
merged['traffic_volume'] = merged['traffic_volume'].fillna(100.0)
if 'pedestrian_count' in merged.columns:
merged['pedestrian_count'] = merged['pedestrian_count'].fillna(50.0)
if 'occupancy_rate' in merged.columns:
merged['occupancy_rate'] = merged['occupancy_rate'].fillna(0.5)
return merged
except Exception as e:
logger.warning(f"Error merging traffic data: {e}")
# Add default traffic column if merge fails
daily_sales['traffic_volume'] = 100.0
return daily_sales
def _engineer_features(self, df: pd.DataFrame) -> pd.DataFrame:
"""Engineer additional features from existing data"""
df = df.copy()
# Weather-based features
if 'temperature' in df.columns:
df['temp_squared'] = df['temperature'] ** 2
df['is_hot_day'] = (df['temperature'] > 25).astype(int)
df['is_cold_day'] = (df['temperature'] < 10).astype(int)
if 'precipitation' in df.columns:
df['is_rainy_day'] = (df['precipitation'] > 0).astype(int)
df['heavy_rain'] = (df['precipitation'] > 10).astype(int)
# Traffic-based features
if 'traffic_volume' in df.columns:
df['high_traffic'] = (df['traffic_volume'] > df['traffic_volume'].quantile(0.75)).astype(int)
df['low_traffic'] = (df['traffic_volume'] < df['traffic_volume'].quantile(0.25)).astype(int)
# Interaction features
if 'is_weekend' in df.columns and 'temperature' in df.columns:
df['weekend_temp_interaction'] = df['is_weekend'] * df['temperature']
if 'is_rainy_day' in df.columns and 'traffic_volume' in df.columns:
df['rain_traffic_interaction'] = df['is_rainy_day'] * df['traffic_volume']
return df
def _handle_missing_values(self, df: pd.DataFrame) -> pd.DataFrame:
"""Handle missing values in the dataset"""
df = df.copy()
# For numeric columns, use median imputation
numeric_columns = df.select_dtypes(include=[np.number]).columns
for col in numeric_columns:
if col != 'quantity' and df[col].isna().any():
median_value = df[col].median()
df[col] = df[col].fillna(median_value)
return df
def _prepare_prophet_format(self, df: pd.DataFrame) -> pd.DataFrame:
"""Prepare data in Prophet format with 'ds' and 'y' columns"""
prophet_df = df.copy()
# Rename columns for Prophet
if 'date' in prophet_df.columns:
prophet_df = prophet_df.rename(columns={'date': 'ds'})
if 'quantity' in prophet_df.columns:
prophet_df = prophet_df.rename(columns={'quantity': 'y'})
# Ensure ds is datetime
if 'ds' in prophet_df.columns:
prophet_df['ds'] = pd.to_datetime(prophet_df['ds'])
# Validate required columns
if 'ds' not in prophet_df.columns or 'y' not in prophet_df.columns:
raise ValueError("Prophet data must have 'ds' and 'y' columns")
# Remove any rows with missing target values
prophet_df = prophet_df.dropna(subset=['y'])
# Sort by date
prophet_df = prophet_df.sort_values('ds').reset_index(drop=True)
return prophet_df
def _get_season(self, month: int) -> int:
"""Get season from month (1-4 for Winter, Spring, Summer, Autumn)"""
if month in [12, 1, 2]:
return 1 # Winter
elif month in [3, 4, 5]:
return 2 # Spring
elif month in [6, 7, 8]:
return 3 # Summer
else:
return 4 # Autumn
def _is_spanish_holiday(self, date: datetime) -> bool:
"""Check if a date is a major Spanish holiday"""
month_day = (date.month, date.day)
# Major Spanish holidays that affect bakery sales
spanish_holidays = [
(1, 1), # New Year
(1, 6), # Epiphany
(5, 1), # Labour Day
(8, 15), # Assumption
(10, 12), # National Day
(11, 1), # All Saints
(12, 6), # Constitution
(12, 8), # Immaculate Conception
(12, 25), # Christmas
(5, 15), # San Isidro (Madrid)
(5, 2), # Madrid Community Day
]
return month_day in spanish_holidays
def _is_school_holiday(self, date: datetime) -> bool:
"""Check if a date is during school holidays (approximate)"""
month = date.month
# Approximate Spanish school holiday periods
# Summer holidays (July-August)
if month in [7, 8]:
return True
# Christmas holidays (mid December to early January)
if month == 12 and date.day >= 20:
return True
if month == 1 and date.day <= 10:
return True
# Easter holidays (approximate - first two weeks of April)
if month == 4 and date.day <= 14:
return True
return False
def calculate_feature_importance(self,
model_data: pd.DataFrame,
target_column: str = 'y') -> Dict[str, float]:
"""
Calculate feature importance for the model.
"""
try:
# Simple correlation-based importance
numeric_features = model_data.select_dtypes(include=[np.number]).columns
numeric_features = [col for col in numeric_features if col != target_column]
importance_scores = {}
for feature in numeric_features:
if feature in model_data.columns:
correlation = model_data[feature].corr(model_data[target_column])
importance_scores[feature] = abs(correlation) if not pd.isna(correlation) else 0.0
# Sort by importance
importance_scores = dict(sorted(importance_scores.items(),
key=lambda x: x[1], reverse=True))
return importance_scores
except Exception as e:
logger.error(f"Error calculating feature importance: {e}")
return {}