# services/training/app/ml/data_processor.py
"""
Data Processor for Training Service

Handles data cleaning, feature engineering, and preparation of bakery
sales data (plus weather/traffic exogenous data) for Prophet training.
"""
import logging
from datetime import datetime
from typing import Dict

import numpy as np
import pandas as pd

logger = logging.getLogger(__name__)

# Neutral fallback values (Madrid) used whenever external data is missing.
_WEATHER_DEFAULTS: Dict[str, float] = {
    'temperature': 15.0,    # mild temperature (deg C)
    'precipitation': 0.0,   # no rain
    'humidity': 60.0,       # moderate humidity (%)
    'wind_speed': 5.0,      # light wind
}
_TRAFFIC_DEFAULTS: Dict[str, float] = {
    'traffic_volume': 100.0,    # neutral traffic level
    'pedestrian_count': 50.0,
    'occupancy_rate': 0.5,
}

# Source-column aliases used to normalize external feeds to standard names.
_WEATHER_COLUMN_ALIASES = {
    'temperature': ['temperature', 'temp', 'temperatura'],
    'precipitation': ['precipitation', 'rain', 'lluvia', 'precipitacion'],
    'humidity': ['humidity', 'humedad'],
    'wind_speed': ['wind_speed', 'viento', 'wind'],
}
_TRAFFIC_COLUMN_ALIASES = {
    'traffic_volume': ['traffic_volume', 'traffic_intensity', 'trafico', 'intensidad'],
    'pedestrian_count': ['pedestrian_count', 'peatones'],
    'occupancy_rate': ['occupancy_rate', 'ocupacion'],
}

# Fixed-date Spanish holidays (month, day) that affect bakery sales.
# NOTE: movable feasts (e.g. Easter) are intentionally not covered.
_SPANISH_HOLIDAYS = frozenset({
    (1, 1),    # New Year
    (1, 6),    # Epiphany
    (5, 1),    # Labour Day
    (5, 2),    # Madrid Community Day
    (5, 15),   # San Isidro (Madrid)
    (8, 15),   # Assumption
    (10, 12),  # National Day
    (11, 1),   # All Saints
    (12, 6),   # Constitution
    (12, 8),   # Immaculate Conception
    (12, 25),  # Christmas
})


class BakeryDataProcessor:
    """
    Enhanced data processor for bakery forecasting training service.
    Handles data cleaning, feature engineering, and preparation for ML models.
    """

    def __init__(self):
        # Reserved per-feature state; not populated by the current pipeline
        # but kept for interface compatibility with callers.
        self.scalers = {}   # Store scalers for each feature
        self.imputers = {}  # Store imputers for missing value handling

    async def prepare_training_data(self,
                                    sales_data: pd.DataFrame,
                                    weather_data: pd.DataFrame,
                                    traffic_data: pd.DataFrame,
                                    product_name: str) -> pd.DataFrame:
        """
        Prepare comprehensive training data for a specific product.

        Args:
            sales_data: Historical sales data for the product
            weather_data: Weather data
            traffic_data: Traffic data
            product_name: Product name for logging

        Returns:
            DataFrame ready for Prophet training with 'ds' and 'y' columns plus features

        Raises:
            ValueError: if the sales data lacks required columns or contains
                no valid rows for the product.
        """
        try:
            logger.info(f"Preparing training data for product: {product_name}")

            # Convert and validate sales data
            sales_clean = await self._process_sales_data(sales_data, product_name)

            # Aggregate to daily level (missing days become 0 sales)
            daily_sales = await self._aggregate_daily_sales(sales_clean)

            # Add temporal features
            daily_sales = self._add_temporal_features(daily_sales)

            # Merge external data sources
            daily_sales = self._merge_weather_features(daily_sales, weather_data)
            daily_sales = self._merge_traffic_features(daily_sales, traffic_data)

            # Engineer additional features
            daily_sales = self._engineer_features(daily_sales)

            # Handle missing values
            daily_sales = self._handle_missing_values(daily_sales)

            # Prepare for Prophet (rename columns and validate)
            prophet_data = self._prepare_prophet_format(daily_sales)

            logger.info(f"Prepared {len(prophet_data)} data points for {product_name}")
            return prophet_data

        except Exception as e:
            logger.error(f"Error preparing training data for {product_name}: {str(e)}")
            raise

    async def prepare_prediction_features(self,
                                          future_dates: pd.DatetimeIndex,
                                          weather_forecast: pd.DataFrame = None,
                                          traffic_forecast: pd.DataFrame = None) -> pd.DataFrame:
        """
        Create features for future predictions.

        Args:
            future_dates: Future dates to predict
            weather_forecast: Weather forecast data (optional)
            traffic_forecast: Traffic forecast data (optional)

        Returns:
            DataFrame with features for prediction. On any failure this is
            deliberately best-effort and falls back to a bare 'ds' frame.
        """
        try:
            # Create base future dataframe
            future_df = pd.DataFrame({'ds': future_dates})

            # Add temporal features (helper expects a 'date' column)
            future_df = self._add_temporal_features(
                future_df.rename(columns={'ds': 'date'})
            ).rename(columns={'date': 'ds'})

            # Add weather features
            if weather_forecast is not None and not weather_forecast.empty:
                weather_features = weather_forecast.copy()
                if 'date' in weather_features.columns:
                    weather_features = weather_features.rename(columns={'date': 'ds'})
                future_df = future_df.merge(weather_features, on='ds', how='left')

            # Add traffic features
            if traffic_forecast is not None and not traffic_forecast.empty:
                traffic_features = traffic_forecast.copy()
                if 'date' in traffic_features.columns:
                    traffic_features = traffic_features.rename(columns={'date': 'ds'})
                future_df = future_df.merge(traffic_features, on='ds', how='left')

            # Engineer additional features
            future_df = self._engineer_features(future_df.rename(columns={'ds': 'date'}))
            future_df = future_df.rename(columns={'date': 'ds'})

            # Handle missing values in future data using the Madrid defaults,
            # falling back to the column median (or 0.0 when the median itself
            # is NaN, i.e. the column is entirely missing).
            known_defaults = {**_WEATHER_DEFAULTS, **_TRAFFIC_DEFAULTS}
            numeric_columns = future_df.select_dtypes(include=[np.number]).columns
            for col in numeric_columns:
                if future_df[col].isna().any():
                    if col in known_defaults:
                        future_df[col] = future_df[col].fillna(known_defaults[col])
                    else:
                        median_value = future_df[col].median()
                        if pd.isna(median_value):
                            median_value = 0.0
                        future_df[col] = future_df[col].fillna(median_value)

            return future_df

        except Exception as e:
            logger.error(f"Error creating prediction features: {e}")
            # Return minimal features if error
            return pd.DataFrame({'ds': future_dates})

    async def _process_sales_data(self, sales_data: pd.DataFrame,
                                  product_name: str) -> pd.DataFrame:
        """
        Validate and clean raw sales data.

        Coerces types, drops invalid/negative quantities and (when a
        'product_name' column exists) filters to the requested product.

        Raises:
            ValueError: if required columns are missing or no valid rows remain.
        """
        sales_clean = sales_data.copy()

        # Ensure date column exists and is datetime
        if 'date' not in sales_clean.columns:
            raise ValueError("Sales data must have a 'date' column")
        sales_clean['date'] = pd.to_datetime(sales_clean['date'])

        # Ensure quantity column exists and is numeric
        if 'quantity' not in sales_clean.columns:
            raise ValueError("Sales data must have a 'quantity' column")
        sales_clean['quantity'] = pd.to_numeric(sales_clean['quantity'], errors='coerce')

        # Remove rows with invalid quantities
        sales_clean = sales_clean.dropna(subset=['quantity'])
        sales_clean = sales_clean[sales_clean['quantity'] >= 0]  # No negative sales

        # Filter for the specific product if product_name column exists
        if 'product_name' in sales_clean.columns:
            sales_clean = sales_clean[sales_clean['product_name'] == product_name]

        # Fail fast with a clear message instead of a cryptic date_range
        # error on NaT min/max during daily aggregation.
        if sales_clean.empty:
            raise ValueError(f"No valid sales rows found for product '{product_name}'")

        return sales_clean

    async def _aggregate_daily_sales(self, sales_data: pd.DataFrame) -> pd.DataFrame:
        """Aggregate sales to daily level, filling missing days with 0 sales."""
        daily_sales = sales_data.groupby('date').agg({
            'quantity': 'sum'
        }).reset_index()

        # Ensure we have data for all dates in the range
        date_range = pd.date_range(
            start=daily_sales['date'].min(),
            end=daily_sales['date'].max(),
            freq='D'
        )
        full_date_df = pd.DataFrame({'date': date_range})
        daily_sales = full_date_df.merge(daily_sales, on='date', how='left')
        daily_sales['quantity'] = daily_sales['quantity'].fillna(0)  # Fill missing days with 0 sales

        return daily_sales

    def _add_temporal_features(self, df: pd.DataFrame) -> pd.DataFrame:
        """Add temporal features like day of week, month, season, holidays."""
        df = df.copy()

        # Ensure we have a date column
        if 'date' not in df.columns:
            raise ValueError("DataFrame must have a 'date' column")
        df['date'] = pd.to_datetime(df['date'])

        # Day of week (0=Monday, 6=Sunday)
        df['day_of_week'] = df['date'].dt.dayofweek
        df['is_weekend'] = df['day_of_week'].isin([5, 6]).astype(int)

        # Month and season
        df['month'] = df['date'].dt.month
        df['season'] = df['month'].apply(self._get_season)

        # Week of year. isocalendar() returns nullable UInt32; cast to plain
        # int so downstream numeric consumers (Prophet regressors, sklearn)
        # never see an extension dtype.
        df['week_of_year'] = df['date'].dt.isocalendar().week.astype(int)

        # Quarter
        df['quarter'] = df['date'].dt.quarter

        # Holiday indicators (basic Spanish holidays)
        df['is_holiday'] = df['date'].apply(self._is_spanish_holiday).astype(int)

        # School calendar effects (approximate)
        df['is_school_holiday'] = df['date'].apply(self._is_school_holiday).astype(int)

        return df

    @staticmethod
    def _standardize_columns(df: pd.DataFrame,
                             aliases: Dict[str, list]) -> pd.DataFrame:
        """
        Copy the first matching alias column to each standard name and return
        a frame containing only 'date' plus the standard columns found.
        """
        kept = ['date']
        for standard_name, possible_names in aliases.items():
            for possible_name in possible_names:
                if possible_name in df.columns:
                    df[standard_name] = df[possible_name]
                    kept.append(standard_name)
                    break
        return df[kept].copy()

    def _merge_weather_features(self, daily_sales: pd.DataFrame,
                                weather_data: pd.DataFrame) -> pd.DataFrame:
        """Merge weather features with sales data, defaulting gaps to neutral Madrid values."""
        if weather_data.empty:
            # Add default weather columns with neutral values
            for col, default in _WEATHER_DEFAULTS.items():
                daily_sales[col] = default
            return daily_sales

        try:
            weather_clean = weather_data.copy()

            # Ensure weather data has date column
            if 'date' not in weather_clean.columns and 'ds' in weather_clean.columns:
                weather_clean = weather_clean.rename(columns={'ds': 'date'})
            weather_clean['date'] = pd.to_datetime(weather_clean['date'])

            # Normalize provider-specific column names, keep what we found
            weather_clean = self._standardize_columns(weather_clean, _WEATHER_COLUMN_ALIASES)

            # Merge with sales data
            merged = daily_sales.merge(weather_clean, on='date', how='left')

            # Fill missing weather values with reasonable defaults
            for col, default in _WEATHER_DEFAULTS.items():
                if col in merged.columns:
                    merged[col] = merged[col].fillna(default)

            return merged

        except Exception as e:
            # Best-effort: a bad weather feed must not abort training
            logger.warning(f"Error merging weather data: {e}")
            for col, default in _WEATHER_DEFAULTS.items():
                daily_sales[col] = default
            return daily_sales

    def _merge_traffic_features(self, daily_sales: pd.DataFrame,
                                traffic_data: pd.DataFrame) -> pd.DataFrame:
        """Merge traffic features with sales data, defaulting gaps to neutral values."""
        if traffic_data.empty:
            # Add default traffic column
            daily_sales['traffic_volume'] = _TRAFFIC_DEFAULTS['traffic_volume']
            return daily_sales

        try:
            traffic_clean = traffic_data.copy()

            # Ensure traffic data has date column
            if 'date' not in traffic_clean.columns and 'ds' in traffic_clean.columns:
                traffic_clean = traffic_clean.rename(columns={'ds': 'date'})
            traffic_clean['date'] = pd.to_datetime(traffic_clean['date'])

            # Normalize provider-specific column names, keep what we found
            traffic_clean = self._standardize_columns(traffic_clean, _TRAFFIC_COLUMN_ALIASES)

            # Merge with sales data
            merged = daily_sales.merge(traffic_clean, on='date', how='left')

            # Fill missing traffic values
            for col, default in _TRAFFIC_DEFAULTS.items():
                if col in merged.columns:
                    merged[col] = merged[col].fillna(default)

            return merged

        except Exception as e:
            # Best-effort: a bad traffic feed must not abort training
            logger.warning(f"Error merging traffic data: {e}")
            daily_sales['traffic_volume'] = _TRAFFIC_DEFAULTS['traffic_volume']
            return daily_sales

    def _engineer_features(self, df: pd.DataFrame) -> pd.DataFrame:
        """Engineer additional (derived and interaction) features from existing data."""
        df = df.copy()

        # Weather-based features
        if 'temperature' in df.columns:
            df['temp_squared'] = df['temperature'] ** 2
            df['is_hot_day'] = (df['temperature'] > 25).astype(int)
            df['is_cold_day'] = (df['temperature'] < 10).astype(int)

        if 'precipitation' in df.columns:
            df['is_rainy_day'] = (df['precipitation'] > 0).astype(int)
            df['heavy_rain'] = (df['precipitation'] > 10).astype(int)

        # Traffic-based features (relative to this dataset's own distribution)
        if 'traffic_volume' in df.columns:
            df['high_traffic'] = (df['traffic_volume'] > df['traffic_volume'].quantile(0.75)).astype(int)
            df['low_traffic'] = (df['traffic_volume'] < df['traffic_volume'].quantile(0.25)).astype(int)

        # Interaction features
        if 'is_weekend' in df.columns and 'temperature' in df.columns:
            df['weekend_temp_interaction'] = df['is_weekend'] * df['temperature']

        if 'is_rainy_day' in df.columns and 'traffic_volume' in df.columns:
            df['rain_traffic_interaction'] = df['is_rainy_day'] * df['traffic_volume']

        return df

    def _handle_missing_values(self, df: pd.DataFrame) -> pd.DataFrame:
        """Median-impute numeric feature columns (never the 'quantity' target)."""
        df = df.copy()

        numeric_columns = df.select_dtypes(include=[np.number]).columns
        for col in numeric_columns:
            if col != 'quantity' and df[col].isna().any():
                median_value = df[col].median()
                # Median of an all-NaN column is NaN; fall back to 0.0 so no
                # NaNs survive into model training.
                if pd.isna(median_value):
                    median_value = 0.0
                df[col] = df[col].fillna(median_value)

        return df

    def _prepare_prophet_format(self, df: pd.DataFrame) -> pd.DataFrame:
        """Prepare data in Prophet format with 'ds' and 'y' columns."""
        prophet_df = df.copy()

        # Rename columns for Prophet
        if 'date' in prophet_df.columns:
            prophet_df = prophet_df.rename(columns={'date': 'ds'})
        if 'quantity' in prophet_df.columns:
            prophet_df = prophet_df.rename(columns={'quantity': 'y'})

        # Ensure ds is datetime
        if 'ds' in prophet_df.columns:
            prophet_df['ds'] = pd.to_datetime(prophet_df['ds'])

        # Validate required columns
        if 'ds' not in prophet_df.columns or 'y' not in prophet_df.columns:
            raise ValueError("Prophet data must have 'ds' and 'y' columns")

        # Remove any rows with missing target values
        prophet_df = prophet_df.dropna(subset=['y'])

        # Sort by date
        prophet_df = prophet_df.sort_values('ds').reset_index(drop=True)

        return prophet_df

    def _get_season(self, month: int) -> int:
        """Get season from month (1=Winter, 2=Spring, 3=Summer, 4=Autumn)."""
        if month in [12, 1, 2]:
            return 1  # Winter
        elif month in [3, 4, 5]:
            return 2  # Spring
        elif month in [6, 7, 8]:
            return 3  # Summer
        else:
            return 4  # Autumn

    def _is_spanish_holiday(self, date: datetime) -> bool:
        """Check if a date is a major fixed-date Spanish holiday."""
        return (date.month, date.day) in _SPANISH_HOLIDAYS

    def _is_school_holiday(self, date: datetime) -> bool:
        """Check if a date is during school holidays (approximate)."""
        month = date.month

        # Approximate Spanish school holiday periods
        # Summer holidays (July-August)
        if month in [7, 8]:
            return True
        # Christmas holidays (mid December to early January)
        if month == 12 and date.day >= 20:
            return True
        if month == 1 and date.day <= 10:
            return True
        # Easter holidays (approximate - first two weeks of April)
        if month == 4 and date.day <= 14:
            return True

        return False

    def calculate_feature_importance(self, model_data: pd.DataFrame,
                                     target_column: str = 'y') -> Dict[str, float]:
        """
        Calculate simple correlation-based feature importance.

        Returns a dict of {feature: |pearson correlation with target|},
        sorted descending; constant columns score 0.0. Empty on error.
        """
        try:
            # Simple correlation-based importance
            numeric_features = model_data.select_dtypes(include=[np.number]).columns
            numeric_features = [col for col in numeric_features if col != target_column]

            importance_scores = {}
            for feature in numeric_features:
                if feature in model_data.columns:
                    correlation = model_data[feature].corr(model_data[target_column])
                    # Constant columns yield NaN correlation -> score 0.0
                    importance_scores[feature] = abs(correlation) if not pd.isna(correlation) else 0.0

            # Sort by importance
            importance_scores = dict(sorted(importance_scores.items(),
                                            key=lambda x: x[1], reverse=True))
            return importance_scores

        except Exception as e:
            logger.error(f"Error calculating feature importance: {e}")
            return {}