""" Time Series Utilities Provides utilities for time-series analysis, projection, and calculations used in forecasting and inventory planning. """ import statistics from datetime import date, datetime, timedelta from typing import List, Dict, Tuple, Optional from decimal import Decimal import math def generate_date_range( start_date: date, end_date: date, include_end: bool = True ) -> List[date]: """ Generate a list of dates between start and end. Args: start_date: Start date (inclusive) end_date: End date include_end: Whether to include end date Returns: List of dates """ dates = [] current = start_date while current < end_date or (include_end and current == end_date): dates.append(current) current += timedelta(days=1) return dates def generate_future_dates( start_date: date, num_days: int ) -> List[date]: """ Generate a list of future dates starting from start_date. Args: start_date: Starting date num_days: Number of days to generate Returns: List of dates """ return [start_date + timedelta(days=i) for i in range(num_days)] def calculate_moving_average( values: List[float], window_size: int ) -> List[float]: """ Calculate moving average over a window. Args: values: List of values window_size: Size of moving window Returns: List of moving averages """ if len(values) < window_size: return [] moving_averages = [] for i in range(len(values) - window_size + 1): window = values[i:i + window_size] moving_averages.append(sum(window) / window_size) return moving_averages def calculate_standard_deviation(values: List[float]) -> float: """ Calculate standard deviation of values. Args: values: List of values Returns: Standard deviation """ if len(values) < 2: return 0.0 return statistics.stdev(values) def calculate_variance(values: List[float]) -> float: """ Calculate variance of values. Args: values: List of values Returns: Variance """ if len(values) < 2: return 0.0 return statistics.variance(values) def calculate_mean(values: List[float]) -> float: """ Calculate mean of values. Args: values: List of values Returns: Mean """ if not values: return 0.0 return statistics.mean(values) def calculate_median(values: List[float]) -> float: """ Calculate median of values. Args: values: List of values Returns: Median """ if not values: return 0.0 return statistics.median(values) def calculate_percentile(values: List[float], percentile: float) -> float: """ Calculate percentile of values. Args: values: List of values percentile: Percentile to calculate (0-100) Returns: Percentile value """ if not values: return 0.0 sorted_values = sorted(values) k = (len(sorted_values) - 1) * percentile / 100 f = math.floor(k) c = math.ceil(k) if f == c: return sorted_values[int(k)] d0 = sorted_values[int(f)] * (c - k) d1 = sorted_values[int(c)] * (k - f) return d0 + d1 def calculate_coefficient_of_variation(values: List[float]) -> float: """ Calculate coefficient of variation (CV = stddev / mean). Args: values: List of values Returns: Coefficient of variation """ if not values: return 0.0 mean = calculate_mean(values) if mean == 0: return 0.0 stddev = calculate_standard_deviation(values) return stddev / mean def aggregate_by_date( data: List[Tuple[date, float]], aggregation: str = "sum" ) -> Dict[date, float]: """ Aggregate time-series data by date. Args: data: List of (date, value) tuples aggregation: Aggregation method ('sum', 'mean', 'max', 'min') Returns: Dictionary mapping date to aggregated value """ by_date: Dict[date, List[float]] = {} for dt, value in data: if dt not in by_date: by_date[dt] = [] by_date[dt].append(value) result = {} for dt, values in by_date.items(): if aggregation == "sum": result[dt] = sum(values) elif aggregation == "mean": result[dt] = calculate_mean(values) elif aggregation == "max": result[dt] = max(values) elif aggregation == "min": result[dt] = min(values) else: result[dt] = sum(values) return result def fill_missing_dates( data: Dict[date, float], start_date: date, end_date: date, fill_value: float = 0.0 ) -> Dict[date, float]: """ Fill missing dates in time-series data. Args: data: Dictionary mapping date to value start_date: Start date end_date: End date fill_value: Value to use for missing dates Returns: Dictionary with all dates filled """ date_range = generate_date_range(start_date, end_date) filled_data = {} for dt in date_range: filled_data[dt] = data.get(dt, fill_value) return filled_data def calculate_trend( values: List[float] ) -> Tuple[float, float]: """ Calculate linear trend (slope and intercept) using least squares. Args: values: List of values Returns: Tuple of (slope, intercept) """ if len(values) < 2: return 0.0, values[0] if values else 0.0 n = len(values) x = list(range(n)) y = values # Calculate means x_mean = sum(x) / n y_mean = sum(y) / n # Calculate slope numerator = sum((x[i] - x_mean) * (y[i] - y_mean) for i in range(n)) denominator = sum((x[i] - x_mean) ** 2 for i in range(n)) if denominator == 0: return 0.0, y_mean slope = numerator / denominator intercept = y_mean - slope * x_mean return slope, intercept def project_value( historical_values: List[float], periods_ahead: int, method: str = "mean" ) -> List[float]: """ Project future values based on historical data. Args: historical_values: Historical values periods_ahead: Number of periods to project method: Projection method ('mean', 'trend', 'last') Returns: List of projected values """ if not historical_values: return [0.0] * periods_ahead if method == "mean": # Use historical mean projected_value = calculate_mean(historical_values) return [projected_value] * periods_ahead elif method == "last": # Use last value return [historical_values[-1]] * periods_ahead elif method == "trend": # Use trend projection slope, intercept = calculate_trend(historical_values) n = len(historical_values) return [slope * (n + i) + intercept for i in range(periods_ahead)] else: # Default to mean projected_value = calculate_mean(historical_values) return [projected_value] * periods_ahead def calculate_cumulative_sum(values: List[float]) -> List[float]: """ Calculate cumulative sum of values. Args: values: List of values Returns: List of cumulative sums """ cumulative = [] total = 0.0 for value in values: total += value cumulative.append(total) return cumulative def calculate_rolling_sum( values: List[float], window_size: int ) -> List[float]: """ Calculate rolling sum over a window. Args: values: List of values window_size: Size of rolling window Returns: List of rolling sums """ if len(values) < window_size: return [] rolling_sums = [] for i in range(len(values) - window_size + 1): window = values[i:i + window_size] rolling_sums.append(sum(window)) return rolling_sums def normalize_values( values: List[float], method: str = "minmax" ) -> List[float]: """ Normalize values to a standard range. Args: values: List of values method: Normalization method ('minmax' or 'zscore') Returns: List of normalized values """ if not values: return [] if method == "minmax": # Scale to [0, 1] min_val = min(values) max_val = max(values) if max_val == min_val: return [0.5] * len(values) return [(v - min_val) / (max_val - min_val) for v in values] elif method == "zscore": # Z-score normalization mean = calculate_mean(values) stddev = calculate_standard_deviation(values) if stddev == 0: return [0.0] * len(values) return [(v - mean) / stddev for v in values] else: return values def detect_outliers( values: List[float], method: str = "iqr", threshold: float = 1.5 ) -> List[bool]: """ Detect outliers in values. Args: values: List of values method: Detection method ('iqr' or 'zscore') threshold: Threshold for outlier detection Returns: List of booleans indicating outliers """ if not values: return [] if method == "iqr": # Interquartile range method q1 = calculate_percentile(values, 25) q3 = calculate_percentile(values, 75) iqr = q3 - q1 lower_bound = q1 - threshold * iqr upper_bound = q3 + threshold * iqr return [v < lower_bound or v > upper_bound for v in values] elif method == "zscore": # Z-score method mean = calculate_mean(values) stddev = calculate_standard_deviation(values) if stddev == 0: return [False] * len(values) z_scores = [(v - mean) / stddev for v in values] return [abs(z) > threshold for z in z_scores] else: return [False] * len(values) def interpolate_missing_values( values: List[Optional[float]], method: str = "linear" ) -> List[float]: """ Interpolate missing values in a time series. Args: values: List of values with possible None values method: Interpolation method ('linear', 'forward', 'backward') Returns: List with interpolated values """ if not values: return [] result = [] if method == "forward": # Forward fill last_valid = None for v in values: if v is not None: last_valid = v result.append(last_valid if last_valid is not None else 0.0) elif method == "backward": # Backward fill next_valid = None for v in reversed(values): if v is not None: next_valid = v result.insert(0, next_valid if next_valid is not None else 0.0) else: # linear # Linear interpolation result = list(values) for i in range(len(result)): if result[i] is None: # Find previous and next valid values prev_idx = None next_idx = None for j in range(i - 1, -1, -1): if values[j] is not None: prev_idx = j break for j in range(i + 1, len(values)): if values[j] is not None: next_idx = j break if prev_idx is not None and next_idx is not None: # Linear interpolation x0, y0 = prev_idx, values[prev_idx] x1, y1 = next_idx, values[next_idx] result[i] = y0 + (y1 - y0) * (i - x0) / (x1 - x0) elif prev_idx is not None: # Forward fill result[i] = values[prev_idx] elif next_idx is not None: # Backward fill result[i] = values[next_idx] else: # No valid values result[i] = 0.0 return result