# bakery-ia/services/training/app/ml/data_processor.py
"""
Enhanced Data Processor for Training Service with Repository Pattern
Uses repository pattern for data access and dependency injection
"""
import pandas as pd
import numpy as np
from typing import Dict, List, Any, Optional, Tuple
from datetime import datetime, timedelta, timezone
import structlog
from sklearn.preprocessing import StandardScaler
from sklearn.impute import SimpleImputer
from app.services.date_alignment_service import DateAlignmentService, DateRange, DataSourceType
from app.repositories import ModelRepository, TrainingLogRepository
from shared.database.base import create_database_manager
from shared.database.transactions import transactional
from shared.database.exceptions import DatabaseError
from app.core.config import settings
from shared.ml.enhanced_features import AdvancedFeatureEngineer
import holidays

logger = structlog.get_logger()

class EnhancedBakeryDataProcessor:
"""
Enhanced data processor for bakery forecasting with repository pattern.
Integrates date alignment, data cleaning, feature engineering, and preparation for ML models.
"""
def __init__(self, database_manager=None, region: str = 'MD'):
self.database_manager = database_manager or create_database_manager(settings.DATABASE_URL, "training-service")
self.scalers = {} # Store scalers for each feature
self.imputers = {} # Store imputers for missing value handling
self.date_alignment_service = DateAlignmentService()
self.feature_engineer = AdvancedFeatureEngineer()
self.region = region # Region for holidays (MD=Madrid, PV=Basque Country, etc.)
self.spain_holidays = holidays.Spain(prov=region) # Initialize holidays library
def get_scalers(self) -> Dict[str, Any]:
"""Return the scalers/normalization parameters for use during prediction"""
return self.scalers.copy()
@staticmethod
def _extract_numeric_from_dict(value: Any) -> Optional[float]:
"""
Robust extraction of numeric values from complex data structures.
Handles various dict structures that might come from external APIs.
Args:
value: Any value that might be a dict, numeric, or other type
Returns:
Numeric value as float, or None if extraction fails
"""
# If already numeric, return it
if isinstance(value, (int, float)) and not isinstance(value, bool):
return float(value)
# If it's a dict, try multiple extraction strategies
if isinstance(value, dict):
# Strategy 1: Try common keys
for key in ['value', 'data', 'result', 'amount', 'count', 'number', 'val']:
if key in value:
extracted = value[key]
# Recursively extract if nested
if isinstance(extracted, dict):
return EnhancedBakeryDataProcessor._extract_numeric_from_dict(extracted)
elif isinstance(extracted, (int, float)) and not isinstance(extracted, bool):
return float(extracted)
# Strategy 2: Try to find first numeric value in dict
for v in value.values():
if isinstance(v, (int, float)) and not isinstance(v, bool):
return float(v)
elif isinstance(v, dict):
# Recursively try nested dicts
result = EnhancedBakeryDataProcessor._extract_numeric_from_dict(v)
if result is not None:
return result
# Strategy 3: Try to convert string to numeric
if isinstance(value, str):
try:
return float(value)
except (ValueError, TypeError):
pass
# If all strategies fail, return None (will be converted to NaN)
return None
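# Illustrative behaviour of the helper above (a hedged sketch; the payload shapes are
# hypothetical examples, not a documented API contract):
#
#   _extract_numeric_from_dict(21.5)                          # -> 21.5 (already numeric)
#   _extract_numeric_from_dict({"value": 21.5})               # -> 21.5 (common-key strategy)
#   _extract_numeric_from_dict({"reading": {"value": 7.2}})   # -> 7.2  (recursive lookup)
#   _extract_numeric_from_dict("3.4")                         # -> 3.4  (string fallback)
#   _extract_numeric_from_dict({"ok": True})                  # -> None (booleans are ignored)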
async def _get_repositories(self, session):
"""Initialize repositories with session"""
return {
'model': ModelRepository(session),
'training_log': TrainingLogRepository(session)
}
def _ensure_timezone_aware(self, df: pd.DataFrame, date_column: str = 'date') -> pd.DataFrame:
"""Ensure date column is timezone-aware to prevent conversion errors"""
if date_column in df.columns:
# Convert to datetime if not already
df[date_column] = pd.to_datetime(df[date_column])
# If timezone-naive, localize to UTC
if df[date_column].dt.tz is None:
df[date_column] = df[date_column].dt.tz_localize('UTC')
# If already timezone-aware but not UTC, convert to UTC
elif str(df[date_column].dt.tz) != 'UTC':
df[date_column] = df[date_column].dt.tz_convert('UTC')
return df
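# Hedged sketch of the normalization above: naive timestamps are localized to UTC and any
# other timezone is converted to UTC, so later merges compare like with like.
#
#   df = pd.DataFrame({'date': ['2025-01-01 08:00']})       # tz-naive strings
#   df = self._ensure_timezone_aware(df)                    # -> 2025-01-01 08:00:00+00:00
#   df['date'] = df['date'].dt.tz_convert('Europe/Madrid')  # tz-aware, non-UTC
#   df = self._ensure_timezone_aware(df)                    # -> converted back to UTC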
async def prepare_training_data(self,
sales_data: pd.DataFrame,
weather_data: pd.DataFrame,
traffic_data: pd.DataFrame,
inventory_product_id: str,
poi_features: Dict[str, Any] = None,
tenant_id: str = None,
job_id: str = None,
session=None) -> pd.DataFrame:
"""
Prepare comprehensive training data for a specific product with repository logging.
Args:
sales_data: Historical sales data for the product
weather_data: Weather data
traffic_data: Traffic data
inventory_product_id: Inventory product UUID for logging
poi_features: POI features (location-based, static)
tenant_id: Optional tenant ID for tracking
job_id: Optional job ID for tracking
Returns:
DataFrame ready for Prophet training with 'ds' and 'y' columns plus features
"""
try:
logger.info("Preparing enhanced training data using repository pattern",
inventory_product_id=inventory_product_id,
tenant_id=tenant_id,
job_id=job_id)
# Use provided session if available, otherwise create one
if session is None:
logger.debug("Creating new session for data preparation",
inventory_product_id=inventory_product_id)
async with self.database_manager.get_session() as db_session:
repos = await self._get_repositories(db_session)
# Log data preparation start if we have tracking info
if job_id and tenant_id:
logger.debug("About to update training log progress",
inventory_product_id=inventory_product_id,
job_id=job_id)
await repos['training_log'].update_log_progress(
job_id, 15, f"preparing_data_{inventory_product_id}", "running"
)
logger.debug("Updated training log progress",
inventory_product_id=inventory_product_id,
job_id=job_id)
# Commit the created session
await db_session.commit()
logger.debug("Committed session after data preparation progress update",
inventory_product_id=inventory_product_id)
else:
logger.debug("Using provided session for data preparation",
inventory_product_id=inventory_product_id)
# Use the provided session
repos = await self._get_repositories(session)
# Log data preparation start if we have tracking info
if job_id and tenant_id:
logger.debug("About to update training log progress with provided session",
inventory_product_id=inventory_product_id,
job_id=job_id)
await repos['training_log'].update_log_progress(
2025-08-14 16:47:34 +02:00
job_id, 15, f"preparing_data_{inventory_product_id}", "running"
2025-08-08 09:08:41 +02:00
)
logger.debug("Updated training log progress with provided session",
inventory_product_id=inventory_product_id,
job_id=job_id)
# Don't commit the provided session as the caller manages it
logger.debug("Updated progress with provided session",
inventory_product_id=inventory_product_id)
logger.debug("Starting Step 1: Convert and validate sales data",
inventory_product_id=inventory_product_id)
# Step 1: Convert and validate sales data
sales_clean = await self._process_sales_data(sales_data, inventory_product_id)
logger.debug("Step 1 completed: Convert and validate sales data",
inventory_product_id=inventory_product_id,
sales_records=len(sales_clean))
logger.debug("Starting Step 2: Ensure timezone awareness",
inventory_product_id=inventory_product_id)
# FIX: Ensure timezone awareness before any operations
sales_clean = self._ensure_timezone_aware(sales_clean)
weather_data = self._ensure_timezone_aware(weather_data) if not weather_data.empty else weather_data
traffic_data = self._ensure_timezone_aware(traffic_data) if not traffic_data.empty else traffic_data
logger.debug("Step 2 completed: Ensure timezone awareness",
inventory_product_id=inventory_product_id,
weather_records=len(weather_data) if not weather_data.empty else 0,
traffic_records=len(traffic_data) if not traffic_data.empty else 0)
logger.debug("Starting Step 3: Apply date alignment",
inventory_product_id=inventory_product_id)
# Step 3: Apply date alignment if we have date constraints
sales_clean = await self._apply_date_alignment(sales_clean, weather_data, traffic_data)
logger.debug("Step 3 completed: Apply date alignment",
inventory_product_id=inventory_product_id,
sales_records=len(sales_clean))
logger.debug("Starting Step 4: Aggregate to daily level",
inventory_product_id=inventory_product_id)
# Step 4: Aggregate to daily level
daily_sales = await self._aggregate_daily_sales(sales_clean)
logger.debug("Step 4 completed: Aggregate to daily level",
inventory_product_id=inventory_product_id,
daily_records=len(daily_sales))
logger.debug("Starting Step 5: Add temporal features",
inventory_product_id=inventory_product_id)
# Step 5: Add temporal features
daily_sales = self._add_temporal_features(daily_sales)
logger.debug("Step 5 completed: Add temporal features",
inventory_product_id=inventory_product_id,
features_added=True)
logger.debug("Starting Step 6: Merge external data sources",
inventory_product_id=inventory_product_id)
# Step 6: Merge external data sources
daily_sales = self._merge_weather_features(daily_sales, weather_data)
daily_sales = self._merge_traffic_features(daily_sales, traffic_data)
logger.debug("Step 6 completed: Merge external data sources",
inventory_product_id=inventory_product_id,
merged_successfully=True)
logger.debug("Starting Step 7: Engineer basic features",
inventory_product_id=inventory_product_id)
# Step 7: Engineer basic features
daily_sales = self._engineer_features(daily_sales)
logger.debug("Step 7 completed: Engineer basic features",
inventory_product_id=inventory_product_id,
feature_columns=len([col for col in daily_sales.columns if col not in ['date', 'quantity']]))
logger.debug("Starting Step 8: Add advanced features",
inventory_product_id=inventory_product_id)
# Step 8: Add advanced features (lagged, rolling, cyclical, interactions, trends)
daily_sales = self._add_advanced_features(daily_sales)
logger.debug("Step 8 completed: Add advanced features",
inventory_product_id=inventory_product_id,
total_features=len(daily_sales.columns))
logger.debug("Starting Step 8b: Add POI features",
inventory_product_id=inventory_product_id)
# Step 8b: Add POI features (static, location-based)
if poi_features:
daily_sales = self._add_poi_features(daily_sales, poi_features)
logger.debug("Step 8b completed: Add POI features",
inventory_product_id=inventory_product_id,
poi_feature_count=len(poi_features))
else:
logger.debug("Step 8b skipped: No POI features available",
inventory_product_id=inventory_product_id)
logger.debug("Starting Step 9: Handle missing values",
inventory_product_id=inventory_product_id)
# Step 9: Handle missing values
daily_sales = self._handle_missing_values(daily_sales)
logger.debug("Step 9 completed: Handle missing values",
inventory_product_id=inventory_product_id,
missing_values_handled=True)
logger.debug("Starting Step 10: Prepare for Prophet format",
inventory_product_id=inventory_product_id)
# Step 10: Prepare for Prophet (rename columns and validate)
prophet_data = self._prepare_prophet_format(daily_sales)
logger.debug("Step 10 completed: Prepare for Prophet format",
inventory_product_id=inventory_product_id,
prophet_records=len(prophet_data))
logger.debug("Starting Step 11: Store processing metadata",
inventory_product_id=inventory_product_id)
# Step 11: Store processing metadata if we have a tenant
if tenant_id:
await self._store_processing_metadata(
repos, tenant_id, inventory_product_id, prophet_data, job_id, session
)
logger.debug("Step 11 completed: Store processing metadata",
inventory_product_id=inventory_product_id)
logger.info("Enhanced training data prepared successfully",
inventory_product_id=inventory_product_id,
data_points=len(prophet_data))
return prophet_data
except Exception as e:
logger.error("Error preparing enhanced training data",
inventory_product_id=inventory_product_id,
error=str(e),
exc_info=True)
raise
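# Hedged usage sketch (dataframe names and the UUID are illustrative, not taken from the
# calling service):
#
#   processor = EnhancedBakeryDataProcessor()
#   prophet_df = await processor.prepare_training_data(
#       sales_data=sales_df,        # needs 'date' plus a quantity-like column
#       weather_data=weather_df,    # optional; an empty DataFrame falls back to defaults
#       traffic_data=traffic_df,    # optional; an empty DataFrame falls back to defaults
#       inventory_product_id="<product-uuid>",
#       tenant_id=None, job_id=None,  # only needed for progress tracking
#   )
#   # prophet_df now has 'ds', 'y' and the engineered regressor columns.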
async def _store_processing_metadata(self,
repos: Dict,
tenant_id: str,
inventory_product_id: str,
processed_data: pd.DataFrame,
job_id: str = None,
session=None):
"""Store data processing metadata using repository"""
try:
# Create processing metadata
metadata = {
"inventory_product_id": inventory_product_id,
"data_points": len(processed_data),
"date_range": {
"start": processed_data['ds'].min().isoformat(),
"end": processed_data['ds'].max().isoformat()
},
"features_count": len([col for col in processed_data.columns if col not in ['ds', 'y']]),
"processed_at": datetime.now().isoformat()
}
# Log processing completion
if job_id:
await repos['training_log'].update_log_progress(
job_id, 25, f"data_prepared_{inventory_product_id}", "running"
)
# The provided session, if any, is managed by the caller, so no commit is performed here
logger.debug("Data preparation metadata stored",
inventory_product_id=inventory_product_id)
except Exception as e:
logger.warning("Failed to store processing metadata",
error=str(e))
async def prepare_prediction_features(self,
future_dates: pd.DatetimeIndex,
weather_forecast: pd.DataFrame = None,
traffic_forecast: pd.DataFrame = None,
poi_features: Dict[str, Any] = None,
historical_data: pd.DataFrame = None) -> pd.DataFrame:
"""
Create features for future predictions with proper date handling.
Args:
future_dates: Future dates to predict
weather_forecast: Weather forecast data
traffic_forecast: Traffic forecast data
poi_features: POI features (location-based, static)
historical_data: Historical data for creating lagged and rolling features
Returns:
DataFrame with features for prediction
"""
try:
# Create base future dataframe
future_df = pd.DataFrame({'ds': future_dates})
2025-07-19 16:59:37 +02:00
# Add temporal features
future_df = self._add_temporal_features(
future_df.rename(columns={'ds': 'date'})
).rename(columns={'date': 'ds'})
2025-07-19 16:59:37 +02:00
# Add weather features
if weather_forecast is not None and not weather_forecast.empty:
weather_features = weather_forecast.copy()
if 'date' in weather_features.columns:
weather_features = weather_features.rename(columns={'date': 'ds'})
2025-07-19 16:59:37 +02:00
future_df = future_df.merge(weather_features, on='ds', how='left')
# Add traffic features
if traffic_forecast is not None and not traffic_forecast.empty:
traffic_features = traffic_forecast.copy()
if 'date' in traffic_features.columns:
traffic_features = traffic_features.rename(columns={'date': 'ds'})
2025-07-19 16:59:37 +02:00
future_df = future_df.merge(traffic_features, on='ds', how='left')
# Engineer basic features
future_df = self._engineer_features(future_df.rename(columns={'ds': 'date'}))
# Add advanced features if historical data is provided
if historical_data is not None and not historical_data.empty:
# Combine historical and future data to calculate lagged/rolling features
combined_df = pd.concat([
historical_data.rename(columns={'ds': 'date'}),
future_df
], ignore_index=True).sort_values('date')
# Apply advanced features to combined data
combined_df = self._add_advanced_features(combined_df)
# Extract only the future rows
future_df = combined_df[combined_df['date'].isin(future_df['date'])].copy()
else:
# Without historical data, add advanced features with NaN for lags
logger.warning("No historical data provided, lagged features will be NaN")
future_df = self._add_advanced_features(future_df)
# Add POI features (static, location-based)
if poi_features:
future_df = self._add_poi_features(future_df, poi_features)
future_df = future_df.rename(columns={'date': 'ds'})
2025-07-19 16:59:37 +02:00
# Handle missing values in future data
future_df = self._handle_missing_values_future(future_df)
2025-07-19 16:59:37 +02:00
return future_df
2025-07-19 16:59:37 +02:00
except Exception as e:
logger.error("Error creating prediction features", error=str(e))
# Return minimal features if error
return pd.DataFrame({'ds': future_dates})
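# Hedged usage sketch: building a 7-day feature frame for inference (inputs are illustrative):
#
#   future_dates = pd.date_range("2025-06-01", periods=7, freq="D")
#   features = await processor.prepare_prediction_features(
#       future_dates,
#       weather_forecast=forecast_df,   # optional; 'date' or 'ds' column accepted
#       traffic_forecast=None,
#       poi_features=poi_dict,          # static columns broadcast to every row
#       historical_data=prophet_df,     # lets lagged/rolling features be computed
#   )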
async def _apply_date_alignment(self,
sales_data: pd.DataFrame,
weather_data: pd.DataFrame,
traffic_data: pd.DataFrame) -> pd.DataFrame:
"""
Apply date alignment constraints to ensure data consistency across sources.
"""
try:
if sales_data.empty:
return sales_data
# Create date range from sales data
sales_dates = pd.to_datetime(sales_data['date'])
sales_date_range = DateRange(
start=sales_dates.min(),
end=sales_dates.max(),
source=DataSourceType.BAKERY_SALES
)
# Get aligned date range considering all constraints
aligned_range = self.date_alignment_service.validate_and_align_dates(
user_sales_range=sales_date_range
)
# Filter sales data to aligned range
mask = (sales_dates >= aligned_range.start) & (sales_dates <= aligned_range.end)
filtered_sales = sales_data[mask].copy()
2025-08-08 09:08:41 +02:00
logger.info("Date alignment completed",
original_records=len(sales_data),
filtered_records=len(filtered_sales),
date_range=f"{aligned_range.start.date()} to {aligned_range.end.date()}")
2025-07-28 19:28:39 +02:00
if aligned_range.constraints:
2025-08-08 09:08:41 +02:00
logger.info("Applied constraints", constraints=aligned_range.constraints)
2025-07-28 19:28:39 +02:00
return filtered_sales
except Exception as e:
2025-08-08 09:08:41 +02:00
logger.warning("Date alignment failed, using original data", error=str(e))
2025-07-28 19:28:39 +02:00
return sales_data
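# Hedged sketch of the alignment flow above. DateRange and DataSourceType come from
# app.services.date_alignment_service; only the usage visible in this file is assumed:
#
#   sales_range = DateRange(start=sales_dates.min(), end=sales_dates.max(),
#                           source=DataSourceType.BAKERY_SALES)
#   aligned = self.date_alignment_service.validate_and_align_dates(user_sales_range=sales_range)
#   mask = (sales_dates >= aligned.start) & (sales_dates <= aligned.end)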
2025-08-14 16:47:34 +02:00
async def _process_sales_data(self, sales_data: pd.DataFrame, inventory_product_id: str) -> pd.DataFrame:
2025-07-28 19:28:39 +02:00
"""Process and clean sales data with enhanced validation"""
2025-11-05 18:47:20 +01:00
logger.debug("Starting sales data processing",
inventory_product_id=inventory_product_id,
total_records=len(sales_data),
columns=list(sales_data.columns))
2025-07-19 16:59:37 +02:00
sales_clean = sales_data.copy()
2025-11-05 18:47:20 +01:00
logger.debug("Checking for date column existence",
inventory_product_id=inventory_product_id)
2025-07-19 16:59:37 +02:00
# Ensure date column exists and is datetime
if 'date' not in sales_clean.columns:
2025-11-05 18:47:20 +01:00
logger.error("Sales data must have a 'date' column",
inventory_product_id=inventory_product_id,
available_columns=list(sales_data.columns))
2025-07-19 16:59:37 +02:00
raise ValueError("Sales data must have a 'date' column")
2025-11-05 18:47:20 +01:00
logger.debug("Converting date column to datetime",
inventory_product_id=inventory_product_id)
2025-07-19 16:59:37 +02:00
sales_clean['date'] = pd.to_datetime(sales_clean['date'])
2025-11-05 18:47:20 +01:00
logger.debug("Date conversion completed",
inventory_product_id=inventory_product_id)
2025-07-19 16:59:37 +02:00
2025-07-28 19:28:39 +02:00
# Handle different quantity column names
quantity_columns = ['quantity', 'quantity_sold', 'sales', 'units_sold']
2025-11-05 18:47:20 +01:00
logger.debug("Looking for quantity column",
inventory_product_id=inventory_product_id,
quantity_columns=quantity_columns)
2025-07-28 19:28:39 +02:00
quantity_col = None
for col in quantity_columns:
if col in sales_clean.columns:
quantity_col = col
2025-11-05 18:47:20 +01:00
logger.debug("Found quantity column",
inventory_product_id=inventory_product_id,
quantity_column=col)
2025-07-28 19:28:39 +02:00
break
if quantity_col is None:
2025-11-05 18:47:20 +01:00
logger.error("Sales data must have one of the expected quantity columns",
inventory_product_id=inventory_product_id,
expected_columns=quantity_columns,
available_columns=list(sales_clean.columns))
2025-07-28 19:28:39 +02:00
raise ValueError(f"Sales data must have one of these columns: {quantity_columns}")
# Standardize to 'quantity'
if quantity_col != 'quantity':
2025-11-05 18:47:20 +01:00
logger.debug("Mapping quantity column",
inventory_product_id=inventory_product_id,
from_column=quantity_col,
to_column='quantity')
2025-07-28 19:28:39 +02:00
sales_clean['quantity'] = sales_clean[quantity_col]
2025-08-08 09:08:41 +02:00
logger.info("Mapped quantity column",
from_column=quantity_col,
to_column='quantity')
2025-07-19 16:59:37 +02:00
2025-11-05 18:47:20 +01:00
logger.debug("Converting quantity to numeric",
inventory_product_id=inventory_product_id)
2025-07-19 16:59:37 +02:00
sales_clean['quantity'] = pd.to_numeric(sales_clean['quantity'], errors='coerce')
2025-11-05 18:47:20 +01:00
logger.debug("Quantity conversion completed",
inventory_product_id=inventory_product_id,
non_numeric_count=sales_clean['quantity'].isna().sum())
2025-07-19 16:59:37 +02:00
# Remove rows with invalid quantities
2025-11-05 18:47:20 +01:00
logger.debug("Removing rows with invalid quantities",
inventory_product_id=inventory_product_id)
2025-07-19 16:59:37 +02:00
sales_clean = sales_clean.dropna(subset=['quantity'])
2025-11-05 18:47:20 +01:00
logger.debug("NaN rows removed",
inventory_product_id=inventory_product_id,
remaining_records=len(sales_clean))
2025-07-19 16:59:37 +02:00
sales_clean = sales_clean[sales_clean['quantity'] >= 0] # No negative sales
2025-11-05 18:47:20 +01:00
logger.debug("Negative sales removed",
inventory_product_id=inventory_product_id,
remaining_records=len(sales_clean))
2025-07-19 16:59:37 +02:00
2025-08-14 16:47:34 +02:00
# Filter for the specific product if inventory_product_id column exists
2025-11-05 18:47:20 +01:00
logger.debug("Checking for inventory_product_id column",
inventory_product_id=inventory_product_id,
has_inventory_column='inventory_product_id' in sales_clean.columns)
2025-08-14 16:47:34 +02:00
if 'inventory_product_id' in sales_clean.columns:
2025-11-05 18:47:20 +01:00
logger.debug("Filtering for specific product",
inventory_product_id=inventory_product_id,
products_in_data=sales_clean['inventory_product_id'].unique()[:5].tolist()) # Show first 5
original_count = len(sales_clean)
2025-08-14 16:47:34 +02:00
sales_clean = sales_clean[sales_clean['inventory_product_id'] == inventory_product_id]
2025-11-05 18:47:20 +01:00
logger.debug("Product filtering completed",
inventory_product_id=inventory_product_id,
original_count=original_count,
filtered_count=len(sales_clean))
2025-07-19 16:59:37 +02:00
2025-07-28 19:28:39 +02:00
# Remove duplicate dates (keep the one with highest quantity)
2025-11-05 18:47:20 +01:00
logger.debug("Removing duplicate dates",
inventory_product_id=inventory_product_id,
before_dedupe=len(sales_clean))
2025-07-28 19:28:39 +02:00
sales_clean = sales_clean.sort_values(['date', 'quantity'], ascending=[True, False])
sales_clean = sales_clean.drop_duplicates(subset=['date'], keep='first')
2025-11-05 18:47:20 +01:00
logger.debug("Duplicate dates removed",
inventory_product_id=inventory_product_id,
after_dedupe=len(sales_clean))
2025-07-28 19:28:39 +02:00
2025-11-05 18:47:20 +01:00
logger.debug("Sales data processing completed",
inventory_product_id=inventory_product_id,
final_records=len(sales_clean))
2025-07-19 16:59:37 +02:00
return sales_clean
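# Hedged sketch of what the cleaning above does to a raw extract (rows are made up):
#
#   date        quantity_sold   result
#   2025-03-01  12              kept as quantity=12
#   2025-03-01  7               dropped (duplicate date, lower quantity)
#   2025-03-02  -3              dropped (negative sales)
#   2025-03-02  "n/a"           dropped (non-numeric, coerced to NaN)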
async def _aggregate_daily_sales(self, sales_data: pd.DataFrame) -> pd.DataFrame:
2025-07-28 19:28:39 +02:00
"""Aggregate sales to daily level with improved date handling"""
2025-11-05 18:47:20 +01:00
logger.debug("Starting daily sales aggregation",
input_records=len(sales_data),
columns=list(sales_data.columns))
2025-07-28 19:28:39 +02:00
if sales_data.empty:
2025-11-05 18:47:20 +01:00
logger.debug("Sales data is empty, returning empty DataFrame")
2025-07-28 19:28:39 +02:00
return pd.DataFrame(columns=['date', 'quantity'])
2025-11-05 18:47:20 +01:00
logger.debug("Starting groupby aggregation",
unique_dates=sales_data['date'].nunique(),
date_range=(sales_data['date'].min(), sales_data['date'].max()))
2025-07-28 19:28:39 +02:00
# Group by date and sum quantities
2025-07-19 16:59:37 +02:00
daily_sales = sales_data.groupby('date').agg({
'quantity': 'sum'
}).reset_index()
2025-11-05 18:47:20 +01:00
logger.debug("Groupby aggregation completed",
aggregated_records=len(daily_sales))
2025-07-28 19:28:39 +02:00
# Ensure we have data for all dates in the range (fill gaps with 0)
2025-11-05 18:47:20 +01:00
logger.debug("Creating full date range",
start_date=daily_sales['date'].min(),
end_date=daily_sales['date'].max())
2025-07-19 16:59:37 +02:00
date_range = pd.date_range(
start=daily_sales['date'].min(),
end=daily_sales['date'].max(),
freq='D'
)
2025-11-05 18:47:20 +01:00
logger.debug("Date range created",
total_dates=len(date_range))
2025-07-19 16:59:37 +02:00
full_date_df = pd.DataFrame({'date': date_range})
2025-11-05 18:47:20 +01:00
logger.debug("Starting merge to fill missing dates",
full_date_records=len(full_date_df),
aggregated_records=len(daily_sales))
2025-07-19 16:59:37 +02:00
daily_sales = full_date_df.merge(daily_sales, on='date', how='left')
2025-11-05 18:47:20 +01:00
logger.debug("Missing date filling merge completed",
final_records=len(daily_sales))
2025-07-19 16:59:37 +02:00
daily_sales['quantity'] = daily_sales['quantity'].fillna(0) # Fill missing days with 0 sales
2025-11-05 18:47:20 +01:00
logger.debug("NaN filling completed",
remaining_nan_count=daily_sales['quantity'].isna().sum(),
zero_filled_count=(daily_sales['quantity'] == 0).sum())
logger.debug("Daily sales aggregation completed",
final_records=len(daily_sales),
final_columns=len(daily_sales.columns))
2025-07-19 16:59:37 +02:00
return daily_sales
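# Hedged sketch of the gap filling above: days with no sales rows become explicit zeros,
# which tells the model "no demand" rather than "no observation".
#
#   input:  2025-03-01 -> 12, 2025-03-03 -> 5
#   output: 2025-03-01 -> 12, 2025-03-02 -> 0, 2025-03-03 -> 5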
def _add_temporal_features(self, df: pd.DataFrame) -> pd.DataFrame:
2025-07-28 19:28:39 +02:00
"""Add comprehensive temporal features for bakery demand patterns"""
2025-07-19 16:59:37 +02:00
df = df.copy()
# Ensure we have a date column
if 'date' not in df.columns:
raise ValueError("DataFrame must have a 'date' column")
df['date'] = pd.to_datetime(df['date'])
2025-07-28 19:28:39 +02:00
# Basic temporal features
df['day_of_week'] = df['date'].dt.dayofweek # 0=Monday, 6=Sunday
df['day_of_month'] = df['date'].dt.day
2025-07-19 16:59:37 +02:00
df['month'] = df['date'].dt.month
2025-07-28 19:28:39 +02:00
df['quarter'] = df['date'].dt.quarter
2025-07-19 16:59:37 +02:00
df['week_of_year'] = df['date'].dt.isocalendar().week
2025-07-28 19:28:39 +02:00
# Bakery-specific features
df['is_weekend'] = df['day_of_week'].isin([5, 6]).astype(int)
df['is_monday'] = (df['day_of_week'] == 0).astype(int) # Monday often has different patterns
df['is_friday'] = (df['day_of_week'] == 4).astype(int) # Friday often busy
2025-07-19 16:59:37 +02:00
2025-07-28 19:28:39 +02:00
# Season mapping for Madrid
df['season'] = df['month'].apply(self._get_season)
df['is_summer'] = (df['season'] == 3).astype(int) # Summer seasonality
df['is_winter'] = (df['season'] == 1).astype(int) # Winter seasonality
2025-07-19 16:59:37 +02:00
2025-07-28 19:28:39 +02:00
# Holiday and special day indicators
df['is_holiday'] = df['date'].apply(self._is_spanish_holiday).astype(int)
2025-07-19 16:59:37 +02:00
df['is_school_holiday'] = df['date'].apply(self._is_school_holiday).astype(int)
2025-07-28 19:28:39 +02:00
df['is_month_start'] = (df['day_of_month'] <= 3).astype(int)
df['is_month_end'] = (df['day_of_month'] >= 28).astype(int)
# Payday patterns (common in Spain: end/beginning of month)
df['is_payday_period'] = ((df['day_of_month'] <= 5) | (df['day_of_month'] >= 25)).astype(int)
2025-07-19 16:59:37 +02:00
return df
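# Hedged example of the derived calendar columns for a single date (2024-08-15 is a
# Thursday and the Assumption holiday in Spain):
#
#   day_of_week=3, is_weekend=0, is_friday=0, month=8, season=3, is_summer=1,
#   is_holiday=1, is_school_holiday=1, is_month_start=0, is_month_end=0, is_payday_period=0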
def _merge_weather_features(self,
2025-07-28 20:20:54 +02:00
daily_sales: pd.DataFrame,
weather_data: pd.DataFrame) -> pd.DataFrame:
"""Merge weather features with enhanced Madrid-specific handling"""
2025-11-05 18:47:20 +01:00
logger.debug("Starting weather features merge",
daily_sales_records=len(daily_sales),
weather_data_records=len(weather_data) if not weather_data.empty else 0,
weather_columns=list(weather_data.columns) if not weather_data.empty else [])
2025-07-28 20:20:54 +02:00
2025-08-08 09:08:41 +02:00
# Define weather_defaults OUTSIDE try block to fix scope error
2025-07-28 20:20:54 +02:00
weather_defaults = {
'temperature': 15.0,
'precipitation': 0.0,
'humidity': 60.0,
'wind_speed': 5.0,
'pressure': 1013.0
}
2025-07-19 16:59:37 +02:00
if weather_data.empty:
2025-11-05 18:47:20 +01:00
logger.debug("Weather data is empty, adding default columns")
2025-07-28 20:20:54 +02:00
# Add default weather columns
for feature, default_value in weather_defaults.items():
daily_sales[feature] = default_value
2025-11-05 18:47:20 +01:00
logger.debug("Default weather columns added",
features_added=list(weather_defaults.keys()))
2025-07-19 16:59:37 +02:00
return daily_sales
try:
weather_clean = weather_data.copy()
2025-11-05 18:47:20 +01:00
logger.debug("Weather data copied",
records=len(weather_clean),
columns=list(weather_clean.columns))
2025-07-19 16:59:37 +02:00
2025-07-28 19:28:39 +02:00
# Standardize date column
2025-07-19 16:59:37 +02:00
if 'date' not in weather_clean.columns and 'ds' in weather_clean.columns:
2025-11-05 18:47:20 +01:00
logger.debug("Renaming ds column to date")
2025-07-19 16:59:37 +02:00
weather_clean = weather_clean.rename(columns={'ds': 'date'})
2025-08-08 09:08:41 +02:00
# CRITICAL FIX: Ensure both DataFrames have compatible datetime formats
2025-11-05 18:47:20 +01:00
logger.debug("Converting weather data date column to datetime")
2025-07-19 16:59:37 +02:00
weather_clean['date'] = pd.to_datetime(weather_clean['date'])
2025-11-05 18:47:20 +01:00
logger.debug("Converting daily sales date column to datetime")
2025-07-28 20:20:54 +02:00
daily_sales['date'] = pd.to_datetime(daily_sales['date'])
2025-08-08 09:08:41 +02:00
# NEW FIX: Normalize both to timezone-naive datetime for merge compatibility
2025-07-28 20:20:54 +02:00
if weather_clean['date'].dt.tz is not None:
2025-11-05 18:47:20 +01:00
logger.debug("Removing timezone from weather data")
2025-07-30 08:41:47 +02:00
weather_clean['date'] = weather_clean['date'].dt.tz_convert('UTC').dt.tz_localize(None)
2025-07-28 20:20:54 +02:00
if daily_sales['date'].dt.tz is not None:
2025-11-05 18:47:20 +01:00
logger.debug("Removing timezone from daily sales data")
2025-07-30 08:41:47 +02:00
daily_sales['date'] = daily_sales['date'].dt.tz_convert('UTC').dt.tz_localize(None)
2025-07-19 16:59:37 +02:00
2025-07-28 19:28:39 +02:00
# Map weather columns to standard names
2025-07-19 16:59:37 +02:00
weather_mapping = {
2025-07-28 20:20:54 +02:00
'temperature': ['temperature', 'temp', 'temperatura'],
'precipitation': ['precipitation', 'precip', 'rain', 'lluvia'],
2025-07-28 19:28:39 +02:00
'humidity': ['humidity', 'humedad', 'relative_humidity'],
2025-07-28 20:20:54 +02:00
'wind_speed': ['wind_speed', 'viento', 'wind'],
2025-07-28 19:28:39 +02:00
'pressure': ['pressure', 'presion', 'atmospheric_pressure']
2025-07-19 16:59:37 +02:00
}
2025-07-28 19:28:39 +02:00
weather_features = ['date']
2025-11-05 18:47:20 +01:00
logger.debug("Mapping weather columns",
mapping_attempts=list(weather_mapping.keys()))
2025-07-28 19:28:39 +02:00
2025-07-19 16:59:37 +02:00
for standard_name, possible_names in weather_mapping.items():
for possible_name in possible_names:
if possible_name in weather_clean.columns:
2025-11-05 18:47:20 +01:00
logger.debug("Processing weather column",
standard_name=standard_name,
possible_name=possible_name,
records=len(weather_clean))
2025-11-05 13:34:56 +01:00
# Extract numeric values using robust helper function
try:
# Check if column contains dict-like objects
2025-11-05 18:47:20 +01:00
logger.debug("Checking for dict objects in weather column")
2025-11-05 13:34:56 +01:00
has_dicts = weather_clean[possible_name].apply(lambda x: isinstance(x, dict)).any()
2025-11-05 18:47:20 +01:00
logger.debug("Dict object check completed",
has_dicts=has_dicts)
2025-11-05 13:34:56 +01:00
if has_dicts:
logger.warning(f"Weather column {possible_name} contains dict objects, extracting numeric values")
# Use robust extraction for all values
weather_clean[standard_name] = weather_clean[possible_name].apply(
self._extract_numeric_from_dict
)
2025-11-05 18:47:20 +01:00
logger.debug("Dict extraction completed for weather column",
extracted_column=standard_name,
extracted_count=weather_clean[standard_name].notna().sum())
2025-11-05 13:34:56 +01:00
else:
# Direct numeric conversion for simple values
2025-11-05 18:47:20 +01:00
logger.debug("Performing direct numeric conversion")
2025-11-05 13:34:56 +01:00
weather_clean[standard_name] = pd.to_numeric(weather_clean[possible_name], errors='coerce')
2025-11-05 18:47:20 +01:00
logger.debug("Direct numeric conversion completed")
2025-11-05 13:34:56 +01:00
except Exception as e:
logger.warning(f"Error converting weather column {possible_name}: {e}")
# Fallback: try to extract from each value
weather_clean[standard_name] = weather_clean[possible_name].apply(
self._extract_numeric_from_dict
)
2025-07-19 16:59:37 +02:00
weather_features.append(standard_name)
2025-11-05 18:47:20 +01:00
logger.debug("Added weather feature to list",
feature=standard_name)
2025-07-19 16:59:37 +02:00
break
2025-11-05 13:34:56 +01:00
2025-07-19 16:59:37 +02:00
# Keep only the features we found
2025-11-05 18:47:20 +01:00
logger.debug("Selecting weather features",
selected_features=weather_features)
2025-07-19 16:59:37 +02:00
weather_clean = weather_clean[weather_features].copy()
2025-11-05 13:34:56 +01:00
2025-07-19 16:59:37 +02:00
# Merge with sales data
2025-11-05 18:47:20 +01:00
logger.debug("Starting merge operation",
daily_sales_rows=len(daily_sales),
weather_rows=len(weather_clean),
date_range_sales=(daily_sales['date'].min(), daily_sales['date'].max()) if len(daily_sales) > 0 else None,
date_range_weather=(weather_clean['date'].min(), weather_clean['date'].max()) if len(weather_clean) > 0 else None)
2025-07-19 16:59:37 +02:00
merged = daily_sales.merge(weather_clean, on='date', how='left')
2025-11-05 18:47:20 +01:00
logger.debug("Merge completed",
merged_rows=len(merged),
merge_type='left')
2025-07-28 19:28:39 +02:00
# Fill missing weather values with Madrid-appropriate defaults
2025-11-05 18:47:20 +01:00
logger.debug("Filling missing weather values",
features_to_fill=list(weather_defaults.keys()))
2025-07-28 19:28:39 +02:00
for feature, default_value in weather_defaults.items():
if feature in merged.columns:
2025-11-05 18:47:20 +01:00
logger.debug("Processing feature for NaN fill",
feature=feature,
nan_count=merged[feature].isna().sum())
2025-11-05 13:34:56 +01:00
# Ensure the column is numeric before filling
merged[feature] = pd.to_numeric(merged[feature], errors='coerce')
2025-07-28 19:28:39 +02:00
merged[feature] = merged[feature].fillna(default_value)
2025-11-05 18:47:20 +01:00
logger.debug("NaN fill completed for feature",
feature=feature,
final_nan_count=merged[feature].isna().sum())
2025-07-19 16:59:37 +02:00
2025-11-05 18:47:20 +01:00
logger.debug("Weather features merge completed",
final_rows=len(merged),
final_columns=len(merged.columns))
2025-07-19 16:59:37 +02:00
return merged
except Exception as e:
2025-11-05 18:47:20 +01:00
logger.warning("Error merging weather data", error=str(e), exc_info=True)
2025-08-08 09:08:41 +02:00
# Add default weather columns if merge fails
2025-07-28 19:28:39 +02:00
for feature, default_value in weather_defaults.items():
daily_sales[feature] = default_value
2025-11-05 18:47:20 +01:00
logger.debug("Default weather columns added after merge failure",
features_added=list(weather_defaults.keys()))
2025-07-19 16:59:37 +02:00
return daily_sales
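# Hedged sketch of the mapping above: Spanish or abbreviated source columns are folded into
# the standard names before the left merge, and anything still missing gets the Madrid defaults.
#
#   source columns ['date', 'temperatura', 'lluvia']  -> 'temperature', 'precipitation'
#   missing after merge: humidity -> 60.0, wind_speed -> 5.0, pressure -> 1013.0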
2025-07-28 20:20:54 +02:00
2025-07-19 16:59:37 +02:00
def _merge_traffic_features(self,
daily_sales: pd.DataFrame,
traffic_data: pd.DataFrame) -> pd.DataFrame:
2025-07-28 19:28:39 +02:00
"""Merge traffic features with enhanced Madrid-specific handling"""
2025-11-05 18:47:20 +01:00
logger.debug("Starting traffic features merge",
daily_sales_records=len(daily_sales),
traffic_data_records=len(traffic_data) if not traffic_data.empty else 0,
traffic_columns=list(traffic_data.columns) if not traffic_data.empty else [])
2025-07-19 16:59:37 +02:00
if traffic_data.empty:
2025-11-05 18:47:20 +01:00
logger.debug("Traffic data is empty, adding default column")
2025-07-19 16:59:37 +02:00
# Add default traffic column
daily_sales['traffic_volume'] = 100.0 # Neutral traffic level
2025-11-05 18:47:20 +01:00
logger.debug("Default traffic column added",
default_value=100.0)
2025-07-19 16:59:37 +02:00
return daily_sales
try:
traffic_clean = traffic_data.copy()
2025-11-05 18:47:20 +01:00
logger.debug("Traffic data copied",
records=len(traffic_clean),
columns=list(traffic_clean.columns))
2025-07-19 16:59:37 +02:00
2025-07-28 19:28:39 +02:00
# Standardize date column
2025-07-19 16:59:37 +02:00
if 'date' not in traffic_clean.columns and 'ds' in traffic_clean.columns:
2025-11-05 18:47:20 +01:00
logger.debug("Renaming ds column to date")
2025-07-19 16:59:37 +02:00
traffic_clean = traffic_clean.rename(columns={'ds': 'date'})
2025-08-08 09:08:41 +02:00
# CRITICAL FIX: Ensure both DataFrames have compatible datetime formats
2025-11-05 18:47:20 +01:00
logger.debug("Converting traffic data date column to datetime")
2025-07-19 16:59:37 +02:00
traffic_clean['date'] = pd.to_datetime(traffic_clean['date'])
2025-11-05 18:47:20 +01:00
logger.debug("Converting daily sales date column to datetime")
2025-07-30 08:41:47 +02:00
daily_sales['date'] = pd.to_datetime(daily_sales['date'])
2025-08-08 09:08:41 +02:00
# NEW FIX: Normalize both to timezone-naive datetime for merge compatibility
2025-07-30 08:41:47 +02:00
if traffic_clean['date'].dt.tz is not None:
2025-11-05 18:47:20 +01:00
logger.debug("Removing timezone from traffic data")
2025-07-30 08:41:47 +02:00
traffic_clean['date'] = traffic_clean['date'].dt.tz_convert('UTC').dt.tz_localize(None)
if daily_sales['date'].dt.tz is not None:
2025-11-05 18:47:20 +01:00
logger.debug("Removing timezone from daily sales data")
2025-07-30 08:41:47 +02:00
daily_sales['date'] = daily_sales['date'].dt.tz_convert('UTC').dt.tz_localize(None)
2025-07-19 16:59:37 +02:00
2025-07-28 19:28:39 +02:00
# Map traffic columns to standard names
2025-07-19 16:59:37 +02:00
traffic_mapping = {
2025-07-28 19:28:39 +02:00
'traffic_volume': ['traffic_volume', 'traffic_intensity', 'trafico', 'intensidad', 'volume'],
'pedestrian_count': ['pedestrian_count', 'peatones', 'pedestrians'],
'congestion_level': ['congestion_level', 'congestion', 'nivel_congestion'],
'average_speed': ['average_speed', 'speed', 'velocidad_media', 'avg_speed']
2025-07-19 16:59:37 +02:00
}
2025-07-28 19:28:39 +02:00
traffic_features = ['date']
2025-11-05 18:47:20 +01:00
logger.debug("Mapping traffic columns",
mapping_attempts=list(traffic_mapping.keys()))
2025-07-28 19:28:39 +02:00
2025-07-19 16:59:37 +02:00
for standard_name, possible_names in traffic_mapping.items():
for possible_name in possible_names:
if possible_name in traffic_clean.columns:
2025-11-05 18:47:20 +01:00
logger.debug("Processing traffic column",
standard_name=standard_name,
possible_name=possible_name,
records=len(traffic_clean))
2025-11-05 13:34:56 +01:00
# Extract numeric values using robust helper function
try:
# Check if column contains dict-like objects
2025-11-05 18:47:20 +01:00
logger.debug("Checking for dict objects in traffic column")
2025-11-05 13:34:56 +01:00
has_dicts = traffic_clean[possible_name].apply(lambda x: isinstance(x, dict)).any()
2025-11-05 18:47:20 +01:00
logger.debug("Dict object check completed",
has_dicts=has_dicts)
2025-11-05 13:34:56 +01:00
if has_dicts:
logger.warning(f"Traffic column {possible_name} contains dict objects, extracting numeric values")
# Use robust extraction for all values
traffic_clean[standard_name] = traffic_clean[possible_name].apply(
self._extract_numeric_from_dict
)
2025-11-05 18:47:20 +01:00
logger.debug("Dict extraction completed for traffic column",
extracted_column=standard_name,
extracted_count=traffic_clean[standard_name].notna().sum())
2025-11-05 13:34:56 +01:00
else:
# Direct numeric conversion for simple values
2025-11-05 18:47:20 +01:00
logger.debug("Performing direct numeric conversion")
2025-11-05 13:34:56 +01:00
traffic_clean[standard_name] = pd.to_numeric(traffic_clean[possible_name], errors='coerce')
2025-11-05 18:47:20 +01:00
logger.debug("Direct numeric conversion completed")
2025-11-05 13:34:56 +01:00
except Exception as e:
logger.warning(f"Error converting traffic column {possible_name}: {e}")
# Fallback: try to extract from each value
traffic_clean[standard_name] = traffic_clean[possible_name].apply(
self._extract_numeric_from_dict
)
2025-07-19 16:59:37 +02:00
traffic_features.append(standard_name)
2025-11-05 18:47:20 +01:00
logger.debug("Added traffic feature to list",
feature=standard_name)
2025-07-19 16:59:37 +02:00
break
2025-11-05 13:34:56 +01:00
2025-07-19 16:59:37 +02:00
# Keep only the features we found
2025-11-05 18:47:20 +01:00
logger.debug("Selecting traffic features",
selected_features=traffic_features)
2025-07-19 16:59:37 +02:00
traffic_clean = traffic_clean[traffic_features].copy()
2025-11-05 13:34:56 +01:00
2025-07-19 16:59:37 +02:00
# Merge with sales data
2025-11-05 18:47:20 +01:00
logger.debug("Starting traffic merge operation",
daily_sales_rows=len(daily_sales),
traffic_rows=len(traffic_clean),
date_range_sales=(daily_sales['date'].min(), daily_sales['date'].max()) if len(daily_sales) > 0 else None,
date_range_traffic=(traffic_clean['date'].min(), traffic_clean['date'].max()) if len(traffic_clean) > 0 else None)
2025-07-19 16:59:37 +02:00
merged = daily_sales.merge(traffic_clean, on='date', how='left')
2025-11-05 18:47:20 +01:00
logger.debug("Traffic merge completed",
merged_rows=len(merged),
merge_type='left')
2025-07-28 19:28:39 +02:00
# Fill missing traffic values with reasonable defaults
traffic_defaults = {
'traffic_volume': 100.0,
'pedestrian_count': 50.0,
'congestion_level': 1.0, # Low congestion
'average_speed': 30.0 # km/h typical for Madrid
}
2025-11-05 13:34:56 +01:00
2025-11-05 18:47:20 +01:00
logger.debug("Filling missing traffic values",
features_to_fill=list(traffic_defaults.keys()))
2025-07-28 19:28:39 +02:00
for feature, default_value in traffic_defaults.items():
if feature in merged.columns:
2025-11-05 18:47:20 +01:00
logger.debug("Processing traffic feature for NaN fill",
feature=feature,
nan_count=merged[feature].isna().sum())
2025-11-05 13:34:56 +01:00
# Ensure the column is numeric before filling
merged[feature] = pd.to_numeric(merged[feature], errors='coerce')
2025-07-28 19:28:39 +02:00
merged[feature] = merged[feature].fillna(default_value)
2025-11-05 18:47:20 +01:00
logger.debug("NaN fill completed for traffic feature",
feature=feature,
final_nan_count=merged[feature].isna().sum())
2025-07-19 16:59:37 +02:00
2025-11-05 18:47:20 +01:00
logger.debug("Traffic features merge completed",
final_rows=len(merged),
final_columns=len(merged.columns))
2025-07-19 16:59:37 +02:00
return merged
except Exception as e:
2025-11-05 18:47:20 +01:00
logger.warning("Error merging traffic data", error=str(e), exc_info=True)
2025-07-19 16:59:37 +02:00
# Add default traffic column if merge fails
daily_sales['traffic_volume'] = 100.0
2025-11-05 18:47:20 +01:00
logger.debug("Default traffic column added after merge failure",
default_value=100.0)
2025-07-19 16:59:37 +02:00
return daily_sales
def _engineer_features(self, df: pd.DataFrame) -> pd.DataFrame:
2025-07-28 19:28:39 +02:00
"""Engineer additional features from existing data with bakery-specific insights"""
2025-07-19 16:59:37 +02:00
df = df.copy()
# Weather-based features
if 'temperature' in df.columns:
2025-11-05 13:34:56 +01:00
# Ensure temperature is numeric (defensive check)
df['temperature'] = pd.to_numeric(df['temperature'], errors='coerce').fillna(15.0)
2025-07-19 16:59:37 +02:00
df['temp_squared'] = df['temperature'] ** 2
2025-07-28 19:28:39 +02:00
df['is_hot_day'] = (df['temperature'] > 25).astype(int) # Hot days in Madrid
df['is_cold_day'] = (df['temperature'] < 10).astype(int) # Cold days
df['is_pleasant_day'] = ((df['temperature'] >= 18) & (df['temperature'] <= 25)).astype(int)
2025-11-05 13:34:56 +01:00
2025-07-28 19:28:39 +02:00
# Temperature categories for bakery products
2025-11-05 13:34:56 +01:00
df['temp_category'] = pd.cut(df['temperature'],
bins=[-np.inf, 5, 15, 25, np.inf],
2025-07-28 20:20:54 +02:00
labels=[0, 1, 2, 3]).astype(int)
2025-11-05 13:34:56 +01:00
2025-07-19 16:59:37 +02:00
if 'precipitation' in df.columns:
2025-11-05 13:34:56 +01:00
# Ensure precipitation is numeric (defensive check)
df['precipitation'] = pd.to_numeric(df['precipitation'], errors='coerce').fillna(0.0)
2025-07-28 19:28:39 +02:00
df['is_rainy_day'] = (df['precipitation'] > 0.1).astype(int)
df['is_heavy_rain'] = (df['precipitation'] > 10).astype(int)
df['rain_intensity'] = pd.cut(df['precipitation'],
bins=[-0.1, 0, 2, 10, np.inf],
labels=[0, 1, 2, 3]).astype(int)
2025-07-19 16:59:37 +02:00
2025-08-08 09:08:41 +02:00
# Traffic-based features with NaN protection
2025-07-19 16:59:37 +02:00
if 'traffic_volume' in df.columns:
2025-11-05 13:34:56 +01:00
# Ensure traffic_volume is numeric (defensive check)
df['traffic_volume'] = pd.to_numeric(df['traffic_volume'], errors='coerce').fillna(100.0)
2025-07-28 19:28:39 +02:00
# Calculate traffic quantiles for relative measures
q75 = df['traffic_volume'].quantile(0.75)
q25 = df['traffic_volume'].quantile(0.25)
2025-11-05 13:34:56 +01:00
2025-07-28 19:28:39 +02:00
df['high_traffic'] = (df['traffic_volume'] > q75).astype(int)
df['low_traffic'] = (df['traffic_volume'] < q25).astype(int)
2025-07-28 20:20:54 +02:00
2025-08-08 09:08:41 +02:00
# Safe normalization with NaN protection
2025-07-28 20:20:54 +02:00
traffic_std = df['traffic_volume'].std()
traffic_mean = df['traffic_volume'].mean()
if traffic_std > 0 and not pd.isna(traffic_std) and not pd.isna(traffic_mean):
df['traffic_normalized'] = (df['traffic_volume'] - traffic_mean) / traffic_std
2025-08-12 18:17:30 +02:00
# Store normalization parameters for later use in predictions
self.scalers['traffic_mean'] = float(traffic_mean)
self.scalers['traffic_std'] = float(traffic_std)
logger.info(f"Traffic normalization parameters: mean={traffic_mean:.2f}, std={traffic_std:.2f}")
2025-07-28 20:20:54 +02:00
else:
2025-08-08 09:08:41 +02:00
logger.warning("Traffic volume has zero standard deviation, using zeros for normalized values")
2025-07-28 20:20:54 +02:00
df['traffic_normalized'] = 0.0
2025-08-12 18:17:30 +02:00
# Store default parameters for consistency
self.scalers['traffic_mean'] = 100.0 # Default traffic level used during training
self.scalers['traffic_std'] = 50.0 # Reasonable std for traffic normalization
2025-07-28 20:20:54 +02:00
2025-08-08 09:08:41 +02:00
# Fill any remaining NaN values
2025-07-28 20:20:54 +02:00
df['traffic_normalized'] = df['traffic_normalized'].fillna(0.0)
2025-11-05 13:34:56 +01:00
# Ensure other weather features are numeric if they exist
for weather_col in ['humidity', 'wind_speed', 'pressure', 'pedestrian_count', 'congestion_level', 'average_speed']:
if weather_col in df.columns:
df[weather_col] = pd.to_numeric(df[weather_col], errors='coerce').fillna(
{'humidity': 60.0, 'wind_speed': 5.0, 'pressure': 1013.0,
'pedestrian_count': 50.0, 'congestion_level': 1.0, 'average_speed': 30.0}.get(weather_col, 0.0)
)
2025-07-28 19:28:39 +02:00
# Interaction features - bakery specific
2025-07-19 16:59:37 +02:00
if 'is_weekend' in df.columns and 'temperature' in df.columns:
df['weekend_temp_interaction'] = df['is_weekend'] * df['temperature']
2025-07-28 19:28:39 +02:00
df['weekend_pleasant_weather'] = df['is_weekend'] * df.get('is_pleasant_day', 0)
2025-07-19 16:59:37 +02:00
if 'is_rainy_day' in df.columns and 'traffic_volume' in df.columns:
df['rain_traffic_interaction'] = df['is_rainy_day'] * df['traffic_volume']
2025-07-28 19:28:39 +02:00
if 'is_holiday' in df.columns and 'temperature' in df.columns:
df['holiday_temp_interaction'] = df['is_holiday'] * df['temperature']
# Seasonal interactions
if 'season' in df.columns and 'temperature' in df.columns:
df['season_temp_interaction'] = df['season'] * df['temperature']
# Day-of-week specific features
if 'day_of_week' in df.columns:
# Working days vs weekends
df['is_working_day'] = (~df['day_of_week'].isin([5, 6])).astype(int)
# Peak bakery days (Friday, Saturday, Sunday often busy)
df['is_peak_bakery_day'] = df['day_of_week'].isin([4, 5, 6]).astype(int)
# Month-specific features for bakery seasonality
if 'month' in df.columns:
2025-07-28 20:20:54 +02:00
# High-demand months (holidays, summer)
df['is_high_demand_month'] = df['month'].isin([6, 7, 8, 12]).astype(int)
# Spring/summer months
df['is_warm_season'] = df['month'].isin([4, 5, 6, 7, 8, 9]).astype(int)
2025-08-08 09:08:41 +02:00
# FINAL SAFETY CHECK: Remove any remaining NaN values
2025-07-28 20:20:54 +02:00
numeric_columns = df.select_dtypes(include=[np.number]).columns
for col in numeric_columns:
if df[col].isna().any():
nan_count = df[col].isna().sum()
2025-08-08 09:08:41 +02:00
logger.warning("Found NaN values in column, filling with 0",
column=col,
nan_count=nan_count)
2025-07-28 20:20:54 +02:00
df[col] = df[col].fillna(0.0)
2025-11-05 13:34:56 +01:00
return df
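# Hedged examples of the engineered bins above (pd.cut uses right-closed intervals):
#
#   temperature:    4 -> temp_category 0 | 12 -> 1 | 20 -> 2 | 30 -> 3
#   precipitation:  0.0 -> rain_intensity 0 | 0.5 -> 1 | 5.0 -> 2 | 15.0 -> 3
#   traffic_normalized is a z-score; its mean/std are kept in self.scalers for prediction time.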
def _add_advanced_features(self, df: pd.DataFrame) -> pd.DataFrame:
"""
Add advanced features using AdvancedFeatureEngineer.
Includes lagged features, rolling statistics, cyclical encoding, and trend features.
"""
df = df.copy()
2025-11-05 18:47:20 +01:00
logger.info("Adding advanced features (lagged, rolling, cyclical, trends)",
input_rows=len(df),
input_columns=len(df.columns))
# Log column dtypes to identify potential issues
logger.debug("Input dataframe dtypes",
dtypes={col: str(dtype) for col, dtype in df.dtypes.items()},
date_column_exists='date' in df.columns)
2025-11-05 13:34:56 +01:00
# Reset feature engineer to clear previous features
2025-11-05 18:47:20 +01:00
logger.debug("Initializing AdvancedFeatureEngineer")
2025-11-05 13:34:56 +01:00
self.feature_engineer = AdvancedFeatureEngineer()
# Create all advanced features at once
2025-11-05 18:47:20 +01:00
logger.debug("Starting creation of advanced features",
include_lags=True,
include_rolling=True,
include_interactions=True,
include_cyclical=True)
2025-11-05 13:34:56 +01:00
df = self.feature_engineer.create_all_features(
df,
date_column='date',
include_lags=True,
include_rolling=True,
include_interactions=True,
include_cyclical=True
)
2025-11-05 18:47:20 +01:00
logger.debug("Advanced features creation completed",
output_rows=len(df),
output_columns=len(df.columns))
2025-11-05 13:34:56 +01:00
# Fill NA values from lagged and rolling features
2025-11-05 18:47:20 +01:00
logger.debug("Starting NA value filling",
na_counts={col: df[col].isna().sum() for col in df.columns if df[col].isna().any()})
2025-11-05 13:34:56 +01:00
df = self.feature_engineer.fill_na_values(df, strategy='forward_backward')
2025-11-05 18:47:20 +01:00
logger.debug("NA value filling completed",
remaining_na_counts={col: df[col].isna().sum() for col in df.columns if df[col].isna().any()})
2025-11-05 13:34:56 +01:00
# Store created feature columns for later reference
created_features = self.feature_engineer.get_feature_columns()
logger.info(f"Added {len(created_features)} advanced features",
features=created_features[:10]) # Log first 10 for brevity
2025-07-19 16:59:37 +02:00
return df
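# Hedged note: the exact columns come from shared.ml.enhanced_features.AdvancedFeatureEngineer,
# which is not shown in this file. Engineers of this kind typically emit lagged targets,
# rolling means/stds over recent windows, and sin/cos encodings of day-of-week and month;
# the names below are assumptions for illustration only.
#
#   quantity_lag_7, quantity_rolling_mean_7, day_of_week_sin, month_cos, ...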
def _handle_missing_values(self, df: pd.DataFrame) -> pd.DataFrame:
2025-07-28 19:28:39 +02:00
"""Handle missing values in the dataset with improved strategies"""
2025-07-19 16:59:37 +02:00
df = df.copy()
2025-07-28 19:28:39 +02:00
# For numeric columns, use appropriate imputation strategies
2025-07-19 16:59:37 +02:00
numeric_columns = df.select_dtypes(include=[np.number]).columns
for col in numeric_columns:
if col != 'quantity' and df[col].isna().any():
2025-07-28 19:28:39 +02:00
# Use different strategies based on column type
if 'temperature' in col:
df[col] = df[col].fillna(15.0) # Madrid average
elif 'precipitation' in col or 'rain' in col:
df[col] = df[col].fillna(0.0) # Default no rain
elif 'humidity' in col:
df[col] = df[col].fillna(60.0) # Moderate humidity
elif 'traffic' in col:
df[col] = df[col].fillna(df[col].median()) # Use median for traffic
elif 'wind' in col:
df[col] = df[col].fillna(5.0) # Light wind
elif 'pressure' in col:
df[col] = df[col].fillna(1013.0) # Standard atmospheric pressure
else:
# For other columns, use median or forward fill
if df[col].count() > 0:
df[col] = df[col].fillna(df[col].median())
else:
df[col] = df[col].fillna(0)
return df
def _handle_missing_values_future(self, df: pd.DataFrame) -> pd.DataFrame:
"""Handle missing values in future prediction data"""
numeric_columns = df.select_dtypes(include=[np.number]).columns
madrid_defaults = {
'temperature': 15.0,
'precipitation': 0.0,
'humidity': 60.0,
'wind_speed': 5.0,
'traffic_volume': 100.0,
'pedestrian_count': 50.0,
'pressure': 1013.0
}
for col in numeric_columns:
if df[col].isna().any():
# Find appropriate default value
default_value = 0
for key, value in madrid_defaults.items():
if key in col.lower():
default_value = value
break
df[col] = df[col].fillna(default_value)
2025-07-19 16:59:37 +02:00
return df
def _add_poi_features(self, df: pd.DataFrame, poi_features: Dict[str, Any]) -> pd.DataFrame:
"""
Add POI features to training dataframe.
POI features are static (location-based, not time-varying),
so they're broadcast to all rows in the dataframe.
Args:
df: Training dataframe
poi_features: Dictionary of POI ML features
Returns:
Dataframe with POI features added as columns
"""
if not poi_features:
logger.warning("No POI features to add")
return df
logger.info(f"Adding {len(poi_features)} POI features to dataframe")
# Add each POI feature as a column with constant value
for feature_name, feature_value in poi_features.items():
# Convert boolean to int for ML compatibility
if isinstance(feature_value, bool):
feature_value = 1 if feature_value else 0
df[feature_name] = feature_value
logger.info(
"POI features added successfully",
feature_count=len(poi_features),
feature_names=list(poi_features.keys())[:5] # Log first 5 for brevity
)
return df
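# Hedged sketch: POI features are broadcast as constant columns (the keys are illustrative):
#
#   poi_features = {"near_metro": True, "competitor_count_500m": 3}
#   -> df["near_metro"] = 1 and df["competitor_count_500m"] = 3 on every row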
def _prepare_prophet_format(self, df: pd.DataFrame) -> pd.DataFrame:
"""Prepare data in Prophet format with enhanced validation"""
prophet_df = df.copy()
# Rename columns for Prophet
if 'date' in prophet_df.columns:
prophet_df = prophet_df.rename(columns={'date': 'ds'})
if 'quantity' in prophet_df.columns:
prophet_df = prophet_df.rename(columns={'quantity': 'y'})
# Ensure ds is datetime and remove timezone info
if 'ds' in prophet_df.columns:
prophet_df['ds'] = pd.to_datetime(prophet_df['ds'])
if prophet_df['ds'].dt.tz is not None:
prophet_df['ds'] = prophet_df['ds'].dt.tz_localize(None)
# Validate required columns
if 'ds' not in prophet_df.columns or 'y' not in prophet_df.columns:
raise ValueError("Prophet data must have 'ds' and 'y' columns")
# Clean target values
prophet_df = prophet_df.dropna(subset=['y'])
prophet_df['y'] = prophet_df['y'].clip(lower=0) # No negative sales
# Remove any duplicate dates (keep last occurrence)
prophet_df = prophet_df.drop_duplicates(subset=['ds'], keep='last')
# Sort by date
prophet_df = prophet_df.sort_values('ds').reset_index(drop=True)
# Final validation
if len(prophet_df) == 0:
raise ValueError("No valid data points after cleaning")
logger.info("Prophet data prepared",
rows=len(prophet_df),
date_range=f"{prophet_df['ds'].min()} to {prophet_df['ds'].max()}")
2025-07-19 16:59:37 +02:00
return prophet_df
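# Hedged sketch of the frame Prophet receives ('ds' tz-naive, 'y' >= 0, one row per date,
# remaining columns available as extra regressors):
#
#   ds          y     temperature  is_weekend  traffic_volume  ...
#   2025-03-01  12.0  14.2         1           118.0
#   2025-03-02  0.0   15.8         1           96.0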
def _get_season(self, month: int) -> int:
"""Get season from month (1-4 for Winter, Spring, Summer, Autumn)"""
if month in [12, 1, 2]:
return 1 # Winter
elif month in [3, 4, 5]:
return 2 # Spring
elif month in [6, 7, 8]:
return 3 # Summer
else:
return 4 # Autumn
def _is_spanish_holiday(self, date: datetime) -> bool:
"""
Check if a date is a Spanish holiday using holidays library.
Supports dynamic Easter calculation and regional holidays.
"""
try:
# Convert to date if datetime
if isinstance(date, datetime):
date = date.date()
elif isinstance(date, pd.Timestamp):
date = date.date()
# Check if date is in holidays
return date in self.spain_holidays
except Exception as e:
logger.warning(f"Error checking holiday status for {date}: {e}")
# Fallback to checking basic holidays
month_day = (date.month, date.day)
basic_holidays = [
(1, 1), (1, 6), (5, 1), (8, 15), (10, 12),
(11, 1), (12, 6), (12, 8), (12, 25)
]
return month_day in basic_holidays
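# Hedged example with the holidays library (national dates shown; regional ones depend on prov):
#
#   hols = holidays.Spain(prov='MD')
#   datetime(2025, 1, 1).date() in hols    # -> True  (Año Nuevo)
#   datetime(2025, 12, 25).date() in hols  # -> True  (Navidad)
#   datetime(2025, 3, 5).date() in hols    # -> False (ordinary Wednesday)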
def _is_school_holiday(self, date: datetime) -> bool:
"""
Check if a date is during school holidays in Spain.
Uses dynamic Easter calculation and standard Spanish school calendar.
"""
try:
# Convert to date if datetime
if isinstance(date, datetime):
check_date = date.date()
elif isinstance(date, pd.Timestamp):
check_date = date.date()
else:
check_date = date
month = check_date.month
day = check_date.day
# Summer holidays (July 1 - August 31)
if month in [7, 8]:
return True
# Christmas holidays (December 23 - January 7)
if (month == 12 and day >= 23) or (month == 1 and day <= 7):
return True
# Easter/Spring break (Semana Santa)
# Calculate Easter for this year
year = check_date.year
spain_hol = holidays.Spain(years=year, prov=self.region)
# Find Easter dates (Viernes Santo - Good Friday, and nearby days)
# Easter break typically spans 1 week before and after Easter Sunday
for holiday_date, holiday_name in spain_hol.items():
if 'viernes santo' in holiday_name.lower() or 'easter' in holiday_name.lower():
# Easter break: 7 days before and 7 days after
easter_start = holiday_date - timedelta(days=7)
easter_end = holiday_date + timedelta(days=7)
if easter_start <= check_date <= easter_end:
return True
return False
except Exception as e:
logger.warning(f"Error checking school holiday for {date}: {e}")
# Fallback to simple approximation
month = date.month
day = date.day
return (month in [7, 8] or
(month == 12 and day >= 23) or
(month == 1 and day <= 7) or
(month == 4 and 1 <= day <= 15)) # Approximate Easter
async def calculate_feature_importance(self,
2025-07-19 16:59:37 +02:00
model_data: pd.DataFrame,
target_column: str = 'y') -> Dict[str, float]:
"""
Calculate feature importance for the model using correlation analysis with repository logging.
"""
try:
# Get numeric features
numeric_features = model_data.select_dtypes(include=[np.number]).columns
numeric_features = [col for col in numeric_features if col != target_column]
importance_scores = {}
if target_column not in model_data.columns:
logger.warning("Target column not found", target_column=target_column)
return {}
for feature in numeric_features:
if feature in model_data.columns:
correlation = model_data[feature].corr(model_data[target_column])
if not pd.isna(correlation) and not np.isinf(correlation):
importance_scores[feature] = abs(correlation)
# Sort by importance
importance_scores = dict(sorted(importance_scores.items(),
key=lambda x: x[1], reverse=True))
logger.info("Calculated feature importance",
features_count=len(importance_scores))
return importance_scores
except Exception as e:
logger.error("Error calculating feature importance", error=str(e))
return {}
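# Hedged note: "importance" here is just the absolute Pearson correlation with the target,
# so scores sit in [0, 1] and say nothing about direction or interaction effects.
#
#   {"temperature": 0.41, "is_weekend": 0.33, "traffic_volume": 0.12}   # illustrative output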
async def get_data_quality_report(self, df: pd.DataFrame) -> Dict[str, Any]:
"""
Generate a comprehensive data quality report with repository integration.
"""
try:
report = {
"total_records": len(df),
"date_range": {
"start": df['ds'].min().isoformat() if 'ds' in df.columns else None,
"end": df['ds'].max().isoformat() if 'ds' in df.columns else None,
"duration_days": (df['ds'].max() - df['ds'].min()).days if 'ds' in df.columns else 0
},
"missing_values": {},
"data_completeness": 0.0,
"target_statistics": {},
"feature_count": 0
}
# Calculate missing values
missing_counts = df.isnull().sum()
total_cells = len(df)
for col in df.columns:
missing_count = missing_counts[col]
report["missing_values"][col] = {
"count": int(missing_count),
"percentage": round((missing_count / total_cells) * 100, 2)
}
# Overall completeness
total_missing = missing_counts.sum()
total_possible = len(df) * len(df.columns)
report["data_completeness"] = round(((total_possible - total_missing) / total_possible) * 100, 2)
# Target variable statistics
if 'y' in df.columns:
y_col = df['y']
report["target_statistics"] = {
"mean": round(y_col.mean(), 2),
"median": round(y_col.median(), 2),
"std": round(y_col.std(), 2),
"min": round(y_col.min(), 2),
"max": round(y_col.max(), 2),
"zero_count": int((y_col == 0).sum()),
"zero_percentage": round(((y_col == 0).sum() / len(y_col)) * 100, 2)
}
# Feature count
numeric_features = df.select_dtypes(include=[np.number]).columns
report["feature_count"] = len([col for col in numeric_features if col not in ['y', 'ds']])
return report
except Exception as e:
logger.error("Error generating data quality report", error=str(e))
return {"error": str(e)}