254 lines
8.3 KiB
Python
254 lines
8.3 KiB
Python
|
|
"""
|
||
|
|
Event Feature Generator
|
||
|
|
Converts calendar events into features for demand forecasting
|
||
|
|
"""
|
||
|
|
|
||
|
|
import pandas as pd
|
||
|
|
import numpy as np
|
||
|
|
from typing import List, Dict, Any, Optional
|
||
|
|
from datetime import date, timedelta
|
||
|
|
import structlog
|
||
|
|
|
||
|
|
logger = structlog.get_logger()
|
||
|
|
|
||
|
|
|
||
|
|
class EventFeatureGenerator:
    """
    Generate event-related features for demand forecasting.

    Features include:
    - Binary flags for event presence
    - Event impact multipliers
    - Event type indicators
    - Days until/since major events

    The class keeps no per-instance state; logging goes through the
    module-level ``logger``.
    """

    # Event type impact weights (default multipliers)
    EVENT_IMPACT_WEIGHTS = {
        'promotion': 1.3,
        'festival': 1.8,
        'holiday': 0.7,  # Bakeries often close or have reduced demand
        'weather_event': 0.8,  # Bad weather reduces foot traffic
        'school_break': 1.2,
        'sport_event': 1.4,
        'market': 1.5,
        'concert': 1.3,
        'local_event': 1.2
    }

    # Neutral value of every generated feature column, in output column order.
    # Used both to initialise the feature frame and to fill rows that find no
    # match when the features are merged into forecast data.
    FEATURE_DEFAULTS = {
        'has_event': 0,
        'event_impact': 1.0,  # Neutral impact
        'is_promotion': 0,
        'is_festival': 0,
        'is_local_event': 0,
        'days_to_next_event': 365,
        'days_since_last_event': 365,
    }

    # Upper bound applied to both day-distance features.
    MAX_DAY_DISTANCE = 365

    def __init__(self):
        """Create a generator; there is nothing to configure."""
        pass

    def generate_event_features(
        self,
        dates: pd.DatetimeIndex,
        events: List[Dict[str, Any]]
    ) -> pd.DataFrame:
        """
        Generate event features for given dates.

        Args:
            dates: Dates to generate features for
            events: List of event dictionaries with keys:
                - event_date: date (anything ``pd.to_datetime`` accepts)
                - event_type: str
                - impact_multiplier: float (optional)
                - event_name: str

        Returns:
            DataFrame with one row per entry of ``dates`` and the columns
            ``date`` plus every key of ``FEATURE_DEFAULTS``. When ``events``
            is empty, every feature column holds its default value.
        """
        df = pd.DataFrame({'date': dates})

        # Initialize feature columns with their neutral values
        for column, default in self.FEATURE_DEFAULTS.items():
            df[column] = default

        if not events:
            logger.debug("No events provided, returning default features")
            return df

        # Convert events to DataFrame for easier processing
        events_df = pd.DataFrame(events)
        events_df['event_date'] = pd.to_datetime(events_df['event_date'])
        event_dates = events_df['event_date']
        has_custom_impact = 'impact_multiplier' in events_df.columns

        for idx, raw_date in df['date'].items():
            current_date = pd.to_datetime(raw_date)

            # Events falling exactly on this date (exact timestamp equality)
            day_events = events_df[event_dates == current_date]

            if not day_events.empty:
                event_types = day_events['event_type'].tolist()
                df.at[idx, 'has_event'] = 1

                # Prefer the largest explicit multiplier given for the day;
                # otherwise fall back to the largest default weight of the
                # day's event types (unknown types count as neutral 1.0).
                if has_custom_impact and not day_events['impact_multiplier'].isna().all():
                    impact = day_events['impact_multiplier'].max()
                else:
                    impact = max(
                        self.EVENT_IMPACT_WEIGHTS.get(event_type, 1.0)
                        for event_type in event_types
                    )
                df.at[idx, 'event_impact'] = impact

                # Set event type flags
                if 'promotion' in event_types:
                    df.at[idx, 'is_promotion'] = 1
                if 'festival' in event_types:
                    df.at[idx, 'is_festival'] = 1
                # Markets are reported under the local-event flag as well
                if 'local_event' in event_types or 'market' in event_types:
                    df.at[idx, 'is_local_event'] = 1

            # Distance to the nearest strictly-later event, if any
            upcoming = event_dates[event_dates > current_date]
            if not upcoming.empty:
                df.at[idx, 'days_to_next_event'] = (upcoming.min() - current_date).days

            # Distance from the nearest strictly-earlier event, if any
            previous = event_dates[event_dates < current_date]
            if not previous.empty:
                df.at[idx, 'days_since_last_event'] = (current_date - previous.max()).days

        # Cap days values at 365
        df['days_to_next_event'] = df['days_to_next_event'].clip(upper=self.MAX_DAY_DISTANCE)
        df['days_since_last_event'] = df['days_since_last_event'].clip(upper=self.MAX_DAY_DISTANCE)

        logger.debug("Generated event features",
                     total_days=len(df),
                     days_with_events=df['has_event'].sum())

        return df

    def add_event_features_to_forecast_data(
        self,
        forecast_data: pd.DataFrame,
        event_features: pd.DataFrame
    ) -> pd.DataFrame:
        """
        Add event features to forecast input data.

        Neither input frame is modified; both are copied before their
        ``date`` columns are converted to datetimes for the join.

        Args:
            forecast_data: Existing forecast data with 'date' column
            event_features: Event features from generate_event_features()

        Returns:
            Enhanced forecast data with event features. Rows whose date has
            no match in ``event_features`` receive the ``FEATURE_DEFAULTS``
            values.
        """
        forecast_data = forecast_data.copy()
        forecast_data['date'] = pd.to_datetime(forecast_data['date'])

        # Work on a copy of just the needed columns so the caller's frame
        # keeps its original 'date' values.
        feature_columns = ['date', *self.FEATURE_DEFAULTS]
        features = event_features[feature_columns].copy()
        features['date'] = pd.to_datetime(features['date'])

        # Merge event features
        enhanced_data = forecast_data.merge(features, on='date', how='left')

        # Fill missing with defaults. A single frame-level fillna is used
        # because column-level ``fillna(inplace=True)`` acts on a temporary
        # object and does not reliably write back to the frame.
        return enhanced_data.fillna(value=self.FEATURE_DEFAULTS)

    @staticmethod
    def _to_iso(value: Any) -> str:
        """Return ``value.isoformat()`` when available, else ``str(value)``."""
        if hasattr(value, 'isoformat'):
            return value.isoformat()
        return str(value)

    def get_event_summary(self, events: List[Dict[str, Any]]) -> Dict[str, Any]:
        """
        Get summary statistics about events.

        Args:
            events: List of event dictionaries

        Returns:
            Summary dict with the keys:
                - total_events: number of events
                - events_by_type: count per ``event_type``
                - date_range: ``{'start', 'end'}`` of ``event_date`` as ISO
                  strings (both ``None`` when there are no events)
                - avg_impact: mean of the provided ``impact_multiplier``
                  values, or 1.0 when none is provided
        """
        if not events:
            return {
                'total_events': 0,
                'events_by_type': {},
                'date_range': {'start': None, 'end': None},
                'avg_impact': 1.0
            }

        events_df = pd.DataFrame(events)
        event_dates = events_df['event_date']

        # Neutral impact unless at least one usable multiplier is present
        avg_impact = 1.0
        if 'impact_multiplier' in events_df.columns:
            mean_impact = pd.to_numeric(
                events_df['impact_multiplier'], errors='coerce'
            ).mean()
            if not pd.isna(mean_impact):
                avg_impact = float(mean_impact)

        return {
            'total_events': len(events),
            'events_by_type': events_df['event_type'].value_counts().to_dict(),
            'date_range': {
                'start': self._to_iso(event_dates.min()),
                'end': self._to_iso(event_dates.max())
            },
            'avg_impact': avg_impact
        }
|
||
|
|
|
||
|
|
|
||
|
|
def create_event_calendar_features(
    dates: pd.DatetimeIndex,
    tenant_id: str,
    event_repository = None
) -> pd.DataFrame:
    """
    Convenience function to fetch events from database and generate features.

    The repository's ``get_events_by_date_range`` coroutine is driven to
    completion synchronously on the current thread's event loop, so this
    function cannot fetch events while that loop is already running. Any
    failure while fetching is logged and treated as "no events", so default
    features are returned instead of raising.

    Args:
        dates: Dates to generate features for
        tenant_id: Tenant UUID (string form, or an existing ``UUID`` object)
        event_repository: EventRepository instance (optional)

    Returns:
        DataFrame with event features
    """
    events = []

    if event_repository is None:
        logger.warning("No event repository provided, using empty events")
    elif not dates.empty:
        # With no dates there is no range to query; min()/max() would be NaT.
        import asyncio
        from uuid import UUID

        # Fetch events from database
        start_date = dates.min().date()
        end_date = dates.max().date()

        try:
            tenant_uuid = tenant_id if isinstance(tenant_id, UUID) else UUID(str(tenant_id))

            # get_event_loop() raises RuntimeError when the thread has no
            # current loop (e.g. worker threads); create one in that case.
            try:
                loop = asyncio.get_event_loop()
            except RuntimeError:
                loop = asyncio.new_event_loop()
                asyncio.set_event_loop(loop)

            # Fail before the coroutine is created so it is not left
            # un-awaited; run_until_complete() cannot re-enter a running loop.
            if loop.is_running():
                raise RuntimeError(
                    "event loop is already running; cannot fetch events synchronously"
                )

            events_objects = loop.run_until_complete(
                event_repository.get_events_by_date_range(
                    tenant_id=tenant_uuid,
                    start_date=start_date,
                    end_date=end_date,
                    confirmed_only=False
                )
            )

            # Convert to dict format
            events = [event.to_dict() for event in events_objects]

        except Exception as e:
            # Deliberate best-effort: forecasting continues without events.
            logger.error(f"Failed to fetch events from database: {e}")
            events = []

    # Generate features
    generator = EventFeatureGenerator()
    return generator.generate_event_features(dates, events)
|