285 lines
9.7 KiB
Python
285 lines
9.7 KiB
Python
"""
|
|
Traffic Forecasting System
|
|
Predicts bakery foot traffic using weather and temporal features
|
|
"""
|
|
|
|
import pandas as pd
|
|
import numpy as np
|
|
from typing import Dict, List, Any, Optional
|
|
from prophet import Prophet
|
|
import structlog
|
|
from datetime import datetime, timedelta
|
|
|
|
# Module-level structlog logger used by the classes in this file.
logger = structlog.get_logger()
|
|
|
|
|
|
class TrafficForecaster:
    """
    Forecast bakery foot traffic using Prophet with weather and temporal features.

    Traffic patterns are influenced by:
    - Weather: Temperature, precipitation, conditions
    - Time: Day of week, holidays, season
    - Special events: Local events, promotions

    Attributes:
        model: The fitted Prophet model, or None before the first successful
            call to train().
        is_trained: True once train() has completed a fit.
    """

    # Neutral values substituted for missing weather readings. The same values
    # are used at training and prediction time so both see consistent inputs.
    _REGRESSOR_DEFAULTS: Dict[str, float] = {
        'temperature': 15.0,
        'precipitation': 0.0,
        'is_rainy': 0,
    }

    # Names of the extra regressors the current model was fitted with.
    # Declared at class level so instances created before this attribute
    # existed still resolve it (to "no regressors").
    _regressors: tuple = ()

    def __init__(self):
        self.model = None
        self.is_trained = False
        self._regressors = ()

    def train(
        self,
        historical_traffic: pd.DataFrame,
        weather_data: Optional[pd.DataFrame] = None
    ) -> Dict[str, Any]:
        """
        Train traffic forecasting model.

        Args:
            historical_traffic: DataFrame with columns ['date', 'traffic_count']
            weather_data: Optional weather data with columns
                ['date', 'temperature', 'precipitation', 'condition']

        Returns:
            Training metrics (in-sample) with keys 'mae', 'mse', 'rmse', 'mape'

        Raises:
            Exception: Any error raised while preparing data or fitting is
                logged and re-raised unchanged.
        """
        try:
            logger.info("Training traffic forecasting model",
                        data_points=len(historical_traffic))

            # Prepare Prophet format
            df = historical_traffic.copy()
            df = df.rename(columns={'date': 'ds', 'traffic_count': 'y'})
            df['ds'] = pd.to_datetime(df['ds'])
            df = df.sort_values('ds')

            # Merge with weather data if available
            if weather_data is not None:
                weather_data = weather_data.copy()
                weather_data['date'] = pd.to_datetime(weather_data['date'])
                df = df.merge(weather_data, left_on='ds', right_on='date', how='left')

            # The left merge leaves NaN on days without a weather record and
            # Prophet refuses to fit on NaN regressor values, so fill the gaps.
            regressors = [name for name in self._REGRESSOR_DEFAULTS if name in df.columns]
            df = self._apply_regressor_defaults(df, required=())

            # Build the model in a local variable: if fitting fails, a
            # previously trained model on this instance stays usable.
            model = Prophet(
                seasonality_mode='multiplicative',
                yearly_seasonality=True,
                weekly_seasonality=True,
                daily_seasonality=False,
                changepoint_prior_scale=0.05,  # Moderate flexibility
                seasonality_prior_scale=10.0,
                holidays_prior_scale=10.0
            )

            # Add weather regressors if available
            for name in regressors:
                model.add_regressor(name)

            # Add holidays for Spain
            model.add_country_holidays(country_name='ES')

            # Fit model
            model.fit(df)
            self.model = model
            self._regressors = tuple(regressors)
            self.is_trained = True

            # Calculate training metrics
            predictions = model.predict(df)
            metrics = self._calculate_metrics(df['y'].values, predictions['yhat'].values)

            logger.info("Traffic forecasting model trained successfully",
                        mape=metrics['mape'],
                        rmse=metrics['rmse'])

            return metrics

        except Exception as e:
            logger.error(f"Failed to train traffic forecasting model: {e}")
            raise

    def predict(
        self,
        future_dates: pd.DatetimeIndex,
        weather_forecast: Optional[pd.DataFrame] = None
    ) -> pd.DataFrame:
        """
        Predict traffic for future dates.

        Args:
            future_dates: Dates to predict traffic for
            weather_forecast: Optional weather forecast data with a 'date'
                column. Regressors the model was trained with but that are
                absent here are filled with neutral defaults.

        Returns:
            DataFrame with columns ['date', 'predicted_traffic', 'yhat_lower', 'yhat_upper']

        Raises:
            ValueError: If the model has not been trained.
            Exception: Any error raised during prediction is logged and
                re-raised unchanged.
        """
        if not self.is_trained:
            raise ValueError("Model not trained. Call train() first.")

        try:
            future = self._build_future_frame(future_dates, weather_forecast)

            # Predict
            forecast = self.model.predict(future)

            # Format results
            results = pd.DataFrame({
                'date': forecast['ds'],
                'predicted_traffic': forecast['yhat'].clip(lower=0),  # Traffic can't be negative
                'yhat_lower': forecast['yhat_lower'].clip(lower=0),
                'yhat_upper': forecast['yhat_upper'].clip(lower=0)
            })

            logger.info("Traffic predictions generated",
                        dates=len(results),
                        avg_traffic=results['predicted_traffic'].mean())

            return results

        except Exception as e:
            logger.error(f"Failed to predict traffic: {e}")
            raise

    def _build_future_frame(
        self,
        future_dates: pd.DatetimeIndex,
        weather_forecast: Optional[pd.DataFrame]
    ) -> pd.DataFrame:
        """Build the Prophet input frame for prediction, with every trained regressor present and NaN-free."""
        future = pd.DataFrame({'ds': pd.to_datetime(future_dates)})

        # Add weather features if available (work on a copy; never touch the caller's frame)
        if weather_forecast is not None:
            weather_forecast = weather_forecast.copy()
            weather_forecast['date'] = pd.to_datetime(weather_forecast['date'])
            future = future.merge(weather_forecast, left_on='ds', right_on='date', how='left')

        return self._apply_regressor_defaults(future, required=self._regressors)

    def _apply_regressor_defaults(self, frame: pd.DataFrame, required) -> pd.DataFrame:
        """Fill NaN in known weather columns and create any `required` column that is missing."""
        for name, default in self._REGRESSOR_DEFAULTS.items():
            if name in frame.columns:
                # Plain assignment instead of chained `inplace=True`, which
                # does not update the frame under pandas Copy-on-Write.
                frame[name] = frame[name].fillna(default)
            elif name in required:
                frame[name] = default
        return frame

    def _calculate_metrics(self, actual: np.ndarray, predicted: np.ndarray) -> Dict[str, float]:
        """
        Calculate forecast accuracy metrics.

        Args:
            actual: Observed values
            predicted: Predicted values, aligned element-wise with `actual`

        Returns:
            Dict with 'mae', 'mse', 'rmse' and 'mape' (percent). MAPE is
            computed only over entries where `actual` is non-zero and is 0
            when there are none.
        """
        actual = np.asarray(actual, dtype=float)
        predicted = np.asarray(predicted, dtype=float)

        errors = actual - predicted
        mae = np.mean(np.abs(errors))
        mse = np.mean(errors ** 2)
        rmse = np.sqrt(mse)

        # MAPE (handle zeros)
        mask = actual != 0
        mape = np.mean(np.abs(errors[mask] / actual[mask])) * 100 if mask.any() else 0

        return {
            'mae': float(mae),
            'mse': float(mse),
            'rmse': float(rmse),
            'mape': float(mape)
        }

    def _get_spanish_holidays(self, start_year: int, end_year: int) -> pd.DataFrame:
        """
        Get Spanish holidays for the date range.

        Args:
            start_year: First year to include
            end_year: Last year to include (inclusive)

        Returns:
            DataFrame with columns ['ds', 'holiday']; empty (same columns)
            if the `holidays` package cannot be loaded or queried.
        """
        try:
            import holidays

            es_holidays = holidays.Spain(years=range(start_year, end_year + 1))

            return pd.DataFrame({
                'ds': pd.to_datetime(list(es_holidays.keys())),
                'holiday': list(es_holidays.values())
            })

        except Exception as e:
            # Best effort: holidays are optional, so degrade to an empty table.
            logger.warning(f"Could not load Spanish holidays: {e}")
            return pd.DataFrame(columns=['ds', 'holiday'])
|
|
|
|
|
|
class TrafficFeatureGenerator:
    """
    Generate traffic-related features for demand forecasting.
    Uses predicted traffic as a feature in product demand models.
    """

    # Fallback values used when no traffic prediction is available for a date.
    _DEFAULT_TRAFFIC = 100.0      # Default baseline
    _DEFAULT_NORMALIZED = 1.0     # 1 = average

    def __init__(self, traffic_forecaster: Optional["TrafficForecaster"] = None):
        """
        Args:
            traffic_forecaster: Forecaster used to predict traffic. A new,
                untrained TrafficForecaster is created when omitted.
        """
        if traffic_forecaster is None:
            traffic_forecaster = TrafficForecaster()
        self.traffic_forecaster = traffic_forecaster

    def generate_traffic_features(
        self,
        dates: pd.DatetimeIndex,
        weather_forecast: Optional[pd.DataFrame] = None
    ) -> pd.DataFrame:
        """
        Generate traffic features for given dates.

        Args:
            dates: Dates to generate features for
            weather_forecast: Optional weather forecast

        Returns:
            DataFrame with traffic features. Always contains 'date',
            'predicted_traffic', 'traffic_normalized' and 'traffic_category';
            when the forecaster is trained it also carries the other columns
            returned by its predict().
        """
        if not self.traffic_forecaster.is_trained:
            logger.warning("Traffic forecaster not trained, using default traffic values")
            features = pd.DataFrame({
                'date': dates,
                'predicted_traffic': self._DEFAULT_TRAFFIC,
                'traffic_normalized': self._DEFAULT_NORMALIZED
            })
        else:
            # Predict traffic
            features = self.traffic_forecaster.predict(dates, weather_forecast)

            # Normalize traffic (0-2 range, 1 = average)
            mean_traffic = features['predicted_traffic'].mean()
            if mean_traffic > 0:
                features['traffic_normalized'] = (
                    features['predicted_traffic'] / mean_traffic
                ).clip(0, 2)
            else:
                # Mean is zero (all predictions clipped to 0) or NaN (no rows):
                # dividing would yield NaN, so every date counts as "average".
                features['traffic_normalized'] = self._DEFAULT_NORMALIZED

        # Add traffic categories. include_lowest=True puts a prediction of
        # exactly 0 into 'low' instead of leaving it uncategorized (NaN).
        features['traffic_category'] = pd.cut(
            features['predicted_traffic'],
            bins=[0, 50, 100, 150, np.inf],
            labels=['low', 'medium', 'high', 'very_high'],
            include_lowest=True
        )

        return features

    def add_traffic_features_to_forecast_data(
        self,
        forecast_data: pd.DataFrame,
        traffic_predictions: pd.DataFrame
    ) -> pd.DataFrame:
        """
        Add traffic features to forecast input data.

        Neither input frame is modified.

        Args:
            forecast_data: Existing forecast data with 'date' column
            traffic_predictions: Traffic predictions from generate_traffic_features()

        Returns:
            Enhanced forecast data with 'predicted_traffic' and
            'traffic_normalized' columns; dates without a prediction get the
            defaults 100.0 and 1.0.
        """
        forecast_data = forecast_data.copy()
        forecast_data['date'] = pd.to_datetime(forecast_data['date'])

        # Copy the needed columns so the caller's frame keeps its 'date' dtype.
        traffic = traffic_predictions[
            ['date', 'predicted_traffic', 'traffic_normalized']
        ].copy()
        traffic['date'] = pd.to_datetime(traffic['date'])

        # Merge traffic features
        enhanced_data = forecast_data.merge(traffic, on='date', how='left')

        # Fill missing with defaults (plain assignment: chained
        # `inplace=True` does not update the frame under pandas Copy-on-Write)
        enhanced_data['predicted_traffic'] = (
            enhanced_data['predicted_traffic'].fillna(self._DEFAULT_TRAFFIC)
        )
        enhanced_data['traffic_normalized'] = (
            enhanced_data['traffic_normalized'].fillna(self._DEFAULT_NORMALIZED)
        )

        return enhanced_data
|