Files
bakery-ia/services/training/app/ml/event_feature_generator.py
2025-11-05 13:34:56 +01:00

254 lines
8.3 KiB
Python

"""
Event Feature Generator
Converts calendar events into features for demand forecasting
"""
import pandas as pd
import numpy as np
from typing import List, Dict, Any, Optional
from datetime import date, timedelta
import structlog
logger = structlog.get_logger()
class EventFeatureGenerator:
    """
    Generate event-related features for demand forecasting.

    Features include:
    - Binary flags for event presence
    - Event impact multipliers
    - Event type indicators
    - Days until/since major events
    """

    # Event type impact weights (default multipliers)
    EVENT_IMPACT_WEIGHTS = {
        'promotion': 1.3,
        'festival': 1.8,
        'holiday': 0.7,  # Bakeries often close or have reduced demand
        'weather_event': 0.8,  # Bad weather reduces foot traffic
        'school_break': 1.2,
        'sport_event': 1.4,
        'market': 1.5,
        'concert': 1.3,
        'local_event': 1.2
    }

    # Upper bound (in days) for the distance features; also the value used
    # when there is no next/previous event at all.
    MAX_EVENT_DISTANCE_DAYS = 365

    # Neutral value of every generated feature column, in output column order.
    FEATURE_DEFAULTS = {
        'has_event': 0,
        'event_impact': 1.0,
        'is_promotion': 0,
        'is_festival': 0,
        'is_local_event': 0,
        'days_to_next_event': MAX_EVENT_DISTANCE_DAYS,
        'days_since_last_event': MAX_EVENT_DISTANCE_DAYS,
    }

    def __init__(self):
        pass

    def generate_event_features(
        self,
        dates: pd.DatetimeIndex,
        events: List[Dict[str, Any]]
    ) -> pd.DataFrame:
        """
        Generate event features for given dates.

        Events are matched to dates by calendar day, so a time-of-day
        component on either side does not prevent a match. When several
        events fall on the same day, the day's impact is the largest
        per-event impact, where each event contributes its own
        ``impact_multiplier`` if it has one and the default weight for its
        ``event_type`` otherwise (1.0 for unknown types).

        Args:
            dates: Dates to generate features for
            events: List of event dictionaries with keys:
                - event_date: date
                - event_type: str
                - impact_multiplier: float (optional)
                - event_name: str

        Returns:
            DataFrame with one row per entry of ``dates`` and the columns
            ``date`` plus every key of ``FEATURE_DEFAULTS``.
        """
        df = pd.DataFrame({'date': dates})

        # Initialize feature columns with their neutral values
        for column, default in self.FEATURE_DEFAULTS.items():
            df[column] = default

        if not events:
            logger.debug("No events provided, returning default features")
            return df

        # One row per event, reduced to the three things the features need.
        # Events whose date cannot be represented (NaT) can never match a day.
        events_df = pd.DataFrame(events)
        per_event = pd.DataFrame({
            'day': pd.to_datetime(events_df['event_date']).dt.normalize(),
            'type': events_df['event_type'],
            'impact': self._effective_impacts(events_df),
        }).dropna(subset=['day'])

        # groupby sorts its keys, so the resulting index doubles as the
        # sorted list of distinct event days used for the distance lookups.
        impact_by_day = per_event.groupby('day')['impact'].max()
        event_days = pd.DatetimeIndex(impact_by_day.index)

        days = pd.to_datetime(df['date']).dt.normalize()

        df['has_event'] = days.isin(event_days).astype('int64')
        df['event_impact'] = days.map(impact_by_day).fillna(1.0).astype('float64')

        def flag_days_with(event_types: List[str]) -> pd.Series:
            flagged_days = per_event.loc[per_event['type'].isin(event_types), 'day']
            return days.isin(flagged_days).astype('int64')

        # Set event type flags
        df['is_promotion'] = flag_days_with(['promotion'])
        df['is_festival'] = flag_days_with(['festival'])
        df['is_local_event'] = flag_days_with(['local_event', 'market'])

        # Calculate days to/from nearest event (strictly after / strictly
        # before the current day; an event on the day itself counts as neither)
        day_index = pd.DatetimeIndex(days)
        known_day = ~day_index.isna()
        next_pos = event_days.searchsorted(day_index, side='right')
        prev_pos = event_days.searchsorted(day_index, side='left') - 1
        has_next = known_day & (next_pos < len(event_days))
        has_prev = known_day & (prev_pos >= 0)

        cap = self.MAX_EVENT_DISTANCE_DAYS
        days_to_next = np.full(len(df), cap, dtype='int64')
        days_to_next[has_next] = (
            event_days[next_pos[has_next]] - day_index[has_next]
        ).days.to_numpy()
        days_since_last = np.full(len(df), cap, dtype='int64')
        days_since_last[has_prev] = (
            day_index[has_prev] - event_days[prev_pos[has_prev]]
        ).days.to_numpy()

        # Cap days values at 365
        df['days_to_next_event'] = np.minimum(days_to_next, cap)
        df['days_since_last_event'] = np.minimum(days_since_last, cap)

        logger.debug("Generated event features",
                     total_days=len(df),
                     days_with_events=int(df['has_event'].sum()))
        return df

    def _effective_impacts(self, events_df: pd.DataFrame) -> pd.Series:
        """Return each event's impact: its own multiplier, else its type default."""
        type_defaults = (
            events_df['event_type']
            .map(self.EVENT_IMPACT_WEIGHTS)
            .fillna(1.0)
            .astype('float64')
        )
        if 'impact_multiplier' not in events_df.columns:
            return type_defaults
        custom = pd.to_numeric(events_df['impact_multiplier'], errors='coerce')
        return custom.fillna(type_defaults)

    def add_event_features_to_forecast_data(
        self,
        forecast_data: pd.DataFrame,
        event_features: pd.DataFrame
    ) -> pd.DataFrame:
        """
        Add event features to forecast input data.

        Neither input frame is modified. Rows of ``forecast_data`` whose date
        has no match in ``event_features`` receive the neutral values from
        ``FEATURE_DEFAULTS``.

        Args:
            forecast_data: Existing forecast data with 'date' column
            event_features: Event features from generate_event_features()

        Returns:
            Enhanced forecast data with event features
        """
        forecast_data = forecast_data.copy()
        forecast_data['date'] = pd.to_datetime(forecast_data['date'])

        # Work on a copy of just the needed columns so the caller's frame
        # keeps its original 'date' values.
        feature_columns = ['date', *self.FEATURE_DEFAULTS]
        features = event_features[feature_columns].copy()
        features['date'] = pd.to_datetime(features['date'])

        # Merge event features
        enhanced_data = forecast_data.merge(features, on='date', how='left')

        # Fill missing with defaults. A single frame-level fillna is used
        # instead of per-column `inplace=True` calls, which operate on a
        # temporary object when pandas Copy-on-Write is active.
        return enhanced_data.fillna(value=self.FEATURE_DEFAULTS)

    def get_event_summary(self, events: List[Dict[str, Any]]) -> Dict[str, Any]:
        """
        Get summary statistics about events.

        The returned dict always has the same four keys, whether or not any
        events were supplied.

        Args:
            events: List of event dictionaries

        Returns:
            Summary dict with:
                - total_events: number of events
                - events_by_type: count per ``event_type``
                - date_range: ISO strings for the earliest ('start') and
                  latest ('end') event date, or None when unavailable
                - avg_impact: mean of the provided ``impact_multiplier``
                  values, or 1.0 when none is available
        """
        summary = {
            'total_events': 0,
            'events_by_type': {},
            'date_range': {'start': None, 'end': None},
            'avg_impact': 1.0
        }
        if not events:
            return summary

        events_df = pd.DataFrame(events)
        summary['total_events'] = len(events)
        summary['events_by_type'] = events_df['event_type'].value_counts().to_dict()

        # Order by parsed timestamps so string dates are handled too, but
        # format the original value when it already knows how to, which keeps
        # date objects rendering as plain 'YYYY-MM-DD'.
        raw_dates = events_df['event_date']
        parsed_dates = pd.to_datetime(raw_dates, errors='coerce')
        if parsed_dates.notna().any():
            first, last = parsed_dates.idxmin(), parsed_dates.idxmax()
            summary['date_range'] = {
                'start': self._isoformat(raw_dates.loc[first], parsed_dates.loc[first]),
                'end': self._isoformat(raw_dates.loc[last], parsed_dates.loc[last])
            }

        if 'impact_multiplier' in events_df.columns:
            mean_impact = pd.to_numeric(
                events_df['impact_multiplier'], errors='coerce'
            ).mean()
            # mean() is NaN when no event carries a multiplier; keep the
            # neutral 1.0 in that case.
            if pd.notna(mean_impact):
                summary['avg_impact'] = float(mean_impact)

        return summary

    @staticmethod
    def _isoformat(raw_value: Any, parsed_value: pd.Timestamp) -> str:
        """Return the ISO string of ``raw_value``, falling back to its parsed form."""
        if hasattr(raw_value, 'isoformat'):
            return raw_value.isoformat()
        return parsed_value.isoformat()
def _run_coroutine_blocking(coroutine):
    """Run ``coroutine`` to completion from synchronous code and return its result."""
    import asyncio
    from concurrent.futures import ThreadPoolExecutor

    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No event loop is running in this thread, so one can be started here.
        return asyncio.run(coroutine)

    # An event loop is already running in this thread and cannot be re-entered,
    # so the coroutine gets its own loop on a worker thread. The calling thread
    # blocks until the result is available.
    with ThreadPoolExecutor(max_workers=1) as executor:
        return executor.submit(asyncio.run, coroutine).result()


def create_event_calendar_features(
    dates: pd.DatetimeIndex,
    tenant_id: str,
    event_repository=None
) -> pd.DataFrame:
    """
    Convenience function to fetch events from database and generate features.

    Fetching is best-effort: if no repository is given, ``dates`` is empty,
    or the lookup fails for any reason, features are generated from an empty
    event list (i.e. every date gets the neutral defaults).

    Args:
        dates: Dates to generate features for
        tenant_id: Tenant UUID (string form, or an existing UUID object)
        event_repository: EventRepository instance (optional). Its
            ``get_events_by_date_range`` is awaited and each returned object
            is converted with ``to_dict()``.

    Returns:
        DataFrame with event features
    """
    events = []

    if event_repository is None:
        logger.warning("No event repository provided, using empty events")
    elif len(dates) > 0:
        # With no dates there is no range to query, so the fetch is skipped.
        try:
            from uuid import UUID

            # Accept an already-constructed UUID as well as its string form.
            tenant_uuid = tenant_id if isinstance(tenant_id, UUID) else UUID(str(tenant_id))
            start_date = dates.min().date()
            end_date = dates.max().date()

            async def fetch_events():
                return await event_repository.get_events_by_date_range(
                    tenant_id=tenant_uuid,
                    start_date=start_date,
                    end_date=end_date,
                    confirmed_only=False
                )

            events_objects = _run_coroutine_blocking(fetch_events())

            # Convert to dict format
            events = [event.to_dict() for event in events_objects]
        except Exception as e:
            # Deliberately broad: a failed lookup must not abort feature
            # generation, it only degrades to "no events".
            logger.error(f"Failed to fetch events from database: {e}")
            events = []

    # Generate features
    generator = EventFeatureGenerator()
    return generator.generate_event_features(dates, events)