Add all the code for training service

Urtzi Alfaro
2025-07-19 16:59:37 +02:00
parent 42097202d2
commit f3071c00bd
21 changed files with 7504 additions and 764 deletions


@@ -0,0 +1,493 @@
# services/training/app/ml/data_processor.py
"""
Data Processor for Training Service
Handles data preparation and feature engineering for ML training
"""
import pandas as pd
import numpy as np
from typing import Dict, List, Any, Optional, Tuple
from datetime import datetime, timedelta
import logging
from sklearn.preprocessing import StandardScaler
from sklearn.impute import SimpleImputer
logger = logging.getLogger(__name__)
class BakeryDataProcessor:
"""
Enhanced data processor for bakery forecasting training service.
Handles data cleaning, feature engineering, and preparation for ML models.
"""
def __init__(self):
self.scalers = {} # Store scalers for each feature
self.imputers = {} # Store imputers for missing value handling
async def prepare_training_data(self,
sales_data: pd.DataFrame,
weather_data: pd.DataFrame,
traffic_data: pd.DataFrame,
product_name: str) -> pd.DataFrame:
"""
Prepare comprehensive training data for a specific product.
Args:
sales_data: Historical sales data for the product
weather_data: Weather data
traffic_data: Traffic data
product_name: Product name for logging
Returns:
DataFrame ready for Prophet training with 'ds' and 'y' columns plus features
"""
try:
logger.info(f"Preparing training data for product: {product_name}")
# Convert and validate sales data
sales_clean = await self._process_sales_data(sales_data, product_name)
# Aggregate to daily level
daily_sales = await self._aggregate_daily_sales(sales_clean)
# Add temporal features
daily_sales = self._add_temporal_features(daily_sales)
# Merge external data sources
daily_sales = self._merge_weather_features(daily_sales, weather_data)
daily_sales = self._merge_traffic_features(daily_sales, traffic_data)
# Engineer additional features
daily_sales = self._engineer_features(daily_sales)
# Handle missing values
daily_sales = self._handle_missing_values(daily_sales)
# Prepare for Prophet (rename columns and validate)
prophet_data = self._prepare_prophet_format(daily_sales)
logger.info(f"Prepared {len(prophet_data)} data points for {product_name}")
return prophet_data
except Exception as e:
logger.error(f"Error preparing training data for {product_name}: {str(e)}")
raise
async def prepare_prediction_features(self,
future_dates: pd.DatetimeIndex,
weather_forecast: pd.DataFrame = None,
traffic_forecast: pd.DataFrame = None) -> pd.DataFrame:
"""
Create features for future predictions.
Args:
future_dates: Future dates to predict
weather_forecast: Weather forecast data
traffic_forecast: Traffic forecast data
Returns:
DataFrame with features for prediction
"""
try:
# Create base future dataframe
future_df = pd.DataFrame({'ds': future_dates})
# Add temporal features
future_df = self._add_temporal_features(
future_df.rename(columns={'ds': 'date'})
).rename(columns={'date': 'ds'})
# Add weather features
if weather_forecast is not None and not weather_forecast.empty:
weather_features = weather_forecast.copy()
if 'date' in weather_features.columns:
weather_features = weather_features.rename(columns={'date': 'ds'})
future_df = future_df.merge(weather_features, on='ds', how='left')
# Add traffic features
if traffic_forecast is not None and not traffic_forecast.empty:
traffic_features = traffic_forecast.copy()
if 'date' in traffic_features.columns:
traffic_features = traffic_features.rename(columns={'date': 'ds'})
future_df = future_df.merge(traffic_features, on='ds', how='left')
# Engineer additional features
future_df = self._engineer_features(future_df.rename(columns={'ds': 'date'}))
future_df = future_df.rename(columns={'date': 'ds'})
# Handle missing values in future data
numeric_columns = future_df.select_dtypes(include=[np.number]).columns
for col in numeric_columns:
if future_df[col].isna().any():
# Use reasonable defaults for Madrid
if col == 'temperature':
future_df[col] = future_df[col].fillna(15.0) # Default Madrid temp
elif col == 'precipitation':
future_df[col] = future_df[col].fillna(0.0) # Default no rain
elif col == 'humidity':
future_df[col] = future_df[col].fillna(60.0) # Default humidity
elif col == 'traffic_volume':
future_df[col] = future_df[col].fillna(100.0) # Default traffic
else:
future_df[col] = future_df[col].fillna(future_df[col].median())
return future_df
except Exception as e:
logger.error(f"Error creating prediction features: {e}")
# Return minimal features if error
return pd.DataFrame({'ds': future_dates})
async def _process_sales_data(self, sales_data: pd.DataFrame, product_name: str) -> pd.DataFrame:
"""Process and clean sales data"""
sales_clean = sales_data.copy()
# Ensure date column exists and is datetime
if 'date' not in sales_clean.columns:
raise ValueError("Sales data must have a 'date' column")
sales_clean['date'] = pd.to_datetime(sales_clean['date'])
# Ensure quantity column exists and is numeric
if 'quantity' not in sales_clean.columns:
raise ValueError("Sales data must have a 'quantity' column")
sales_clean['quantity'] = pd.to_numeric(sales_clean['quantity'], errors='coerce')
# Remove rows with invalid quantities
sales_clean = sales_clean.dropna(subset=['quantity'])
sales_clean = sales_clean[sales_clean['quantity'] >= 0] # No negative sales
# Filter for the specific product if product_name column exists
if 'product_name' in sales_clean.columns:
sales_clean = sales_clean[sales_clean['product_name'] == product_name]
return sales_clean
async def _aggregate_daily_sales(self, sales_data: pd.DataFrame) -> pd.DataFrame:
"""Aggregate sales to daily level"""
daily_sales = sales_data.groupby('date').agg({
'quantity': 'sum'
}).reset_index()
# Ensure we have data for all dates in the range
date_range = pd.date_range(
start=daily_sales['date'].min(),
end=daily_sales['date'].max(),
freq='D'
)
full_date_df = pd.DataFrame({'date': date_range})
daily_sales = full_date_df.merge(daily_sales, on='date', how='left')
daily_sales['quantity'] = daily_sales['quantity'].fillna(0) # Fill missing days with 0 sales
return daily_sales
def _add_temporal_features(self, df: pd.DataFrame) -> pd.DataFrame:
"""Add temporal features like day of week, month, etc."""
df = df.copy()
# Ensure we have a date column
if 'date' not in df.columns:
raise ValueError("DataFrame must have a 'date' column")
df['date'] = pd.to_datetime(df['date'])
# Day of week (0=Monday, 6=Sunday)
df['day_of_week'] = df['date'].dt.dayofweek
df['is_weekend'] = df['day_of_week'].isin([5, 6]).astype(int)
# Month and season
df['month'] = df['date'].dt.month
df['season'] = df['month'].apply(self._get_season)
# Week of year
df['week_of_year'] = df['date'].dt.isocalendar().week
# Quarter
df['quarter'] = df['date'].dt.quarter
# Holiday indicators (basic Spanish holidays)
df['is_holiday'] = df['date'].apply(self._is_spanish_holiday).astype(int)
# School calendar effects (approximate)
df['is_school_holiday'] = df['date'].apply(self._is_school_holiday).astype(int)
return df
def _merge_weather_features(self,
daily_sales: pd.DataFrame,
weather_data: pd.DataFrame) -> pd.DataFrame:
"""Merge weather features with sales data"""
if weather_data.empty:
# Add default weather columns with neutral values
daily_sales['temperature'] = 15.0 # Mild temperature
daily_sales['precipitation'] = 0.0 # No rain
daily_sales['humidity'] = 60.0 # Moderate humidity
daily_sales['wind_speed'] = 5.0 # Light wind
return daily_sales
try:
weather_clean = weather_data.copy()
# Ensure weather data has date column
if 'date' not in weather_clean.columns and 'ds' in weather_clean.columns:
weather_clean = weather_clean.rename(columns={'ds': 'date'})
weather_clean['date'] = pd.to_datetime(weather_clean['date'])
# Select relevant weather features
weather_features = ['date']
# Add available weather columns with default names
weather_mapping = {
'temperature': ['temperature', 'temp', 'temperatura'],
'precipitation': ['precipitation', 'rain', 'lluvia', 'precipitacion'],
'humidity': ['humidity', 'humedad'],
'wind_speed': ['wind_speed', 'viento', 'wind']
}
for standard_name, possible_names in weather_mapping.items():
for possible_name in possible_names:
if possible_name in weather_clean.columns:
weather_clean[standard_name] = weather_clean[possible_name]
weather_features.append(standard_name)
break
# Keep only the features we found
weather_clean = weather_clean[weather_features].copy()
# Merge with sales data
merged = daily_sales.merge(weather_clean, on='date', how='left')
# Fill missing weather values with reasonable defaults
if 'temperature' in merged.columns:
merged['temperature'] = merged['temperature'].fillna(15.0)
if 'precipitation' in merged.columns:
merged['precipitation'] = merged['precipitation'].fillna(0.0)
if 'humidity' in merged.columns:
merged['humidity'] = merged['humidity'].fillna(60.0)
if 'wind_speed' in merged.columns:
merged['wind_speed'] = merged['wind_speed'].fillna(5.0)
return merged
except Exception as e:
logger.warning(f"Error merging weather data: {e}")
# Add default weather columns if merge fails
daily_sales['temperature'] = 15.0
daily_sales['precipitation'] = 0.0
daily_sales['humidity'] = 60.0
daily_sales['wind_speed'] = 5.0
return daily_sales
def _merge_traffic_features(self,
daily_sales: pd.DataFrame,
traffic_data: pd.DataFrame) -> pd.DataFrame:
"""Merge traffic features with sales data"""
if traffic_data.empty:
# Add default traffic column
daily_sales['traffic_volume'] = 100.0 # Neutral traffic level
return daily_sales
try:
traffic_clean = traffic_data.copy()
# Ensure traffic data has date column
if 'date' not in traffic_clean.columns and 'ds' in traffic_clean.columns:
traffic_clean = traffic_clean.rename(columns={'ds': 'date'})
traffic_clean['date'] = pd.to_datetime(traffic_clean['date'])
# Select relevant traffic features
traffic_features = ['date']
# Map traffic column names
traffic_mapping = {
'traffic_volume': ['traffic_volume', 'traffic_intensity', 'trafico', 'intensidad'],
'pedestrian_count': ['pedestrian_count', 'peatones'],
'occupancy_rate': ['occupancy_rate', 'ocupacion']
}
for standard_name, possible_names in traffic_mapping.items():
for possible_name in possible_names:
if possible_name in traffic_clean.columns:
traffic_clean[standard_name] = traffic_clean[possible_name]
traffic_features.append(standard_name)
break
# Keep only the features we found
traffic_clean = traffic_clean[traffic_features].copy()
# Merge with sales data
merged = daily_sales.merge(traffic_clean, on='date', how='left')
# Fill missing traffic values
if 'traffic_volume' in merged.columns:
merged['traffic_volume'] = merged['traffic_volume'].fillna(100.0)
if 'pedestrian_count' in merged.columns:
merged['pedestrian_count'] = merged['pedestrian_count'].fillna(50.0)
if 'occupancy_rate' in merged.columns:
merged['occupancy_rate'] = merged['occupancy_rate'].fillna(0.5)
return merged
except Exception as e:
logger.warning(f"Error merging traffic data: {e}")
# Add default traffic column if merge fails
daily_sales['traffic_volume'] = 100.0
return daily_sales
def _engineer_features(self, df: pd.DataFrame) -> pd.DataFrame:
"""Engineer additional features from existing data"""
df = df.copy()
# Weather-based features
if 'temperature' in df.columns:
df['temp_squared'] = df['temperature'] ** 2
df['is_hot_day'] = (df['temperature'] > 25).astype(int)
df['is_cold_day'] = (df['temperature'] < 10).astype(int)
if 'precipitation' in df.columns:
df['is_rainy_day'] = (df['precipitation'] > 0).astype(int)
df['heavy_rain'] = (df['precipitation'] > 10).astype(int)
# Traffic-based features
if 'traffic_volume' in df.columns:
df['high_traffic'] = (df['traffic_volume'] > df['traffic_volume'].quantile(0.75)).astype(int)
df['low_traffic'] = (df['traffic_volume'] < df['traffic_volume'].quantile(0.25)).astype(int)
# Interaction features
if 'is_weekend' in df.columns and 'temperature' in df.columns:
df['weekend_temp_interaction'] = df['is_weekend'] * df['temperature']
if 'is_rainy_day' in df.columns and 'traffic_volume' in df.columns:
df['rain_traffic_interaction'] = df['is_rainy_day'] * df['traffic_volume']
return df
def _handle_missing_values(self, df: pd.DataFrame) -> pd.DataFrame:
"""Handle missing values in the dataset"""
df = df.copy()
# For numeric columns, use median imputation
numeric_columns = df.select_dtypes(include=[np.number]).columns
for col in numeric_columns:
if col != 'quantity' and df[col].isna().any():
median_value = df[col].median()
df[col] = df[col].fillna(median_value)
return df
def _prepare_prophet_format(self, df: pd.DataFrame) -> pd.DataFrame:
"""Prepare data in Prophet format with 'ds' and 'y' columns"""
prophet_df = df.copy()
# Rename columns for Prophet
if 'date' in prophet_df.columns:
prophet_df = prophet_df.rename(columns={'date': 'ds'})
if 'quantity' in prophet_df.columns:
prophet_df = prophet_df.rename(columns={'quantity': 'y'})
# Ensure ds is datetime
if 'ds' in prophet_df.columns:
prophet_df['ds'] = pd.to_datetime(prophet_df['ds'])
# Validate required columns
if 'ds' not in prophet_df.columns or 'y' not in prophet_df.columns:
raise ValueError("Prophet data must have 'ds' and 'y' columns")
# Remove any rows with missing target values
prophet_df = prophet_df.dropna(subset=['y'])
# Sort by date
prophet_df = prophet_df.sort_values('ds').reset_index(drop=True)
return prophet_df
def _get_season(self, month: int) -> int:
"""Get season from month (1-4 for Winter, Spring, Summer, Autumn)"""
if month in [12, 1, 2]:
return 1 # Winter
elif month in [3, 4, 5]:
return 2 # Spring
elif month in [6, 7, 8]:
return 3 # Summer
else:
return 4 # Autumn
def _is_spanish_holiday(self, date: datetime) -> bool:
"""Check if a date is a major Spanish holiday"""
month_day = (date.month, date.day)
# Major Spanish holidays that affect bakery sales
spanish_holidays = [
(1, 1), # New Year
(1, 6), # Epiphany
(5, 1), # Labour Day
(8, 15), # Assumption
(10, 12), # National Day
(11, 1), # All Saints
(12, 6), # Constitution
(12, 8), # Immaculate Conception
(12, 25), # Christmas
(5, 15), # San Isidro (Madrid)
(5, 2), # Madrid Community Day
]
return month_day in spanish_holidays
def _is_school_holiday(self, date: datetime) -> bool:
"""Check if a date is during school holidays (approximate)"""
month = date.month
# Approximate Spanish school holiday periods
# Summer holidays (July-August)
if month in [7, 8]:
return True
# Christmas holidays (mid December to early January)
if month == 12 and date.day >= 20:
return True
if month == 1 and date.day <= 10:
return True
# Easter holidays (approximate - first two weeks of April)
if month == 4 and date.day <= 14:
return True
return False
def calculate_feature_importance(self,
model_data: pd.DataFrame,
target_column: str = 'y') -> Dict[str, float]:
"""
Calculate feature importance for the model.
"""
try:
# Simple correlation-based importance
numeric_features = model_data.select_dtypes(include=[np.number]).columns
numeric_features = [col for col in numeric_features if col != target_column]
importance_scores = {}
for feature in numeric_features:
if feature in model_data.columns:
correlation = model_data[feature].corr(model_data[target_column])
importance_scores[feature] = abs(correlation) if not pd.isna(correlation) else 0.0
# Sort by importance
importance_scores = dict(sorted(importance_scores.items(),
key=lambda x: x[1], reverse=True))
return importance_scores
except Exception as e:
logger.error(f"Error calculating feature importance: {e}")
return {}
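
For orientation, a minimal usage sketch of the processor above. The toy sales and weather frames, the product name, and the asyncio.run entry point are illustrative only, not part of this commit:

import asyncio
import pandas as pd
from app.ml.data_processor import BakeryDataProcessor

async def demo():
    days = pd.date_range('2025-01-01', periods=90, freq='D')
    sales = pd.DataFrame({
        'date': days,
        'product_name': 'croissant',
        'quantity': 20 + days.dayofweek,  # mild weekly pattern
    })
    weather = pd.DataFrame({'date': days, 'temperature': 12.0, 'precipitation': 0.0})
    processor = BakeryDataProcessor()
    # An empty traffic frame triggers the neutral traffic defaults
    prophet_ready = await processor.prepare_training_data(
        sales_data=sales,
        weather_data=weather,
        traffic_data=pd.DataFrame(),
        product_name='croissant',
    )
    print(prophet_ready[['ds', 'y', 'temperature', 'traffic_volume']].head())

asyncio.run(demo())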


@@ -0,0 +1,408 @@
# services/training/app/ml/prophet_manager.py
"""
Enhanced Prophet Manager for Training Service
Migrated from the monolithic backend to the microservices architecture
"""
from typing import Dict, List, Any, Optional, Tuple
import pandas as pd
import numpy as np
from prophet import Prophet
import pickle
import logging
from datetime import datetime, timedelta
import uuid
import asyncio
import os
import joblib
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
import json
from pathlib import Path
from app.core.config import settings
logger = logging.getLogger(__name__)
class BakeryProphetManager:
"""
Enhanced Prophet model manager for the training service.
Handles training, validation, and model persistence for bakery forecasting.
"""
def __init__(self):
self.models = {} # In-memory model storage
self.model_metadata = {} # Store model metadata
self.feature_scalers = {} # Store feature scalers per model
# Ensure model storage directory exists
os.makedirs(settings.MODEL_STORAGE_PATH, exist_ok=True)
async def train_bakery_model(self,
tenant_id: str,
product_name: str,
df: pd.DataFrame,
job_id: str) -> Dict[str, Any]:
"""
Train a Prophet model for bakery forecasting with enhanced features.
Args:
tenant_id: Tenant identifier
product_name: Product name
df: Training data with 'ds' and 'y' columns plus regressors
job_id: Training job identifier
Returns:
Dictionary with model information and metrics
"""
try:
logger.info(f"Training bakery model for tenant {tenant_id}, product {product_name}")
# Validate input data
await self._validate_training_data(df, product_name)
# Prepare data for Prophet
prophet_data = await self._prepare_prophet_data(df)
# Get regressor columns
regressor_columns = self._extract_regressor_columns(prophet_data)
# Initialize Prophet model with bakery-specific settings
model = self._create_prophet_model(regressor_columns)
# Add regressors to model
for regressor in regressor_columns:
if regressor in prophet_data.columns:
model.add_regressor(regressor)
# Fit the model
model.fit(prophet_data)
# Generate model ID and store model
model_id = f"{job_id}_{product_name}_{uuid.uuid4().hex[:8]}"
model_path = await self._store_model(
tenant_id, product_name, model, model_id, prophet_data, regressor_columns
)
# Calculate training metrics
training_metrics = await self._calculate_training_metrics(model, prophet_data)
# Prepare model information
model_info = {
"model_id": model_id,
"model_path": model_path,
"type": "prophet",
"training_samples": len(prophet_data),
"features": regressor_columns,
"hyperparameters": {
"seasonality_mode": settings.PROPHET_SEASONALITY_MODE,
"daily_seasonality": settings.PROPHET_DAILY_SEASONALITY,
"weekly_seasonality": settings.PROPHET_WEEKLY_SEASONALITY,
"yearly_seasonality": settings.PROPHET_YEARLY_SEASONALITY
},
"training_metrics": training_metrics,
"trained_at": datetime.now().isoformat(),
"data_period": {
"start_date": prophet_data['ds'].min().isoformat(),
"end_date": prophet_data['ds'].max().isoformat(),
"total_days": len(prophet_data)
}
}
logger.info(f"Model trained successfully for {product_name}")
return model_info
except Exception as e:
logger.error(f"Failed to train bakery model for {product_name}: {str(e)}")
raise
async def generate_forecast(self,
model_path: str,
future_dates: pd.DataFrame,
regressor_columns: List[str]) -> pd.DataFrame:
"""
Generate forecast using a stored Prophet model.
Args:
model_path: Path to the stored model
future_dates: DataFrame with future dates and regressors
regressor_columns: List of regressor column names
Returns:
DataFrame with forecast results
"""
try:
# Load the model
model = joblib.load(model_path)
# Validate future data has required regressors
for regressor in regressor_columns:
if regressor not in future_dates.columns:
logger.warning(f"Missing regressor {regressor}, filling with median")
future_dates[regressor] = 0 # Default value
# Generate forecast
forecast = model.predict(future_dates)
return forecast
except Exception as e:
logger.error(f"Failed to generate forecast: {str(e)}")
raise
async def _validate_training_data(self, df: pd.DataFrame, product_name: str):
"""Validate training data quality"""
if df.empty:
raise ValueError(f"No training data available for {product_name}")
if len(df) < settings.MIN_TRAINING_DATA_DAYS:
raise ValueError(
f"Insufficient training data for {product_name}: "
f"{len(df)} days, minimum required: {settings.MIN_TRAINING_DATA_DAYS}"
)
required_columns = ['ds', 'y']
missing_columns = [col for col in required_columns if col not in df.columns]
if missing_columns:
raise ValueError(f"Missing required columns: {missing_columns}")
# Check for valid date range
if df['ds'].isna().any():
raise ValueError("Invalid dates found in training data")
# Check for valid target values
if df['y'].isna().all():
raise ValueError("No valid target values found")
async def _prepare_prophet_data(self, df: pd.DataFrame) -> pd.DataFrame:
"""Prepare data for Prophet training"""
prophet_data = df.copy()
# Ensure ds column is datetime
prophet_data['ds'] = pd.to_datetime(prophet_data['ds'])
# Handle missing values in target
if prophet_data['y'].isna().any():
logger.warning("Filling missing target values with interpolation")
prophet_data['y'] = prophet_data['y'].interpolate(method='linear')
# Remove extreme outliers (values > 3 standard deviations)
mean_val = prophet_data['y'].mean()
std_val = prophet_data['y'].std()
if std_val > 0: # Avoid division by zero
lower_bound = mean_val - 3 * std_val
upper_bound = mean_val + 3 * std_val
before_count = len(prophet_data)
prophet_data = prophet_data[
(prophet_data['y'] >= lower_bound) &
(prophet_data['y'] <= upper_bound)
]
after_count = len(prophet_data)
if before_count != after_count:
logger.info(f"Removed {before_count - after_count} outliers")
# Ensure chronological order
prophet_data = prophet_data.sort_values('ds').reset_index(drop=True)
# Fill missing values in regressors
numeric_columns = prophet_data.select_dtypes(include=[np.number]).columns
for col in numeric_columns:
if col != 'y' and prophet_data[col].isna().any():
prophet_data[col] = prophet_data[col].fillna(prophet_data[col].median())
return prophet_data
def _extract_regressor_columns(self, df: pd.DataFrame) -> List[str]:
"""Extract regressor columns from the dataframe"""
excluded_columns = ['ds', 'y']
regressor_columns = []
for col in df.columns:
if col not in excluded_columns and df[col].dtype in ['int64', 'float64']:
regressor_columns.append(col)
logger.info(f"Identified regressor columns: {regressor_columns}")
return regressor_columns
def _create_prophet_model(self, regressor_columns: List[str]) -> Prophet:
"""Create Prophet model with bakery-specific settings"""
# Get Spanish holidays
holidays = self._get_spanish_holidays()
# Bakery-specific Prophet configuration
model = Prophet(
holidays=holidays if not holidays.empty else None,
daily_seasonality=settings.PROPHET_DAILY_SEASONALITY,
weekly_seasonality=settings.PROPHET_WEEKLY_SEASONALITY,
yearly_seasonality=settings.PROPHET_YEARLY_SEASONALITY,
seasonality_mode=settings.PROPHET_SEASONALITY_MODE,
changepoint_prior_scale=0.05, # Conservative changepoint detection
seasonality_prior_scale=10, # Strong seasonality for bakeries
holidays_prior_scale=10, # Strong holiday effects
interval_width=0.8, # 80% confidence intervals
mcmc_samples=0, # Use MAP estimation (faster)
uncertainty_samples=1000 # For uncertainty estimation
)
return model
def _get_spanish_holidays(self) -> pd.DataFrame:
"""Get Spanish holidays for Prophet model"""
try:
# Define major Spanish holidays that affect bakery sales
holidays_list = []
years = range(2020, 2030) # Cover training and prediction period
for year in years:
holidays_list.extend([
{'holiday': 'new_year', 'ds': f'{year}-01-01'},
{'holiday': 'epiphany', 'ds': f'{year}-01-06'},
{'holiday': 'may_day', 'ds': f'{year}-05-01'},
{'holiday': 'assumption', 'ds': f'{year}-08-15'},
{'holiday': 'national_day', 'ds': f'{year}-10-12'},
{'holiday': 'all_saints', 'ds': f'{year}-11-01'},
{'holiday': 'constitution', 'ds': f'{year}-12-06'},
{'holiday': 'immaculate', 'ds': f'{year}-12-08'},
{'holiday': 'christmas', 'ds': f'{year}-12-25'},
# Madrid specific holidays
{'holiday': 'madrid_patron', 'ds': f'{year}-05-15'}, # San Isidro
{'holiday': 'madrid_community', 'ds': f'{year}-05-02'},
])
holidays_df = pd.DataFrame(holidays_list)
holidays_df['ds'] = pd.to_datetime(holidays_df['ds'])
return holidays_df
except Exception as e:
logger.warning(f"Error creating holidays dataframe: {e}")
return pd.DataFrame()
async def _store_model(self,
tenant_id: str,
product_name: str,
model: Prophet,
model_id: str,
training_data: pd.DataFrame,
regressor_columns: List[str]) -> str:
"""Store model and metadata to filesystem"""
# Create model filename
model_filename = f"{model_id}_prophet_model.pkl"
model_path = os.path.join(settings.MODEL_STORAGE_PATH, model_filename)
# Store the model
joblib.dump(model, model_path)
# Store metadata
metadata = {
"tenant_id": tenant_id,
"product_name": product_name,
"model_id": model_id,
"regressor_columns": regressor_columns,
"training_samples": len(training_data),
"training_period": {
"start": training_data['ds'].min().isoformat(),
"end": training_data['ds'].max().isoformat()
},
"created_at": datetime.now().isoformat(),
"model_type": "prophet",
"file_path": model_path
}
metadata_path = model_path.replace('.pkl', '_metadata.json')
with open(metadata_path, 'w') as f:
json.dump(metadata, f, indent=2)
# Store in memory for quick access
model_key = f"{tenant_id}:{product_name}"
self.models[model_key] = model
self.model_metadata[model_key] = metadata
logger.info(f"Model stored at: {model_path}")
return model_path
async def _calculate_training_metrics(self,
model: Prophet,
training_data: pd.DataFrame) -> Dict[str, float]:
"""Calculate training metrics for the model"""
try:
# Generate in-sample predictions
forecast = model.predict(training_data[['ds'] + [col for col in training_data.columns if col not in ['ds', 'y']]])
# Calculate metrics
y_true = training_data['y'].values
y_pred = forecast['yhat'].values
# Basic metrics
mae = mean_absolute_error(y_true, y_pred)
mse = mean_squared_error(y_true, y_pred)
rmse = np.sqrt(mse)
# MAPE (Mean Absolute Percentage Error), computed over non-zero actuals to avoid division by zero on zero-sales days
nonzero = y_true != 0
mape = np.mean(np.abs((y_true[nonzero] - y_pred[nonzero]) / y_true[nonzero])) * 100 if nonzero.any() else 0.0
# R-squared
r2 = r2_score(y_true, y_pred)
return {
"mae": round(mae, 2),
"mse": round(mse, 2),
"rmse": round(rmse, 2),
"mape": round(mape, 2),
"r2_score": round(r2, 4),
"mean_actual": round(np.mean(y_true), 2),
"mean_predicted": round(np.mean(y_pred), 2)
}
except Exception as e:
logger.error(f"Error calculating training metrics: {e}")
return {
"mae": 0.0,
"mse": 0.0,
"rmse": 0.0,
"mape": 0.0,
"r2_score": 0.0,
"mean_actual": 0.0,
"mean_predicted": 0.0
}
def get_model_info(self, tenant_id: str, product_name: str) -> Optional[Dict[str, Any]]:
"""Get model information for a specific tenant and product"""
model_key = f"{tenant_id}:{product_name}"
return self.model_metadata.get(model_key)
def list_models(self, tenant_id: str) -> List[Dict[str, Any]]:
"""List all models for a tenant"""
tenant_models = []
for model_key, metadata in self.model_metadata.items():
if metadata['tenant_id'] == tenant_id:
tenant_models.append(metadata)
return tenant_models
async def cleanup_old_models(self, days_old: int = 30):
"""Clean up old model files"""
try:
cutoff_date = datetime.now() - timedelta(days=days_old)
for model_path in Path(settings.MODEL_STORAGE_PATH).glob("*.pkl"):
# Check file modification time
if model_path.stat().st_mtime < cutoff_date.timestamp():
# Remove model and metadata files
model_path.unlink()
metadata_path = model_path.with_name(model_path.name.replace('.pkl', '_metadata.json')) # matches the *_metadata.json naming used in _store_model
if metadata_path.exists():
metadata_path.unlink()
logger.info(f"Cleaned up old model: {model_path}")
except Exception as e:
logger.error(f"Error during model cleanup: {e}")


@@ -1,174 +1,372 @@
# services/training/app/ml/trainer.py
-"""
-ML Training implementation
-"""
-import asyncio
-import structlog
-from typing import Dict, Any, List
-import pandas as pd
-from datetime import datetime
-import joblib
-import os
-from prophet import Prophet
-import numpy as np
-from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
-from app.core.config import settings
-
-logger = structlog.get_logger()
-
-class MLTrainer:
-    """ML training implementation"""
-
-    def __init__(self):
-        self.model_storage_path = settings.MODEL_STORAGE_PATH
-        os.makedirs(self.model_storage_path, exist_ok=True)
-
-    async def train_models(self, training_data: Dict[str, Any], job_id: str, db) -> Dict[str, Any]:
-        """Train models for all products"""
-        models_result = {}
-        # Get sales data
-        sales_data = training_data.get("sales_data", [])
-        external_data = training_data.get("external_data", {})
-        # Group by product
-        products_data = self._group_by_product(sales_data)
-        # Train model for each product
-        for product_name, product_sales in products_data.items():
-            try:
-                model_result = await self._train_product_model(
-                    product_name,
-                    product_sales,
-                    external_data,
-                    job_id
-                )
-                models_result[product_name] = model_result
-            except Exception as e:
-                logger.error(f"Failed to train model for {product_name}: {e}")
-        return models_result
-
-    def _group_by_product(self, sales_data: List[Dict]) -> Dict[str, List[Dict]]:
-        """Group sales data by product"""
-        products = {}
-        for sale in sales_data:
-            product_name = sale.get("product_name")
-            if product_name not in products:
-                products[product_name] = []
-            products[product_name].append(sale)
-        return products
-
-    async def _train_product_model(self, product_name: str, sales_data: List[Dict], external_data: Dict, job_id: str) -> Dict[str, Any]:
-        """Train Prophet model for a single product"""
-        # Convert to DataFrame
-        df = pd.DataFrame(sales_data)
-        df['date'] = pd.to_datetime(df['date'])
-        # Aggregate daily sales
-        daily_sales = df.groupby('date')['quantity_sold'].sum().reset_index()
-        daily_sales.columns = ['ds', 'y']
-        # Add external features
-        daily_sales = self._add_external_features(daily_sales, external_data)
-        # Train Prophet model
-        model = Prophet(
-            seasonality_mode=settings.PROPHET_SEASONALITY_MODE,
-            daily_seasonality=settings.PROPHET_DAILY_SEASONALITY,
-            weekly_seasonality=settings.PROPHET_WEEKLY_SEASONALITY,
-            yearly_seasonality=settings.PROPHET_YEARLY_SEASONALITY
-        )
-        # Add regressors
-        model.add_regressor('temperature')
-        model.add_regressor('humidity')
-        model.add_regressor('precipitation')
-        model.add_regressor('traffic_volume')
-        # Fit model
-        model.fit(daily_sales)
-        # Save model
-        model_path = os.path.join(
-            self.model_storage_path,
-            f"{job_id}_{product_name}_prophet_model.pkl"
-        )
-        joblib.dump(model, model_path)
-        return {
-            "type": "prophet",
-            "path": model_path,
-            "training_samples": len(daily_sales),
-            "features": ["temperature", "humidity", "precipitation", "traffic_volume"],
-            "hyperparameters": {
-                "seasonality_mode": settings.PROPHET_SEASONALITY_MODE,
-                "daily_seasonality": settings.PROPHET_DAILY_SEASONALITY,
-                "weekly_seasonality": settings.PROPHET_WEEKLY_SEASONALITY,
-                "yearly_seasonality": settings.PROPHET_YEARLY_SEASONALITY
-            }
-        }
-
-    def _add_external_features(self, daily_sales: pd.DataFrame, external_data: Dict) -> pd.DataFrame:
-        """Add external features to sales data"""
-        # Add weather data
-        weather_data = external_data.get("weather", [])
-        if weather_data:
-            weather_df = pd.DataFrame(weather_data)
-            weather_df['ds'] = pd.to_datetime(weather_df['date'])
-            daily_sales = daily_sales.merge(weather_df[['ds', 'temperature', 'humidity', 'precipitation']], on='ds', how='left')
-        # Add traffic data
-        traffic_data = external_data.get("traffic", [])
-        if traffic_data:
-            traffic_df = pd.DataFrame(traffic_data)
-            traffic_df['ds'] = pd.to_datetime(traffic_df['date'])
-            daily_sales = daily_sales.merge(traffic_df[['ds', 'traffic_volume']], on='ds', how='left')
-        # Fill missing values
-        daily_sales['temperature'] = daily_sales['temperature'].fillna(daily_sales['temperature'].mean())
-        daily_sales['humidity'] = daily_sales['humidity'].fillna(daily_sales['humidity'].mean())
-        daily_sales['precipitation'] = daily_sales['precipitation'].fillna(0)
-        daily_sales['traffic_volume'] = daily_sales['traffic_volume'].fillna(daily_sales['traffic_volume'].mean())
-        return daily_sales
-
-    async def validate_models(self, models_result: Dict[str, Any], db) -> Dict[str, Any]:
-        """Validate trained models"""
-        validation_results = {}
-        for product_name, model_data in models_result.items():
-            try:
-                # Load model
-                model_path = model_data.get("path")
-                model = joblib.load(model_path)
-                # Mock validation for now (in production, you'd use actual validation data)
-                validation_results[product_name] = {
-                    "mape": np.random.uniform(10, 25),  # Mock MAPE between 10-25%
-                    "rmse": np.random.uniform(8, 15),   # Mock RMSE
-                    "mae": np.random.uniform(5, 12),    # Mock MAE
-                    "r2_score": np.random.uniform(0.7, 0.9)  # Mock R2 score
-                }
-            except Exception as e:
-                logger.error(f"Validation failed for {product_name}: {e}")
-                validation_results[product_name] = {
-                    "mape": None,
-                    "rmse": None,
-                    "mae": None,
-                    "r2_score": None,
-                    "error": str(e)
-                }
-        return validation_results
+"""
+ML Trainer for Training Service
+Orchestrates the complete training process
+"""
+from typing import Dict, List, Any, Optional, Tuple
+import pandas as pd
+import numpy as np
+from datetime import datetime, timedelta
+import logging
+import asyncio
+import uuid
+from pathlib import Path
+from app.ml.prophet_manager import BakeryProphetManager
+from app.ml.data_processor import BakeryDataProcessor
+from app.core.config import settings
+
+logger = logging.getLogger(__name__)
+
+class BakeryMLTrainer:
+    """
+    Main ML trainer that orchestrates the complete training process.
+    Replaces the old Celery-based training system with a clean async implementation.
+    """
+
+    def __init__(self):
+        self.prophet_manager = BakeryProphetManager()
+        self.data_processor = BakeryDataProcessor()
+
+    async def train_tenant_models(self,
+                                  tenant_id: str,
+                                  sales_data: List[Dict],
+                                  weather_data: List[Dict] = None,
+                                  traffic_data: List[Dict] = None,
+                                  job_id: str = None) -> Dict[str, Any]:
+        """
+        Train models for all products of a tenant.
+
+        Args:
+            tenant_id: Tenant identifier
+            sales_data: Historical sales data
+            weather_data: Weather data (optional)
+            traffic_data: Traffic data (optional)
+            job_id: Training job identifier
+
+        Returns:
+            Dictionary with training results for each product
+        """
+        if not job_id:
+            job_id = f"training_{tenant_id}_{uuid.uuid4().hex[:8]}"
+        logger.info(f"Starting training job {job_id} for tenant {tenant_id}")
+        try:
+            # Convert input data to DataFrames
+            sales_df = pd.DataFrame(sales_data) if sales_data else pd.DataFrame()
+            weather_df = pd.DataFrame(weather_data) if weather_data else pd.DataFrame()
+            traffic_df = pd.DataFrame(traffic_data) if traffic_data else pd.DataFrame()
+            # Validate input data
+            await self._validate_input_data(sales_df, tenant_id)
+            # Get unique products
+            products = sales_df['product_name'].unique().tolist()
+            logger.info(f"Training models for {len(products)} products: {products}")
+            # Process data for each product
+            processed_data = await self._process_all_products(
+                sales_df, weather_df, traffic_df, products
+            )
+            # Train models for each product
+            training_results = await self._train_all_models(
+                tenant_id, processed_data, job_id
+            )
+            # Calculate overall training summary
+            summary = self._calculate_training_summary(training_results)
+            result = {
+                "job_id": job_id,
+                "tenant_id": tenant_id,
+                "status": "completed",
+                "products_trained": len([r for r in training_results.values() if r.get('status') == 'success']),
+                "products_failed": len([r for r in training_results.values() if r.get('status') == 'error']),
+                "total_products": len(products),
+                "training_results": training_results,
+                "summary": summary,
+                "completed_at": datetime.now().isoformat()
+            }
+            logger.info(f"Training job {job_id} completed successfully")
+            return result
+        except Exception as e:
+            logger.error(f"Training job {job_id} failed: {str(e)}")
+            raise
+
+    async def train_single_product(self,
+                                   tenant_id: str,
+                                   product_name: str,
+                                   sales_data: List[Dict],
+                                   weather_data: List[Dict] = None,
+                                   traffic_data: List[Dict] = None,
+                                   job_id: str = None) -> Dict[str, Any]:
+        """
+        Train model for a single product.
+
+        Args:
+            tenant_id: Tenant identifier
+            product_name: Product name
+            sales_data: Historical sales data
+            weather_data: Weather data (optional)
+            traffic_data: Traffic data (optional)
+            job_id: Training job identifier
+
+        Returns:
+            Training result for the product
+        """
+        if not job_id:
+            job_id = f"training_{tenant_id}_{product_name}_{uuid.uuid4().hex[:8]}"
+        logger.info(f"Starting single product training {job_id} for {product_name}")
+        try:
+            # Convert input data to DataFrames
+            sales_df = pd.DataFrame(sales_data) if sales_data else pd.DataFrame()
+            weather_df = pd.DataFrame(weather_data) if weather_data else pd.DataFrame()
+            traffic_df = pd.DataFrame(traffic_data) if traffic_data else pd.DataFrame()
+            # Filter sales data for the specific product
+            product_sales = sales_df[sales_df['product_name'] == product_name].copy()
+            # Validate product data
+            if product_sales.empty:
+                raise ValueError(f"No sales data found for product: {product_name}")
+            # Prepare training data
+            processed_data = await self.data_processor.prepare_training_data(
+                sales_data=product_sales,
+                weather_data=weather_df,
+                traffic_data=traffic_df,
+                product_name=product_name
+            )
+            # Train the model
+            model_info = await self.prophet_manager.train_bakery_model(
+                tenant_id=tenant_id,
+                product_name=product_name,
+                df=processed_data,
+                job_id=job_id
+            )
+            result = {
+                "job_id": job_id,
+                "tenant_id": tenant_id,
+                "product_name": product_name,
+                "status": "success",
+                "model_info": model_info,
+                "data_points": len(processed_data),
+                "completed_at": datetime.now().isoformat()
+            }
+            logger.info(f"Single product training {job_id} completed successfully")
+            return result
+        except Exception as e:
+            logger.error(f"Single product training {job_id} failed: {str(e)}")
+            raise
+
+    async def evaluate_model_performance(self,
+                                         tenant_id: str,
+                                         product_name: str,
+                                         model_path: str,
+                                         test_data: List[Dict]) -> Dict[str, Any]:
+        """
+        Evaluate model performance on test data.
+
+        Args:
+            tenant_id: Tenant identifier
+            product_name: Product name
+            model_path: Path to the trained model
+            test_data: Test data for evaluation
+
+        Returns:
+            Performance metrics
+        """
+        try:
+            logger.info(f"Evaluating model performance for {product_name}")
+            # Convert test data to DataFrame
+            test_df = pd.DataFrame(test_data)
+            # Prepare test data
+            test_prepared = await self.data_processor.prepare_prediction_features(
+                future_dates=test_df['ds'],
+                weather_forecast=test_df if 'temperature' in test_df.columns else pd.DataFrame(),
+                traffic_forecast=test_df if 'traffic_volume' in test_df.columns else pd.DataFrame()
+            )
+            # Get regressor columns
+            regressor_columns = [col for col in test_prepared.columns if col not in ['ds', 'y']]
+            # Generate predictions
+            forecast = await self.prophet_manager.generate_forecast(
+                model_path=model_path,
+                future_dates=test_prepared,
+                regressor_columns=regressor_columns
+            )
+            # Calculate performance metrics if we have actual values
+            metrics = {}
+            if 'y' in test_df.columns:
+                from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
+                y_true = test_df['y'].values
+                y_pred = forecast['yhat'].values
+                # MAPE is computed over non-zero actuals to avoid division by zero on zero-sales days
+                nonzero = y_true != 0
+                metrics = {
+                    "mae": float(mean_absolute_error(y_true, y_pred)),
+                    "rmse": float(np.sqrt(mean_squared_error(y_true, y_pred))),
+                    "mape": float(np.mean(np.abs((y_true[nonzero] - y_pred[nonzero]) / y_true[nonzero])) * 100) if nonzero.any() else 0.0,
+                    "r2_score": float(r2_score(y_true, y_pred))
+                }
+            result = {
+                "tenant_id": tenant_id,
+                "product_name": product_name,
+                "evaluation_metrics": metrics,
+                "forecast_samples": len(forecast),
+                "evaluated_at": datetime.now().isoformat()
+            }
+            return result
+        except Exception as e:
+            logger.error(f"Model evaluation failed: {str(e)}")
+            raise
+
+    async def _validate_input_data(self, sales_df: pd.DataFrame, tenant_id: str):
+        """Validate input sales data"""
+        if sales_df.empty:
+            raise ValueError(f"No sales data provided for tenant {tenant_id}")
+        required_columns = ['date', 'product_name', 'quantity']
+        missing_columns = [col for col in required_columns if col not in sales_df.columns]
+        if missing_columns:
+            raise ValueError(f"Missing required columns: {missing_columns}")
+        # Check for valid dates
+        try:
+            sales_df['date'] = pd.to_datetime(sales_df['date'])
+        except Exception:
+            raise ValueError("Invalid date format in sales data")
+        # Check for valid quantities (accept any numeric dtype, including int32/float32)
+        if not pd.api.types.is_numeric_dtype(sales_df['quantity']):
+            raise ValueError("Quantity column must be numeric")
+
+    async def _process_all_products(self,
+                                    sales_df: pd.DataFrame,
+                                    weather_df: pd.DataFrame,
+                                    traffic_df: pd.DataFrame,
+                                    products: List[str]) -> Dict[str, pd.DataFrame]:
+        """Process data for all products"""
+        processed_data = {}
+        for product_name in products:
+            try:
+                logger.info(f"Processing data for product: {product_name}")
+                # Filter sales data for this product
+                product_sales = sales_df[sales_df['product_name'] == product_name].copy()
+                # Process the product data
+                processed_product_data = await self.data_processor.prepare_training_data(
+                    sales_data=product_sales,
+                    weather_data=weather_df,
+                    traffic_data=traffic_df,
+                    product_name=product_name
+                )
+                processed_data[product_name] = processed_product_data
+                logger.info(f"Processed {len(processed_product_data)} data points for {product_name}")
+            except Exception as e:
+                logger.error(f"Failed to process data for {product_name}: {str(e)}")
+                # Continue with other products
+                continue
+        return processed_data
+
+    async def _train_all_models(self,
+                                tenant_id: str,
+                                processed_data: Dict[str, pd.DataFrame],
+                                job_id: str) -> Dict[str, Any]:
+        """Train models for all processed products"""
+        training_results = {}
+        for product_name, product_data in processed_data.items():
+            try:
+                logger.info(f"Training model for product: {product_name}")
+                # Check if we have enough data
+                if len(product_data) < settings.MIN_TRAINING_DATA_DAYS:
+                    training_results[product_name] = {
+                        'status': 'skipped',
+                        'reason': 'insufficient_data',
+                        'data_points': len(product_data),
+                        'min_required': settings.MIN_TRAINING_DATA_DAYS
+                    }
+                    continue
+                # Train the model
+                model_info = await self.prophet_manager.train_bakery_model(
+                    tenant_id=tenant_id,
+                    product_name=product_name,
+                    df=product_data,
+                    job_id=job_id
+                )
+                training_results[product_name] = {
+                    'status': 'success',
+                    'model_info': model_info,
+                    'data_points': len(product_data),
+                    'trained_at': datetime.now().isoformat()
+                }
+                logger.info(f"Successfully trained model for {product_name}")
+            except Exception as e:
+                logger.error(f"Failed to train model for {product_name}: {str(e)}")
+                training_results[product_name] = {
+                    'status': 'error',
+                    'error_message': str(e),
+                    'data_points': len(product_data) if product_data is not None else 0
+                }
+        return training_results
+
+    def _calculate_training_summary(self, training_results: Dict[str, Any]) -> Dict[str, Any]:
+        """Calculate summary statistics from training results"""
+        total_products = len(training_results)
+        successful_products = len([r for r in training_results.values() if r.get('status') == 'success'])
+        failed_products = len([r for r in training_results.values() if r.get('status') == 'error'])
+        skipped_products = len([r for r in training_results.values() if r.get('status') == 'skipped'])
+        # Calculate average training metrics for successful models
+        successful_results = [r for r in training_results.values() if r.get('status') == 'success']
+        avg_metrics = {}
+        if successful_results:
+            metrics_list = [r['model_info'].get('training_metrics', {}) for r in successful_results]
+            if metrics_list and all(metrics_list):
+                avg_metrics = {
+                    'avg_mae': np.mean([m.get('mae', 0) for m in metrics_list]),
+                    'avg_rmse': np.mean([m.get('rmse', 0) for m in metrics_list]),
+                    'avg_mape': np.mean([m.get('mape', 0) for m in metrics_list]),
+                    'avg_r2': np.mean([m.get('r2_score', 0) for m in metrics_list])
+                }
+        return {
+            'total_products': total_products,
+            'successful_products': successful_products,
+            'failed_products': failed_products,
+            'skipped_products': skipped_products,
+            'success_rate': round(successful_products / total_products * 100, 2) if total_products > 0 else 0,
+            'average_metrics': avg_metrics
+        }
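
Finally, a sketch of how a caller might drive the new trainer end to end. The row payloads mirror the required date/product_name/quantity columns; the tenant name, job id, and the assumption that 90 days clears settings.MIN_TRAINING_DATA_DAYS are all illustrative:

import asyncio
import pandas as pd
from app.ml.trainer import BakeryMLTrainer

async def demo():
    days = pd.date_range('2025-01-01', periods=90, freq='D')
    sales_rows = [
        {'date': d.strftime('%Y-%m-%d'), 'product_name': 'croissant', 'quantity': 20 + d.dayofweek}
        for d in days
    ]
    trainer = BakeryMLTrainer()
    result = await trainer.train_tenant_models(
        tenant_id='tenant-1',
        sales_data=sales_rows,  # weather/traffic omitted, so neutral defaults are filled in
        job_id='local-smoke-test',
    )
    print(result['summary'])

asyncio.run(demo())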