"""
|
|
Enhanced Data Processor for Training Service with Repository Pattern
|
|
Uses repository pattern for data access and dependency injection
|
|
"""
|
|
|
|
import pandas as pd
|
|
import numpy as np
|
|
from typing import Dict, List, Any, Optional, Tuple
|
|
from datetime import datetime, timedelta, timezone
|
|
import structlog
|
|
from sklearn.preprocessing import StandardScaler
|
|
from sklearn.impute import SimpleImputer
|
|
|
|
from app.services.date_alignment_service import DateAlignmentService, DateRange, DataSourceType
|
|
from app.repositories import ModelRepository, TrainingLogRepository
|
|
from shared.database.base import create_database_manager
|
|
from shared.database.transactions import transactional
|
|
from shared.database.exceptions import DatabaseError
|
|
from app.core.config import settings
|
|
|
|
logger = structlog.get_logger()
|
|
|
|


class EnhancedBakeryDataProcessor:
    """
    Enhanced data processor for bakery forecasting with repository pattern.
    Integrates date alignment, data cleaning, feature engineering, and preparation for ML models.
    """

    def __init__(self, database_manager=None):
        self.database_manager = database_manager or create_database_manager(settings.DATABASE_URL, "training-service")
        self.scalers = {}   # Store scalers for each feature
        self.imputers = {}  # Store imputers for missing value handling
        self.date_alignment_service = DateAlignmentService()

    def get_scalers(self) -> Dict[str, Any]:
        """Return the scalers/normalization parameters for use during prediction"""
        return self.scalers.copy()

    async def _get_repositories(self, session):
        """Initialize repositories with session"""
        return {
            'model': ModelRepository(session),
            'training_log': TrainingLogRepository(session)
        }

    def _ensure_timezone_aware(self, df: pd.DataFrame, date_column: str = 'date') -> pd.DataFrame:
        """Ensure date column is timezone-aware to prevent conversion errors"""
        if date_column in df.columns:
            # Convert to datetime if not already
            df[date_column] = pd.to_datetime(df[date_column])

            # If timezone-naive, localize to UTC
            if df[date_column].dt.tz is None:
                df[date_column] = df[date_column].dt.tz_localize('UTC')
            # If already timezone-aware but not UTC, convert to UTC
            elif str(df[date_column].dt.tz) != 'UTC':
                df[date_column] = df[date_column].dt.tz_convert('UTC')

        return df

    async def prepare_training_data(self,
                                    sales_data: pd.DataFrame,
                                    weather_data: pd.DataFrame,
                                    traffic_data: pd.DataFrame,
                                    product_name: str,
                                    tenant_id: Optional[str] = None,
                                    job_id: Optional[str] = None,
                                    session=None) -> pd.DataFrame:
        """
        Prepare comprehensive training data for a specific product with repository logging.

        Args:
            sales_data: Historical sales data for the product
            weather_data: Weather data
            traffic_data: Traffic data
            product_name: Product name for logging
            tenant_id: Optional tenant ID for tracking
            job_id: Optional job ID for tracking

        Returns:
            DataFrame ready for Prophet training with 'ds' and 'y' columns plus features
        """
        try:
            logger.info("Preparing enhanced training data using repository pattern",
                        product_name=product_name,
                        tenant_id=tenant_id,
                        job_id=job_id)

            # Get database session and repositories
            async with self.database_manager.get_session() as db_session:
                repos = await self._get_repositories(db_session)

                # Log data preparation start if we have tracking info
                if job_id and tenant_id:
                    await repos['training_log'].update_log_progress(
                        job_id, 15, f"preparing_data_{product_name}", "running"
                    )

                # Step 1: Convert and validate sales data
                sales_clean = await self._process_sales_data(sales_data, product_name)

                # Ensure timezone awareness before any date operations
                sales_clean = self._ensure_timezone_aware(sales_clean)
                weather_data = self._ensure_timezone_aware(weather_data) if not weather_data.empty else weather_data
                traffic_data = self._ensure_timezone_aware(traffic_data) if not traffic_data.empty else traffic_data

                # Step 2: Apply date alignment if we have date constraints
                sales_clean = await self._apply_date_alignment(sales_clean, weather_data, traffic_data)

                # Step 3: Aggregate to daily level
                daily_sales = await self._aggregate_daily_sales(sales_clean)

                # Step 4: Add temporal features
                daily_sales = self._add_temporal_features(daily_sales)

                # Step 5: Merge external data sources
                daily_sales = self._merge_weather_features(daily_sales, weather_data)
                daily_sales = self._merge_traffic_features(daily_sales, traffic_data)

                # Step 6: Engineer additional features
                daily_sales = self._engineer_features(daily_sales)

                # Step 7: Handle missing values
                daily_sales = self._handle_missing_values(daily_sales)

                # Step 8: Prepare for Prophet (rename columns and validate)
                prophet_data = self._prepare_prophet_format(daily_sales)

                # Step 9: Store processing metadata if we have a tenant
                if tenant_id:
                    await self._store_processing_metadata(
                        repos, tenant_id, product_name, prophet_data, job_id
                    )

                logger.info("Enhanced training data prepared successfully",
                            product_name=product_name,
                            data_points=len(prophet_data))

                return prophet_data

        except Exception as e:
            logger.error("Error preparing enhanced training data",
                         product_name=product_name,
                         error=str(e))
            raise

    async def _store_processing_metadata(self,
                                         repos: Dict,
                                         tenant_id: str,
                                         product_name: str,
                                         processed_data: pd.DataFrame,
                                         job_id: Optional[str] = None):
        """Store data processing metadata using repository"""
        try:
            # Create processing metadata
            metadata = {
                "product_name": product_name,
                "data_points": len(processed_data),
                "date_range": {
                    "start": processed_data['ds'].min().isoformat(),
                    "end": processed_data['ds'].max().isoformat()
                },
                "features_count": len([col for col in processed_data.columns if col not in ['ds', 'y']]),
                "processed_at": datetime.now(timezone.utc).isoformat()
            }

            # Log processing completion
            if job_id:
                await repos['training_log'].update_log_progress(
                    job_id, 25, f"data_prepared_{product_name}", "running"
                )

        except Exception as e:
            logger.warning("Failed to store processing metadata",
                           error=str(e))

    async def prepare_prediction_features(self,
                                          future_dates: pd.DatetimeIndex,
                                          weather_forecast: Optional[pd.DataFrame] = None,
                                          traffic_forecast: Optional[pd.DataFrame] = None) -> pd.DataFrame:
        """
        Create features for future predictions with proper date handling.

        Args:
            future_dates: Future dates to predict
            weather_forecast: Weather forecast data
            traffic_forecast: Traffic forecast data

        Returns:
            DataFrame with features for prediction
        """
        try:
            # Create base future dataframe
            future_df = pd.DataFrame({'ds': future_dates})

            # Add temporal features
            future_df = self._add_temporal_features(
                future_df.rename(columns={'ds': 'date'})
            ).rename(columns={'date': 'ds'})

            # Add weather features
            if weather_forecast is not None and not weather_forecast.empty:
                weather_features = weather_forecast.copy()
                if 'date' in weather_features.columns:
                    weather_features = weather_features.rename(columns={'date': 'ds'})

                future_df = future_df.merge(weather_features, on='ds', how='left')

            # Add traffic features
            if traffic_forecast is not None and not traffic_forecast.empty:
                traffic_features = traffic_forecast.copy()
                if 'date' in traffic_features.columns:
                    traffic_features = traffic_features.rename(columns={'date': 'ds'})

                future_df = future_df.merge(traffic_features, on='ds', how='left')

            # Engineer additional features
            future_df = self._engineer_features(future_df.rename(columns={'ds': 'date'}))
            future_df = future_df.rename(columns={'date': 'ds'})

            # Handle missing values in future data
            future_df = self._handle_missing_values_future(future_df)

            return future_df

        except Exception as e:
            logger.error("Error creating prediction features", error=str(e))
            # Fall back to minimal features on error
            return pd.DataFrame({'ds': future_dates})

    async def _apply_date_alignment(self,
                                    sales_data: pd.DataFrame,
                                    weather_data: pd.DataFrame,
                                    traffic_data: pd.DataFrame) -> pd.DataFrame:
        """
        Apply date alignment constraints to ensure data consistency across sources.
        """
        try:
            if sales_data.empty:
                return sales_data

            # Create date range from sales data
            sales_dates = pd.to_datetime(sales_data['date'])
            sales_date_range = DateRange(
                start=sales_dates.min(),
                end=sales_dates.max(),
                source=DataSourceType.BAKERY_SALES
            )

            # Get aligned date range considering all constraints
            aligned_range = self.date_alignment_service.validate_and_align_dates(
                user_sales_range=sales_date_range
            )

            # Filter sales data to aligned range
            mask = (sales_dates >= aligned_range.start) & (sales_dates <= aligned_range.end)
            filtered_sales = sales_data[mask].copy()

            logger.info("Date alignment completed",
                        original_records=len(sales_data),
                        filtered_records=len(filtered_sales),
                        date_range=f"{aligned_range.start.date()} to {aligned_range.end.date()}")

            if aligned_range.constraints:
                logger.info("Applied constraints", constraints=aligned_range.constraints)

            return filtered_sales

        except Exception as e:
            logger.warning("Date alignment failed, using original data", error=str(e))
            return sales_data

    async def _process_sales_data(self, sales_data: pd.DataFrame, product_name: str) -> pd.DataFrame:
        """Process and clean sales data with enhanced validation"""
        sales_clean = sales_data.copy()

        # Ensure date column exists and is datetime
        if 'date' not in sales_clean.columns:
            raise ValueError("Sales data must have a 'date' column")

        sales_clean['date'] = pd.to_datetime(sales_clean['date'])

        # Handle different quantity column names
        quantity_columns = ['quantity', 'quantity_sold', 'sales', 'units_sold']
        quantity_col = None

        for col in quantity_columns:
            if col in sales_clean.columns:
                quantity_col = col
                break

        if quantity_col is None:
            raise ValueError(f"Sales data must have one of these columns: {quantity_columns}")

        # Standardize to 'quantity'
        if quantity_col != 'quantity':
            sales_clean['quantity'] = sales_clean[quantity_col]
            logger.info("Mapped quantity column",
                        from_column=quantity_col,
                        to_column='quantity')

        sales_clean['quantity'] = pd.to_numeric(sales_clean['quantity'], errors='coerce')

        # Remove rows with invalid quantities
        sales_clean = sales_clean.dropna(subset=['quantity'])
        sales_clean = sales_clean[sales_clean['quantity'] >= 0]  # No negative sales

        # Filter for the specific product if product_name column exists
        if 'product_name' in sales_clean.columns:
            sales_clean = sales_clean[sales_clean['product_name'] == product_name]

        # Remove duplicate dates (keep the one with highest quantity)
        sales_clean = sales_clean.sort_values(['date', 'quantity'], ascending=[True, False])
        sales_clean = sales_clean.drop_duplicates(subset=['date'], keep='first')

        return sales_clean

    async def _aggregate_daily_sales(self, sales_data: pd.DataFrame) -> pd.DataFrame:
        """Aggregate sales to daily level with improved date handling"""
        if sales_data.empty:
            return pd.DataFrame(columns=['date', 'quantity'])

        # Group by date and sum quantities
        daily_sales = sales_data.groupby('date').agg({
            'quantity': 'sum'
        }).reset_index()

        # Ensure we have data for all dates in the range (fill gaps with 0)
        date_range = pd.date_range(
            start=daily_sales['date'].min(),
            end=daily_sales['date'].max(),
            freq='D'
        )

        full_date_df = pd.DataFrame({'date': date_range})
        daily_sales = full_date_df.merge(daily_sales, on='date', how='left')
        daily_sales['quantity'] = daily_sales['quantity'].fillna(0)  # Fill missing days with 0 sales

        return daily_sales

    def _add_temporal_features(self, df: pd.DataFrame) -> pd.DataFrame:
        """Add comprehensive temporal features for bakery demand patterns"""
        df = df.copy()

        # Ensure we have a date column
        if 'date' not in df.columns:
            raise ValueError("DataFrame must have a 'date' column")

        df['date'] = pd.to_datetime(df['date'])

        # Basic temporal features
        df['day_of_week'] = df['date'].dt.dayofweek  # 0=Monday, 6=Sunday
        df['day_of_month'] = df['date'].dt.day
        df['month'] = df['date'].dt.month
        df['quarter'] = df['date'].dt.quarter
        df['week_of_year'] = df['date'].dt.isocalendar().week

        # Bakery-specific features
        df['is_weekend'] = df['day_of_week'].isin([5, 6]).astype(int)
        df['is_monday'] = (df['day_of_week'] == 0).astype(int)  # Monday often has different patterns
        df['is_friday'] = (df['day_of_week'] == 4).astype(int)  # Friday often busy

        # Season mapping for Madrid
        df['season'] = df['month'].apply(self._get_season)
        df['is_summer'] = (df['season'] == 3).astype(int)  # Summer seasonality
        df['is_winter'] = (df['season'] == 1).astype(int)  # Winter seasonality

        # Holiday and special day indicators
        df['is_holiday'] = df['date'].apply(self._is_spanish_holiday).astype(int)
        df['is_school_holiday'] = df['date'].apply(self._is_school_holiday).astype(int)
        df['is_month_start'] = (df['day_of_month'] <= 3).astype(int)
        df['is_month_end'] = (df['day_of_month'] >= 28).astype(int)

        # Payday patterns (common in Spain: end/beginning of month)
        df['is_payday_period'] = ((df['day_of_month'] <= 5) | (df['day_of_month'] >= 25)).astype(int)

        return df

    def _merge_weather_features(self,
                                daily_sales: pd.DataFrame,
                                weather_data: pd.DataFrame) -> pd.DataFrame:
        """Merge weather features with enhanced Madrid-specific handling"""

        # Define weather defaults outside the try block so the except branch can use them
        weather_defaults = {
            'temperature': 15.0,
            'precipitation': 0.0,
            'humidity': 60.0,
            'wind_speed': 5.0,
            'pressure': 1013.0
        }

        if weather_data.empty:
            # Add default weather columns
            for feature, default_value in weather_defaults.items():
                daily_sales[feature] = default_value
            return daily_sales

        try:
            weather_clean = weather_data.copy()

            # Standardize date column
            if 'date' not in weather_clean.columns and 'ds' in weather_clean.columns:
                weather_clean = weather_clean.rename(columns={'ds': 'date'})

            # Ensure both DataFrames have compatible datetime formats
            weather_clean['date'] = pd.to_datetime(weather_clean['date'])
            daily_sales['date'] = pd.to_datetime(daily_sales['date'])

            # Normalize both to timezone-naive datetimes for merge compatibility
            if weather_clean['date'].dt.tz is not None:
                weather_clean['date'] = weather_clean['date'].dt.tz_convert('UTC').dt.tz_localize(None)

            if daily_sales['date'].dt.tz is not None:
                daily_sales['date'] = daily_sales['date'].dt.tz_convert('UTC').dt.tz_localize(None)

            # Map weather columns to standard names
            weather_mapping = {
                'temperature': ['temperature', 'temp', 'temperatura'],
                'precipitation': ['precipitation', 'precip', 'rain', 'lluvia'],
                'humidity': ['humidity', 'humedad', 'relative_humidity'],
                'wind_speed': ['wind_speed', 'viento', 'wind'],
                'pressure': ['pressure', 'presion', 'atmospheric_pressure']
            }

            weather_features = ['date']

            for standard_name, possible_names in weather_mapping.items():
                for possible_name in possible_names:
                    if possible_name in weather_clean.columns:
                        weather_clean[standard_name] = pd.to_numeric(weather_clean[possible_name], errors='coerce')
                        weather_features.append(standard_name)
                        break

            # Keep only the features we found
            weather_clean = weather_clean[weather_features].copy()

            # Merge with sales data
            merged = daily_sales.merge(weather_clean, on='date', how='left')

            # Fill missing weather values with Madrid-appropriate defaults
            for feature, default_value in weather_defaults.items():
                if feature in merged.columns:
                    merged[feature] = merged[feature].fillna(default_value)

            return merged

        except Exception as e:
            logger.warning("Error merging weather data", error=str(e))
            # Add default weather columns if merge fails
            for feature, default_value in weather_defaults.items():
                daily_sales[feature] = default_value
            return daily_sales

    def _merge_traffic_features(self,
                                daily_sales: pd.DataFrame,
                                traffic_data: pd.DataFrame) -> pd.DataFrame:
        """Merge traffic features with enhanced Madrid-specific handling"""

        if traffic_data.empty:
            # Add default traffic column
            daily_sales['traffic_volume'] = 100.0  # Neutral traffic level
            return daily_sales

        try:
            traffic_clean = traffic_data.copy()

            # Standardize date column
            if 'date' not in traffic_clean.columns and 'ds' in traffic_clean.columns:
                traffic_clean = traffic_clean.rename(columns={'ds': 'date'})

            # Ensure both DataFrames have compatible datetime formats
            traffic_clean['date'] = pd.to_datetime(traffic_clean['date'])
            daily_sales['date'] = pd.to_datetime(daily_sales['date'])

            # Normalize both to timezone-naive datetimes for merge compatibility
            if traffic_clean['date'].dt.tz is not None:
                traffic_clean['date'] = traffic_clean['date'].dt.tz_convert('UTC').dt.tz_localize(None)

            if daily_sales['date'].dt.tz is not None:
                daily_sales['date'] = daily_sales['date'].dt.tz_convert('UTC').dt.tz_localize(None)

            # Map traffic columns to standard names
            traffic_mapping = {
                'traffic_volume': ['traffic_volume', 'traffic_intensity', 'trafico', 'intensidad', 'volume'],
                'pedestrian_count': ['pedestrian_count', 'peatones', 'pedestrians'],
                'congestion_level': ['congestion_level', 'congestion', 'nivel_congestion'],
                'average_speed': ['average_speed', 'speed', 'velocidad_media', 'avg_speed']
            }

            traffic_features = ['date']

            for standard_name, possible_names in traffic_mapping.items():
                for possible_name in possible_names:
                    if possible_name in traffic_clean.columns:
                        traffic_clean[standard_name] = pd.to_numeric(traffic_clean[possible_name], errors='coerce')
                        traffic_features.append(standard_name)
                        break

            # Keep only the features we found
            traffic_clean = traffic_clean[traffic_features].copy()

            # Merge with sales data
            merged = daily_sales.merge(traffic_clean, on='date', how='left')

            # Fill missing traffic values with reasonable defaults
            traffic_defaults = {
                'traffic_volume': 100.0,
                'pedestrian_count': 50.0,
                'congestion_level': 1.0,  # Low congestion
                'average_speed': 30.0     # km/h, typical for Madrid
            }

            for feature, default_value in traffic_defaults.items():
                if feature in merged.columns:
                    merged[feature] = merged[feature].fillna(default_value)

            return merged

        except Exception as e:
            logger.warning("Error merging traffic data", error=str(e))
            # Add default traffic column if merge fails
            daily_sales['traffic_volume'] = 100.0
            return daily_sales

    def _engineer_features(self, df: pd.DataFrame) -> pd.DataFrame:
        """Engineer additional features from existing data with bakery-specific insights"""
        df = df.copy()

        # Weather-based features
        if 'temperature' in df.columns:
            df['temp_squared'] = df['temperature'] ** 2
            df['is_hot_day'] = (df['temperature'] > 25).astype(int)   # Hot days in Madrid
            df['is_cold_day'] = (df['temperature'] < 10).astype(int)  # Cold days
            df['is_pleasant_day'] = ((df['temperature'] >= 18) & (df['temperature'] <= 25)).astype(int)

            # Temperature categories for bakery products
            df['temp_category'] = pd.cut(df['temperature'],
                                         bins=[-np.inf, 5, 15, 25, np.inf],
                                         labels=[0, 1, 2, 3]).astype(int)

        if 'precipitation' in df.columns:
            df['is_rainy_day'] = (df['precipitation'] > 0.1).astype(int)
            df['is_heavy_rain'] = (df['precipitation'] > 10).astype(int)
            df['rain_intensity'] = pd.cut(df['precipitation'],
                                          bins=[-0.1, 0, 2, 10, np.inf],
                                          labels=[0, 1, 2, 3]).astype(int)

        # Traffic-based features with NaN protection
        if 'traffic_volume' in df.columns:
            # Calculate traffic quantiles for relative measures
            q75 = df['traffic_volume'].quantile(0.75)
            q25 = df['traffic_volume'].quantile(0.25)

            df['high_traffic'] = (df['traffic_volume'] > q75).astype(int)
            df['low_traffic'] = (df['traffic_volume'] < q25).astype(int)

            # Safe normalization with NaN protection
            traffic_std = df['traffic_volume'].std()
            traffic_mean = df['traffic_volume'].mean()

            if traffic_std > 0 and not pd.isna(traffic_std) and not pd.isna(traffic_mean):
                df['traffic_normalized'] = (df['traffic_volume'] - traffic_mean) / traffic_std

                # Store normalization parameters for later use in predictions
                self.scalers['traffic_mean'] = float(traffic_mean)
                self.scalers['traffic_std'] = float(traffic_std)

                logger.info(f"Traffic normalization parameters: mean={traffic_mean:.2f}, std={traffic_std:.2f}")
            else:
                logger.warning("Traffic volume has zero standard deviation, using zeros for normalized values")
                df['traffic_normalized'] = 0.0

                # Store default parameters for consistency
                self.scalers['traffic_mean'] = 100.0  # Default traffic level used during training
                self.scalers['traffic_std'] = 50.0    # Reasonable std for traffic normalization

            # Fill any remaining NaN values
            df['traffic_normalized'] = df['traffic_normalized'].fillna(0.0)

        # Interaction features - bakery specific
        if 'is_weekend' in df.columns and 'temperature' in df.columns:
            df['weekend_temp_interaction'] = df['is_weekend'] * df['temperature']
            df['weekend_pleasant_weather'] = df['is_weekend'] * df.get('is_pleasant_day', 0)

        if 'is_rainy_day' in df.columns and 'traffic_volume' in df.columns:
            df['rain_traffic_interaction'] = df['is_rainy_day'] * df['traffic_volume']

        if 'is_holiday' in df.columns and 'temperature' in df.columns:
            df['holiday_temp_interaction'] = df['is_holiday'] * df['temperature']

        # Seasonal interactions
        if 'season' in df.columns and 'temperature' in df.columns:
            df['season_temp_interaction'] = df['season'] * df['temperature']

        # Day-of-week specific features
        if 'day_of_week' in df.columns:
            # Working days vs weekends
            df['is_working_day'] = (~df['day_of_week'].isin([5, 6])).astype(int)

            # Peak bakery days (Friday, Saturday, Sunday often busy)
            df['is_peak_bakery_day'] = df['day_of_week'].isin([4, 5, 6]).astype(int)

        # Month-specific features for bakery seasonality
        if 'month' in df.columns:
            # High-demand months (holidays, summer)
            df['is_high_demand_month'] = df['month'].isin([6, 7, 8, 12]).astype(int)

            # Spring/summer months
            df['is_warm_season'] = df['month'].isin([4, 5, 6, 7, 8, 9]).astype(int)

        # FINAL SAFETY CHECK: Remove any remaining NaN values
        numeric_columns = df.select_dtypes(include=[np.number]).columns
        for col in numeric_columns:
            if df[col].isna().any():
                nan_count = df[col].isna().sum()
                logger.warning("Found NaN values in column, filling with 0",
                               column=col,
                               nan_count=nan_count)
                df[col] = df[col].fillna(0.0)

        return df

    def _handle_missing_values(self, df: pd.DataFrame) -> pd.DataFrame:
        """Handle missing values in the dataset with improved strategies"""
        df = df.copy()

        # For numeric columns, use appropriate imputation strategies
        numeric_columns = df.select_dtypes(include=[np.number]).columns

        for col in numeric_columns:
            if col != 'quantity' and df[col].isna().any():
                # Use different strategies based on column type
                if 'temperature' in col:
                    df[col] = df[col].fillna(15.0)    # Madrid average
                elif 'precipitation' in col or 'rain' in col:
                    df[col] = df[col].fillna(0.0)     # Default no rain
                elif 'humidity' in col:
                    df[col] = df[col].fillna(60.0)    # Moderate humidity
                elif 'traffic' in col:
                    df[col] = df[col].fillna(df[col].median())  # Use median for traffic
                elif 'wind' in col:
                    df[col] = df[col].fillna(5.0)     # Light wind
                elif 'pressure' in col:
                    df[col] = df[col].fillna(1013.0)  # Standard atmospheric pressure
                else:
                    # For other columns, use the median when available, otherwise 0
                    if df[col].count() > 0:
                        df[col] = df[col].fillna(df[col].median())
                    else:
                        df[col] = df[col].fillna(0)

        return df

    def _handle_missing_values_future(self, df: pd.DataFrame) -> pd.DataFrame:
        """Handle missing values in future prediction data"""
        numeric_columns = df.select_dtypes(include=[np.number]).columns

        madrid_defaults = {
            'temperature': 15.0,
            'precipitation': 0.0,
            'humidity': 60.0,
            'wind_speed': 5.0,
            'traffic_volume': 100.0,
            'pedestrian_count': 50.0,
            'pressure': 1013.0
        }

        for col in numeric_columns:
            if df[col].isna().any():
                # Find appropriate default value
                default_value = 0
                for key, value in madrid_defaults.items():
                    if key in col.lower():
                        default_value = value
                        break

                df[col] = df[col].fillna(default_value)

        return df

    def _prepare_prophet_format(self, df: pd.DataFrame) -> pd.DataFrame:
        """Prepare data in Prophet format with enhanced validation"""
        prophet_df = df.copy()

        # Rename columns for Prophet
        if 'date' in prophet_df.columns:
            prophet_df = prophet_df.rename(columns={'date': 'ds'})

        if 'quantity' in prophet_df.columns:
            prophet_df = prophet_df.rename(columns={'quantity': 'y'})

        # Ensure ds is datetime and remove timezone info (Prophet expects naive timestamps)
        if 'ds' in prophet_df.columns:
            prophet_df['ds'] = pd.to_datetime(prophet_df['ds'])
            if prophet_df['ds'].dt.tz is not None:
                prophet_df['ds'] = prophet_df['ds'].dt.tz_localize(None)

        # Validate required columns
        if 'ds' not in prophet_df.columns or 'y' not in prophet_df.columns:
            raise ValueError("Prophet data must have 'ds' and 'y' columns")

        # Clean target values
        prophet_df = prophet_df.dropna(subset=['y'])
        prophet_df['y'] = prophet_df['y'].clip(lower=0)  # No negative sales

        # Remove any duplicate dates (keep last occurrence)
        prophet_df = prophet_df.drop_duplicates(subset=['ds'], keep='last')

        # Sort by date
        prophet_df = prophet_df.sort_values('ds').reset_index(drop=True)

        # Final validation
        if len(prophet_df) == 0:
            raise ValueError("No valid data points after cleaning")

        logger.info("Prophet data prepared",
                    rows=len(prophet_df),
                    date_range=f"{prophet_df['ds'].min()} to {prophet_df['ds'].max()}")

        return prophet_df
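
    # Illustrative shape of the Prophet-ready frame produced above (the values
    # below are made up for documentation; real output also carries the
    # engineered regressor columns added in earlier steps):
    #
    #           ds     y  is_weekend  temperature  traffic_normalized  ...
    #   0 2024-01-01  42.0          0         12.0               -0.31  ...
    #   1 2024-01-02  37.0          0         11.5                0.05  ...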

    def _get_season(self, month: int) -> int:
        """Get season from month (1=Winter, 2=Spring, 3=Summer, 4=Autumn)"""
        if month in [12, 1, 2]:
            return 1  # Winter
        elif month in [3, 4, 5]:
            return 2  # Spring
        elif month in [6, 7, 8]:
            return 3  # Summer
        else:
            return 4  # Autumn

    def _is_spanish_holiday(self, date: datetime) -> bool:
        """Check if a date is a major Spanish holiday"""
        month_day = (date.month, date.day)

        # Major Spanish holidays that affect bakery sales
        spanish_holidays = [
            (1, 1),    # New Year
            (1, 6),    # Epiphany (Reyes)
            (5, 1),    # Labour Day
            (5, 2),    # Madrid Community Day
            (5, 15),   # San Isidro (Madrid patron saint)
            (8, 15),   # Assumption
            (10, 12),  # National Day
            (11, 1),   # All Saints
            (12, 6),   # Constitution
            (12, 8),   # Immaculate Conception
            (12, 25),  # Christmas
        ]

        return month_day in spanish_holidays

    def _is_school_holiday(self, date: datetime) -> bool:
        """Check if a date is during school holidays (approximate)"""
        month = date.month

        # Approximate Spanish school holiday periods

        # Summer holidays (July-August)
        if month in [7, 8]:
            return True

        # Christmas holidays (mid-December to early January)
        if month == 12 and date.day >= 20:
            return True
        if month == 1 and date.day <= 10:
            return True

        # Easter holidays (approximate - early April)
        if month == 4 and date.day <= 15:
            return True

        return False

    async def calculate_feature_importance(self,
                                           model_data: pd.DataFrame,
                                           target_column: str = 'y') -> Dict[str, float]:
        """
        Calculate feature importance using absolute correlation with the target column.
        """
        try:
            # Get numeric features
            numeric_features = model_data.select_dtypes(include=[np.number]).columns
            numeric_features = [col for col in numeric_features if col != target_column]

            importance_scores = {}

            if target_column not in model_data.columns:
                logger.warning("Target column not found", target_column=target_column)
                return {}

            for feature in numeric_features:
                if feature in model_data.columns:
                    correlation = model_data[feature].corr(model_data[target_column])
                    if not pd.isna(correlation) and not np.isinf(correlation):
                        importance_scores[feature] = abs(correlation)

            # Sort by importance
            importance_scores = dict(sorted(importance_scores.items(),
                                            key=lambda x: x[1], reverse=True))

            logger.info("Calculated feature importance",
                        features_count=len(importance_scores))

            return importance_scores

        except Exception as e:
            logger.error("Error calculating feature importance", error=str(e))
            return {}

    async def get_data_quality_report(self, df: pd.DataFrame) -> Dict[str, Any]:
        """
        Generate a comprehensive data quality report for the prepared dataset.
        """
        try:
            report = {
                "total_records": len(df),
                "date_range": {
                    "start": df['ds'].min().isoformat() if 'ds' in df.columns else None,
                    "end": df['ds'].max().isoformat() if 'ds' in df.columns else None,
                    "duration_days": (df['ds'].max() - df['ds'].min()).days if 'ds' in df.columns else 0
                },
                "missing_values": {},
                "data_completeness": 0.0,
                "target_statistics": {},
                "feature_count": 0
            }

            # Per-column missing values (count and share of rows)
            missing_counts = df.isnull().sum()
            total_rows = len(df)

            for col in df.columns:
                missing_count = missing_counts[col]
                report["missing_values"][col] = {
                    "count": int(missing_count),
                    "percentage": round((missing_count / total_rows) * 100, 2)
                }

            # Overall completeness across all cells
            total_missing = missing_counts.sum()
            total_possible = len(df) * len(df.columns)
            report["data_completeness"] = round(((total_possible - total_missing) / total_possible) * 100, 2)

            # Target variable statistics
            if 'y' in df.columns:
                y_col = df['y']
                report["target_statistics"] = {
                    "mean": round(y_col.mean(), 2),
                    "median": round(y_col.median(), 2),
                    "std": round(y_col.std(), 2),
                    "min": round(y_col.min(), 2),
                    "max": round(y_col.max(), 2),
                    "zero_count": int((y_col == 0).sum()),
                    "zero_percentage": round(((y_col == 0).sum() / len(y_col)) * 100, 2)
                }

            # Feature count (numeric columns excluding 'ds' and 'y')
            numeric_features = df.select_dtypes(include=[np.number]).columns
            report["feature_count"] = len([col for col in numeric_features if col not in ['y', 'ds']])

            return report

        except Exception as e:
            logger.error("Error generating data quality report", error=str(e))
            return {"error": str(e)}


# Legacy compatibility alias
BakeryDataProcessor = EnhancedBakeryDataProcessor
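

# ---------------------------------------------------------------------------
# Illustrative usage sketch. It exercises only prepare_prediction_features(),
# which never opens a database session; the object() stand-in passed to the
# constructor, the function name _example_prediction_features, and the sample
# forecast values are assumptions for demonstration, not part of the service
# wiring.
# ---------------------------------------------------------------------------
async def _example_prediction_features() -> pd.DataFrame:
    """Build prediction features for one week of hypothetical Madrid weather."""
    # A real caller would inject a configured database manager; a placeholder
    # is enough here because feature building for predictions is pure pandas.
    processor = EnhancedBakeryDataProcessor(database_manager=object())

    future_dates = pd.date_range("2025-01-01", periods=7, freq="D")
    weather_forecast = pd.DataFrame({
        "date": future_dates,
        "temperature": [10.0, 11.0, 9.0, 14.0, 16.0, 18.0, 12.0],
        "precipitation": [0.0, 0.2, 5.0, 0.0, 0.0, 0.0, 1.1],
    })

    features = await processor.prepare_prediction_features(
        future_dates=future_dates,
        weather_forecast=weather_forecast,
    )
    # The frame now carries temporal flags (is_weekend, is_holiday, ...) and
    # weather-derived columns (is_rainy_day, temp_category, ...) per future date.
    return features


if __name__ == "__main__":  # pragma: no cover - manual smoke check only
    import asyncio

    print(asyncio.run(_example_prediction_features()).head())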