# services/training/app/ml/data_processor.py """ Enhanced Data Processor for Training Service Handles data preparation, date alignment, cleaning, and feature engineering for ML training """ import pandas as pd import numpy as np from typing import Dict, List, Any, Optional, Tuple from datetime import datetime, timedelta, timezone import logging from sklearn.preprocessing import StandardScaler from sklearn.impute import SimpleImputer from app.services.date_alignment_service import DateAlignmentService, DateRange, DataSourceType logger = logging.getLogger(__name__) class BakeryDataProcessor: """ Enhanced data processor for bakery forecasting training service. Integrates date alignment, data cleaning, feature engineering, and preparation for ML models. """ def __init__(self): self.scalers = {} # Store scalers for each feature self.imputers = {} # Store imputers for missing value handling self.date_alignment_service = DateAlignmentService() def _ensure_timezone_aware(self, df: pd.DataFrame, date_column: str = 'date') -> pd.DataFrame: """Ensure date column is timezone-aware to prevent conversion errors""" if date_column in df.columns: # Convert to datetime if not already df[date_column] = pd.to_datetime(df[date_column]) # If timezone-naive, localize to UTC if df[date_column].dt.tz is None: df[date_column] = df[date_column].dt.tz_localize('UTC') # If already timezone-aware but not UTC, convert to UTC elif str(df[date_column].dt.tz) != 'UTC': df[date_column] = df[date_column].dt.tz_convert('UTC') return df async def prepare_training_data(self, sales_data: pd.DataFrame, weather_data: pd.DataFrame, traffic_data: pd.DataFrame, product_name: str) -> pd.DataFrame: """ Prepare comprehensive training data for a specific product with date alignment. Args: sales_data: Historical sales data for the product weather_data: Weather data traffic_data: Traffic data product_name: Product name for logging Returns: DataFrame ready for Prophet training with 'ds' and 'y' columns plus features """ try: logger.info(f"Preparing training data for product: {product_name}") # Step 1: Convert and validate sales data sales_clean = await self._process_sales_data(sales_data, product_name) # FIX: Ensure timezone awareness before any operations sales_clean = self._ensure_timezone_aware(sales_clean) weather_data = self._ensure_timezone_aware(weather_data) if not weather_data.empty else weather_data traffic_data = self._ensure_timezone_aware(traffic_data) if not traffic_data.empty else traffic_data # Step 2: Apply date alignment if we have date constraints sales_clean = await self._apply_date_alignment(sales_clean, weather_data, traffic_data) # Step 3: Aggregate to daily level daily_sales = await self._aggregate_daily_sales(sales_clean) # Step 4: Add temporal features daily_sales = self._add_temporal_features(daily_sales) # Step 5: Merge external data sources daily_sales = self._merge_weather_features(daily_sales, weather_data) daily_sales = self._merge_traffic_features(daily_sales, traffic_data) # Step 6: Engineer additional features daily_sales = self._engineer_features(daily_sales) # Step 7: Handle missing values daily_sales = self._handle_missing_values(daily_sales) # Step 8: Prepare for Prophet (rename columns and validate) prophet_data = self._prepare_prophet_format(daily_sales) logger.info(f"Prepared {len(prophet_data)} data points for {product_name}") return prophet_data except Exception as e: logger.error(f"Error preparing training data for {product_name}: {str(e)}") raise async def prepare_prediction_features(self, future_dates: pd.DatetimeIndex, weather_forecast: pd.DataFrame = None, traffic_forecast: pd.DataFrame = None) -> pd.DataFrame: """ Create features for future predictions with proper date handling. Args: future_dates: Future dates to predict weather_forecast: Weather forecast data traffic_forecast: Traffic forecast data Returns: DataFrame with features for prediction """ try: # Create base future dataframe future_df = pd.DataFrame({'ds': future_dates}) # Add temporal features future_df = self._add_temporal_features( future_df.rename(columns={'ds': 'date'}) ).rename(columns={'date': 'ds'}) # Add weather features if weather_forecast is not None and not weather_forecast.empty: weather_features = weather_forecast.copy() if 'date' in weather_features.columns: weather_features = weather_features.rename(columns={'date': 'ds'}) future_df = future_df.merge(weather_features, on='ds', how='left') # Add traffic features if traffic_forecast is not None and not traffic_forecast.empty: traffic_features = traffic_forecast.copy() if 'date' in traffic_features.columns: traffic_features = traffic_features.rename(columns={'date': 'ds'}) future_df = future_df.merge(traffic_features, on='ds', how='left') # Engineer additional features future_df = self._engineer_features(future_df.rename(columns={'ds': 'date'})) future_df = future_df.rename(columns={'date': 'ds'}) # Handle missing values in future data future_df = self._handle_missing_values_future(future_df) return future_df except Exception as e: logger.error(f"Error creating prediction features: {e}") # Return minimal features if error return pd.DataFrame({'ds': future_dates}) async def _apply_date_alignment(self, sales_data: pd.DataFrame, weather_data: pd.DataFrame, traffic_data: pd.DataFrame) -> pd.DataFrame: """ Apply date alignment constraints to ensure data consistency across sources. """ try: if sales_data.empty: return sales_data # Create date range from sales data sales_dates = pd.to_datetime(sales_data['date']) sales_date_range = DateRange( start=sales_dates.min(), end=sales_dates.max(), source=DataSourceType.BAKERY_SALES ) # Get aligned date range considering all constraints aligned_range = self.date_alignment_service.validate_and_align_dates( user_sales_range=sales_date_range ) # Filter sales data to aligned range mask = (sales_dates >= aligned_range.start) & (sales_dates <= aligned_range.end) filtered_sales = sales_data[mask].copy() logger.info(f"Date alignment: {len(sales_data)} → {len(filtered_sales)} records") logger.info(f"Aligned date range: {aligned_range.start.date()} to {aligned_range.end.date()}") if aligned_range.constraints: logger.info(f"Applied constraints: {aligned_range.constraints}") return filtered_sales except Exception as e: logger.warning(f"Date alignment failed, using original data: {str(e)}") return sales_data async def _process_sales_data(self, sales_data: pd.DataFrame, product_name: str) -> pd.DataFrame: """Process and clean sales data with enhanced validation""" sales_clean = sales_data.copy() # Ensure date column exists and is datetime if 'date' not in sales_clean.columns: raise ValueError("Sales data must have a 'date' column") sales_clean['date'] = pd.to_datetime(sales_clean['date']) # Handle different quantity column names quantity_columns = ['quantity', 'quantity_sold', 'sales', 'units_sold'] quantity_col = None for col in quantity_columns: if col in sales_clean.columns: quantity_col = col break if quantity_col is None: raise ValueError(f"Sales data must have one of these columns: {quantity_columns}") # Standardize to 'quantity' if quantity_col != 'quantity': sales_clean['quantity'] = sales_clean[quantity_col] logger.info(f"Mapped '{quantity_col}' to 'quantity' column") sales_clean['quantity'] = pd.to_numeric(sales_clean['quantity'], errors='coerce') # Remove rows with invalid quantities sales_clean = sales_clean.dropna(subset=['quantity']) sales_clean = sales_clean[sales_clean['quantity'] >= 0] # No negative sales # Filter for the specific product if product_name column exists if 'product_name' in sales_clean.columns: sales_clean = sales_clean[sales_clean['product_name'] == product_name] # Remove duplicate dates (keep the one with highest quantity) sales_clean = sales_clean.sort_values(['date', 'quantity'], ascending=[True, False]) sales_clean = sales_clean.drop_duplicates(subset=['date'], keep='first') return sales_clean async def _aggregate_daily_sales(self, sales_data: pd.DataFrame) -> pd.DataFrame: """Aggregate sales to daily level with improved date handling""" if sales_data.empty: return pd.DataFrame(columns=['date', 'quantity']) # Group by date and sum quantities daily_sales = sales_data.groupby('date').agg({ 'quantity': 'sum' }).reset_index() # Ensure we have data for all dates in the range (fill gaps with 0) date_range = pd.date_range( start=daily_sales['date'].min(), end=daily_sales['date'].max(), freq='D' ) full_date_df = pd.DataFrame({'date': date_range}) daily_sales = full_date_df.merge(daily_sales, on='date', how='left') daily_sales['quantity'] = daily_sales['quantity'].fillna(0) # Fill missing days with 0 sales return daily_sales def _add_temporal_features(self, df: pd.DataFrame) -> pd.DataFrame: """Add comprehensive temporal features for bakery demand patterns""" df = df.copy() # Ensure we have a date column if 'date' not in df.columns: raise ValueError("DataFrame must have a 'date' column") df['date'] = pd.to_datetime(df['date']) # Basic temporal features df['day_of_week'] = df['date'].dt.dayofweek # 0=Monday, 6=Sunday df['day_of_month'] = df['date'].dt.day df['month'] = df['date'].dt.month df['quarter'] = df['date'].dt.quarter df['week_of_year'] = df['date'].dt.isocalendar().week # Bakery-specific features df['is_weekend'] = df['day_of_week'].isin([5, 6]).astype(int) df['is_monday'] = (df['day_of_week'] == 0).astype(int) # Monday often has different patterns df['is_friday'] = (df['day_of_week'] == 4).astype(int) # Friday often busy # Season mapping for Madrid df['season'] = df['month'].apply(self._get_season) df['is_summer'] = (df['season'] == 3).astype(int) # Summer seasonality df['is_winter'] = (df['season'] == 1).astype(int) # Winter seasonality # Holiday and special day indicators df['is_holiday'] = df['date'].apply(self._is_spanish_holiday).astype(int) df['is_school_holiday'] = df['date'].apply(self._is_school_holiday).astype(int) df['is_month_start'] = (df['day_of_month'] <= 3).astype(int) df['is_month_end'] = (df['day_of_month'] >= 28).astype(int) # Payday patterns (common in Spain: end/beginning of month) df['is_payday_period'] = ((df['day_of_month'] <= 5) | (df['day_of_month'] >= 25)).astype(int) return df def _merge_weather_features(self, daily_sales: pd.DataFrame, weather_data: pd.DataFrame) -> pd.DataFrame: """Merge weather features with enhanced Madrid-specific handling""" # ✅ FIX: Define weather_defaults OUTSIDE try block to fix scope error weather_defaults = { 'temperature': 15.0, 'precipitation': 0.0, 'humidity': 60.0, 'wind_speed': 5.0, 'pressure': 1013.0 } if weather_data.empty: # Add default weather columns for feature, default_value in weather_defaults.items(): daily_sales[feature] = default_value return daily_sales try: weather_clean = weather_data.copy() # Standardize date column if 'date' not in weather_clean.columns and 'ds' in weather_clean.columns: weather_clean = weather_clean.rename(columns={'ds': 'date'}) # ✅ FIX: Ensure timezone consistency weather_clean['date'] = pd.to_datetime(weather_clean['date']) daily_sales['date'] = pd.to_datetime(daily_sales['date']) # Remove timezone info from both to make them compatible if weather_clean['date'].dt.tz is not None: weather_clean['date'] = weather_clean['date'].dt.tz_localize(None) if daily_sales['date'].dt.tz is not None: daily_sales['date'] = daily_sales['date'].dt.tz_localize(None) # Map weather columns to standard names weather_mapping = { 'temperature': ['temperature', 'temp', 'temperatura'], 'precipitation': ['precipitation', 'precip', 'rain', 'lluvia'], 'humidity': ['humidity', 'humedad', 'relative_humidity'], 'wind_speed': ['wind_speed', 'viento', 'wind'], 'pressure': ['pressure', 'presion', 'atmospheric_pressure'] } weather_features = ['date'] for standard_name, possible_names in weather_mapping.items(): for possible_name in possible_names: if possible_name in weather_clean.columns: weather_clean[standard_name] = pd.to_numeric(weather_clean[possible_name], errors='coerce') weather_features.append(standard_name) break # Keep only the features we found weather_clean = weather_clean[weather_features].copy() # Merge with sales data merged = daily_sales.merge(weather_clean, on='date', how='left') # Fill missing weather values with Madrid-appropriate defaults for feature, default_value in weather_defaults.items(): if feature in merged.columns: merged[feature] = merged[feature].fillna(default_value) return merged except Exception as e: logger.warning(f"Error merging weather data: {e}") # Add default weather columns if merge fails (weather_defaults now in scope) for feature, default_value in weather_defaults.items(): daily_sales[feature] = default_value return daily_sales def _merge_traffic_features(self, daily_sales: pd.DataFrame, traffic_data: pd.DataFrame) -> pd.DataFrame: """Merge traffic features with enhanced Madrid-specific handling""" if traffic_data.empty: # Add default traffic column daily_sales['traffic_volume'] = 100.0 # Neutral traffic level return daily_sales try: traffic_clean = traffic_data.copy() # Standardize date column if 'date' not in traffic_clean.columns and 'ds' in traffic_clean.columns: traffic_clean = traffic_clean.rename(columns={'ds': 'date'}) traffic_clean['date'] = pd.to_datetime(traffic_clean['date']) # Map traffic columns to standard names traffic_mapping = { 'traffic_volume': ['traffic_volume', 'traffic_intensity', 'trafico', 'intensidad', 'volume'], 'pedestrian_count': ['pedestrian_count', 'peatones', 'pedestrians'], 'congestion_level': ['congestion_level', 'congestion', 'nivel_congestion'], 'average_speed': ['average_speed', 'speed', 'velocidad_media', 'avg_speed'] } traffic_features = ['date'] for standard_name, possible_names in traffic_mapping.items(): for possible_name in possible_names: if possible_name in traffic_clean.columns: traffic_clean[standard_name] = pd.to_numeric(traffic_clean[possible_name], errors='coerce') traffic_features.append(standard_name) break # Keep only the features we found traffic_clean = traffic_clean[traffic_features].copy() # Merge with sales data merged = daily_sales.merge(traffic_clean, on='date', how='left') # Fill missing traffic values with reasonable defaults traffic_defaults = { 'traffic_volume': 100.0, 'pedestrian_count': 50.0, 'congestion_level': 1.0, # Low congestion 'average_speed': 30.0 # km/h typical for Madrid } for feature, default_value in traffic_defaults.items(): if feature in merged.columns: merged[feature] = merged[feature].fillna(default_value) return merged except Exception as e: logger.warning(f"Error merging traffic data: {e}") # Add default traffic column if merge fails daily_sales['traffic_volume'] = 100.0 return daily_sales def _engineer_features(self, df: pd.DataFrame) -> pd.DataFrame: """Engineer additional features from existing data with bakery-specific insights""" df = df.copy() # Weather-based features if 'temperature' in df.columns: df['temp_squared'] = df['temperature'] ** 2 df['is_hot_day'] = (df['temperature'] > 25).astype(int) # Hot days in Madrid df['is_cold_day'] = (df['temperature'] < 10).astype(int) # Cold days df['is_pleasant_day'] = ((df['temperature'] >= 18) & (df['temperature'] <= 25)).astype(int) # Temperature categories for bakery products df['temp_category'] = pd.cut(df['temperature'], bins=[-np.inf, 5, 15, 25, np.inf], labels=[0, 1, 2, 3]).astype(int) if 'precipitation' in df.columns: df['is_rainy_day'] = (df['precipitation'] > 0.1).astype(int) df['is_heavy_rain'] = (df['precipitation'] > 10).astype(int) df['rain_intensity'] = pd.cut(df['precipitation'], bins=[-0.1, 0, 2, 10, np.inf], labels=[0, 1, 2, 3]).astype(int) # ✅ FIX: Traffic-based features with NaN protection if 'traffic_volume' in df.columns: # Calculate traffic quantiles for relative measures q75 = df['traffic_volume'].quantile(0.75) q25 = df['traffic_volume'].quantile(0.25) df['high_traffic'] = (df['traffic_volume'] > q75).astype(int) df['low_traffic'] = (df['traffic_volume'] < q25).astype(int) # ✅ FIX: Safe normalization with NaN protection traffic_std = df['traffic_volume'].std() traffic_mean = df['traffic_volume'].mean() if traffic_std > 0 and not pd.isna(traffic_std) and not pd.isna(traffic_mean): # Normal case: valid standard deviation df['traffic_normalized'] = (df['traffic_volume'] - traffic_mean) / traffic_std else: # Edge case: all values are the same or contain NaN logger.warning("Traffic volume has zero standard deviation or contains NaN, using zeros for normalized values") df['traffic_normalized'] = 0.0 # ✅ ADDITIONAL SAFETY: Fill any remaining NaN values df['traffic_normalized'] = df['traffic_normalized'].fillna(0.0) # Interaction features - bakery specific if 'is_weekend' in df.columns and 'temperature' in df.columns: df['weekend_temp_interaction'] = df['is_weekend'] * df['temperature'] df['weekend_pleasant_weather'] = df['is_weekend'] * df.get('is_pleasant_day', 0) if 'is_rainy_day' in df.columns and 'traffic_volume' in df.columns: df['rain_traffic_interaction'] = df['is_rainy_day'] * df['traffic_volume'] if 'is_holiday' in df.columns and 'temperature' in df.columns: df['holiday_temp_interaction'] = df['is_holiday'] * df['temperature'] # Seasonal interactions if 'season' in df.columns and 'temperature' in df.columns: df['season_temp_interaction'] = df['season'] * df['temperature'] # Day-of-week specific features if 'day_of_week' in df.columns: # Working days vs weekends df['is_working_day'] = (~df['day_of_week'].isin([5, 6])).astype(int) # Peak bakery days (Friday, Saturday, Sunday often busy) df['is_peak_bakery_day'] = df['day_of_week'].isin([4, 5, 6]).astype(int) # Month-specific features for bakery seasonality if 'month' in df.columns: # High-demand months (holidays, summer) df['is_high_demand_month'] = df['month'].isin([6, 7, 8, 12]).astype(int) # Spring/summer months df['is_warm_season'] = df['month'].isin([4, 5, 6, 7, 8, 9]).astype(int) # ✅ FINAL SAFETY CHECK: Remove any remaining NaN values # Check for NaN values in all numeric columns and fill them numeric_columns = df.select_dtypes(include=[np.number]).columns for col in numeric_columns: if df[col].isna().any(): nan_count = df[col].isna().sum() logger.warning(f"Found {nan_count} NaN values in column '{col}', filling with 0") df[col] = df[col].fillna(0.0) return df def _handle_missing_values(self, df: pd.DataFrame) -> pd.DataFrame: """Handle missing values in the dataset with improved strategies""" df = df.copy() # For numeric columns, use appropriate imputation strategies numeric_columns = df.select_dtypes(include=[np.number]).columns for col in numeric_columns: if col != 'quantity' and df[col].isna().any(): # Use different strategies based on column type if 'temperature' in col: df[col] = df[col].fillna(15.0) # Madrid average elif 'precipitation' in col or 'rain' in col: df[col] = df[col].fillna(0.0) # Default no rain elif 'humidity' in col: df[col] = df[col].fillna(60.0) # Moderate humidity elif 'traffic' in col: df[col] = df[col].fillna(df[col].median()) # Use median for traffic elif 'wind' in col: df[col] = df[col].fillna(5.0) # Light wind elif 'pressure' in col: df[col] = df[col].fillna(1013.0) # Standard atmospheric pressure else: # For other columns, use median or forward fill if df[col].count() > 0: df[col] = df[col].fillna(df[col].median()) else: df[col] = df[col].fillna(0) return df def _handle_missing_values_future(self, df: pd.DataFrame) -> pd.DataFrame: """Handle missing values in future prediction data""" numeric_columns = df.select_dtypes(include=[np.number]).columns madrid_defaults = { 'temperature': 15.0, 'precipitation': 0.0, 'humidity': 60.0, 'wind_speed': 5.0, 'traffic_volume': 100.0, 'pedestrian_count': 50.0, 'pressure': 1013.0 } for col in numeric_columns: if df[col].isna().any(): # Find appropriate default value default_value = 0 for key, value in madrid_defaults.items(): if key in col.lower(): default_value = value break df[col] = df[col].fillna(default_value) return df def _prepare_prophet_format(self, df: pd.DataFrame) -> pd.DataFrame: """Prepare data in Prophet format with enhanced validation""" prophet_df = df.copy() # Rename columns for Prophet if 'date' in prophet_df.columns: prophet_df = prophet_df.rename(columns={'date': 'ds'}) if 'quantity' in prophet_df.columns: prophet_df = prophet_df.rename(columns={'quantity': 'y'}) # Ensure ds is datetime and remove timezone info if 'ds' in prophet_df.columns: prophet_df['ds'] = pd.to_datetime(prophet_df['ds']) if prophet_df['ds'].dt.tz is not None: prophet_df['ds'] = prophet_df['ds'].dt.tz_localize(None) # Validate required columns if 'ds' not in prophet_df.columns or 'y' not in prophet_df.columns: raise ValueError("Prophet data must have 'ds' and 'y' columns") # Clean target values prophet_df = prophet_df.dropna(subset=['y']) prophet_df['y'] = prophet_df['y'].clip(lower=0) # No negative sales # Remove any duplicate dates (keep last occurrence) prophet_df = prophet_df.drop_duplicates(subset=['ds'], keep='last') # Sort by date prophet_df = prophet_df.sort_values('ds').reset_index(drop=True) # Final validation if len(prophet_df) == 0: raise ValueError("No valid data points after cleaning") logger.info(f"Prophet data prepared: {len(prophet_df)} rows, " f"date range: {prophet_df['ds'].min()} to {prophet_df['ds'].max()}") return prophet_df def _get_season(self, month: int) -> int: """Get season from month (1-4 for Winter, Spring, Summer, Autumn)""" if month in [12, 1, 2]: return 1 # Winter elif month in [3, 4, 5]: return 2 # Spring elif month in [6, 7, 8]: return 3 # Summer else: return 4 # Autumn def _is_spanish_holiday(self, date: datetime) -> bool: """Check if a date is a major Spanish holiday""" month_day = (date.month, date.day) # Major Spanish holidays that affect bakery sales spanish_holidays = [ (1, 1), # New Year (1, 6), # Epiphany (Reyes) (5, 1), # Labour Day (8, 15), # Assumption (10, 12), # National Day (11, 1), # All Saints (12, 6), # Constitution (12, 8), # Immaculate Conception (12, 25), # Christmas (5, 15), # San Isidro (Madrid patron saint) (5, 2), # Madrid Community Day ] return month_day in spanish_holidays def _is_school_holiday(self, date: datetime) -> bool: """Check if a date is during school holidays (approximate)""" month = date.month # Approximate Spanish school holiday periods # Summer holidays (July-August) if month in [7, 8]: return True # Christmas holidays (mid December to early January) if month == 12 and date.day >= 20: return True if month == 1 and date.day <= 10: return True # Easter holidays (approximate - early April) if month == 4 and date.day <= 15: return True return False def calculate_feature_importance(self, model_data: pd.DataFrame, target_column: str = 'y') -> Dict[str, float]: """ Calculate feature importance for the model using correlation analysis. """ try: # Get numeric features numeric_features = model_data.select_dtypes(include=[np.number]).columns numeric_features = [col for col in numeric_features if col != target_column] importance_scores = {} if target_column not in model_data.columns: logger.warning(f"Target column '{target_column}' not found") return {} for feature in numeric_features: if feature in model_data.columns: correlation = model_data[feature].corr(model_data[target_column]) if not pd.isna(correlation) and not np.isinf(correlation): importance_scores[feature] = abs(correlation) # Sort by importance importance_scores = dict(sorted(importance_scores.items(), key=lambda x: x[1], reverse=True)) logger.info(f"Calculated feature importance for {len(importance_scores)} features") return importance_scores except Exception as e: logger.error(f"Error calculating feature importance: {e}") return {} def get_data_quality_report(self, df: pd.DataFrame) -> Dict[str, Any]: """ Generate a comprehensive data quality report. """ try: report = { "total_records": len(df), "date_range": { "start": df['ds'].min().isoformat() if 'ds' in df.columns else None, "end": df['ds'].max().isoformat() if 'ds' in df.columns else None, "duration_days": (df['ds'].max() - df['ds'].min()).days if 'ds' in df.columns else 0 }, "missing_values": {}, "data_completeness": 0.0, "target_statistics": {}, "feature_count": 0 } # Calculate missing values missing_counts = df.isnull().sum() total_cells = len(df) for col in df.columns: missing_count = missing_counts[col] report["missing_values"][col] = { "count": int(missing_count), "percentage": round((missing_count / total_cells) * 100, 2) } # Overall completeness total_missing = missing_counts.sum() total_possible = len(df) * len(df.columns) report["data_completeness"] = round(((total_possible - total_missing) / total_possible) * 100, 2) # Target variable statistics if 'y' in df.columns: y_col = df['y'] report["target_statistics"] = { "mean": round(y_col.mean(), 2), "median": round(y_col.median(), 2), "std": round(y_col.std(), 2), "min": round(y_col.min(), 2), "max": round(y_col.max(), 2), "zero_count": int((y_col == 0).sum()), "zero_percentage": round(((y_col == 0).sum() / len(y_col)) * 100, 2) } # Feature count numeric_features = df.select_dtypes(include=[np.number]).columns report["feature_count"] = len([col for col in numeric_features if col not in ['y', 'ds']]) return report except Exception as e: logger.error(f"Error generating data quality report: {e}") return {"error": str(e)}