537 lines
12 KiB
Python
537 lines
12 KiB
Python
|
|
"""
|
||
|
|
Time Series Utilities
|
||
|
|
|
||
|
|
Provides utilities for time-series analysis, projection, and calculations
|
||
|
|
used in forecasting and inventory planning.
|
||
|
|
"""
|
||
|
|
|
||
|
|
import statistics
|
||
|
|
from datetime import date, datetime, timedelta
|
||
|
|
from typing import List, Dict, Tuple, Optional
|
||
|
|
from decimal import Decimal
|
||
|
|
import math
|
||
|
|
|
||
|
|
|
||
|
|
def generate_date_range(
    start_date: date,
    end_date: date,
    include_end: bool = True
) -> List[date]:
    """
    Generate a list of consecutive dates between start and end.

    The number of entries is computed up front rather than by stepping a
    cursor one day past the last entry, so a range that ends on
    ``date.max`` does not raise ``OverflowError``.

    Args:
        start_date: Start date (inclusive)
        end_date: End date
        include_end: Whether to include end date

    Returns:
        List of dates in ascending order, one per day. Empty when
        start_date is after end_date.
    """
    if start_date > end_date:
        return []

    one_day = timedelta(days=1)
    whole_days, remainder = divmod(end_date - start_date, one_day)

    # Offsets 0..whole_days-1 always fall strictly before end_date. Offset
    # whole_days is end_date itself when the span is a whole number of days
    # (kept only if include_end); otherwise it still lies before end_date.
    count = whole_days + 1 if (include_end or remainder) else whole_days

    return [start_date + timedelta(days=offset) for offset in range(count)]
|
||
|
|
|
||
|
|
|
||
|
|
def generate_future_dates(
    start_date: date,
    num_days: int
) -> List[date]:
    """
    Build a run of consecutive daily dates beginning at start_date.

    Args:
        start_date: First date of the run (included in the result)
        num_days: How many dates to produce; zero or a negative number
            yields an empty list

    Returns:
        List of num_days dates, one day apart
    """
    step = timedelta(days=1)
    return [start_date + step * offset for offset in range(num_days)]
|
||
|
|
|
||
|
|
|
||
|
|
def calculate_moving_average(
    values: List[float],
    window_size: int
) -> List[float]:
    """
    Calculate the simple moving average over a sliding window.

    Args:
        values: List of values
        window_size: Size of moving window (must be positive to produce
            any output)

    Returns:
        List of window means, one per full window, so the result has
        ``len(values) - window_size + 1`` entries. Empty when window_size
        is zero or negative, or when there are fewer values than
        window_size.
    """
    # A non-positive window has no meaningful average: zero would divide by
    # zero and a negative size would slice wrap-around garbage.
    if window_size <= 0 or len(values) < window_size:
        return []

    return [
        sum(values[start:start + window_size]) / window_size
        for start in range(len(values) - window_size + 1)
    ]
|
||
|
|
|
||
|
|
|
||
|
|
def calculate_standard_deviation(values: List[float]) -> float:
    """
    Return the sample standard deviation of the given values.

    Args:
        values: List of values

    Returns:
        ``statistics.stdev`` of the values, or 0.0 when fewer than two
        values are supplied
    """
    enough_data = len(values) >= 2
    return statistics.stdev(values) if enough_data else 0.0
|
||
|
|
|
||
|
|
|
||
|
|
def calculate_variance(values: List[float]) -> float:
    """
    Return the sample variance of the given values.

    Args:
        values: List of values

    Returns:
        ``statistics.variance`` of the values, or 0.0 when fewer than two
        values are supplied
    """
    enough_data = len(values) >= 2
    return statistics.variance(values) if enough_data else 0.0
|
||
|
|
|
||
|
|
|
||
|
|
def calculate_mean(values: List[float]) -> float:
    """
    Return the arithmetic mean of the given values.

    Args:
        values: List of values

    Returns:
        ``statistics.mean`` of the values, or 0.0 for empty input
    """
    return statistics.mean(values) if values else 0.0
|
||
|
|
|
||
|
|
|
||
|
|
def calculate_median(values: List[float]) -> float:
    """
    Return the median (middle value) of the given values.

    Args:
        values: List of values

    Returns:
        ``statistics.median`` of the values, or 0.0 for empty input
    """
    return statistics.median(values) if values else 0.0
|
||
|
|
|
||
|
|
|
||
|
|
def calculate_percentile(values: List[float], percentile: float) -> float:
    """
    Calculate a percentile of values using linear interpolation.

    The values are sorted and the percentile is mapped to a fractional
    rank ``(n - 1) * percentile / 100``; when the rank falls between two
    positions the result is the weighted blend of the two neighbours.

    Args:
        values: List of values
        percentile: Percentile to calculate (0-100)

    Returns:
        Percentile value, or 0.0 for empty input

    Raises:
        ValueError: If percentile is outside the 0-100 range (for
            non-empty input). Previously such input either raised
            IndexError or silently blended wrapped-around elements.
    """
    if not values:
        return 0.0

    if not 0 <= percentile <= 100:
        raise ValueError(
            f"percentile must be between 0 and 100, got {percentile!r}"
        )

    ordered = sorted(values)
    rank = (len(ordered) - 1) * percentile / 100
    lower = math.floor(rank)
    upper = math.ceil(rank)

    # Rank lands exactly on an element: no interpolation needed.
    if lower == upper:
        return ordered[lower]

    # Weight each neighbour by its closeness to the fractional rank.
    below = ordered[lower] * (upper - rank)
    above = ordered[upper] * (rank - lower)
    return below + above
|
||
|
|
|
||
|
|
|
||
|
|
def calculate_coefficient_of_variation(values: List[float]) -> float:
    """
    Return the coefficient of variation, i.e. standard deviation / mean.

    Args:
        values: List of values

    Returns:
        Standard deviation divided by the mean, or 0.0 when the input is
        empty or its mean is zero (the ratio is undefined in both cases)
    """
    average = calculate_mean(values) if values else 0.0
    if average == 0:
        return 0.0

    return calculate_standard_deviation(values) / average
|
||
|
|
|
||
|
|
|
||
|
|
def aggregate_by_date(
    data: List[Tuple[date, float]],
    aggregation: str = "sum"
) -> Dict[date, float]:
    """
    Collapse (date, value) observations into one value per date.

    Args:
        data: List of (date, value) tuples; a date may appear many times
        aggregation: Aggregation method ('sum', 'mean', 'max', 'min').
            Any other string is treated as 'sum'.

    Returns:
        Dictionary mapping each date to its aggregated value, keyed in
        order of each date's first appearance in data
    """
    grouped: Dict[date, List[float]] = {}
    for day, amount in data:
        grouped.setdefault(day, []).append(amount)

    if aggregation == "mean":
        reducer = calculate_mean
    elif aggregation == "max":
        reducer = max
    elif aggregation == "min":
        reducer = min
    else:
        # Covers "sum" as well as any unrecognized method name.
        reducer = sum

    return {day: reducer(amounts) for day, amounts in grouped.items()}
|
||
|
|
|
||
|
|
|
||
|
|
def fill_missing_dates(
    data: Dict[date, float],
    start_date: date,
    end_date: date,
    fill_value: float = 0.0
) -> Dict[date, float]:
    """
    Produce a gap-free daily series over the range from generate_date_range.

    Args:
        data: Dictionary mapping date to value
        start_date: First date of the range
        end_date: Last date of the range
        fill_value: Value to use for dates absent from data

    Returns:
        New dictionary with one entry for every date yielded by
        generate_date_range(start_date, end_date); entries of data that
        fall outside that range are not carried over
    """
    return {
        day: data.get(day, fill_value)
        for day in generate_date_range(start_date, end_date)
    }
|
||
|
|
|
||
|
|
|
||
|
|
def calculate_trend(
    values: List[float]
) -> Tuple[float, float]:
    """
    Fit a straight line through the values by ordinary least squares.

    The x coordinate of each value is its position in the list
    (0, 1, 2, ...).

    Args:
        values: List of values

    Returns:
        Tuple of (slope, intercept). With fewer than two values the slope
        is 0.0 and the intercept is the single value (0.0 for empty input).
    """
    count = len(values)
    if count < 2:
        only_value = values[0] if values else 0.0
        return 0.0, only_value

    x_mean = sum(range(count)) / count
    y_mean = sum(values) / count

    # Sum of cross-deviations over sum of squared x-deviations.
    cross_deviation = sum(
        (position - x_mean) * (observed - y_mean)
        for position, observed in enumerate(values)
    )
    x_deviation_sq = sum((position - x_mean) ** 2 for position in range(count))

    if x_deviation_sq == 0:
        return 0.0, y_mean

    gradient = cross_deviation / x_deviation_sq
    return gradient, y_mean - gradient * x_mean
|
||
|
|
|
||
|
|
|
||
|
|
def project_value(
    historical_values: List[float],
    periods_ahead: int,
    method: str = "mean"
) -> List[float]:
    """
    Extrapolate a series a number of periods into the future.

    Args:
        historical_values: Historical values, oldest first
        periods_ahead: Number of periods to project
        method: Projection method ('mean', 'trend', 'last'). Any other
            string is treated as 'mean'.

    Returns:
        List of periods_ahead projected values; all 0.0 when there is no
        history
    """
    if not historical_values:
        return [0.0] * periods_ahead

    if method == "last":
        # Repeat the most recent observation.
        return [historical_values[-1]] * periods_ahead

    if method == "trend":
        # Continue the fitted line from the first index past the history.
        slope, intercept = calculate_trend(historical_values)
        first_future = len(historical_values)
        return [
            slope * position + intercept
            for position in range(first_future, first_future + periods_ahead)
        ]

    # "mean" and every unrecognized method: flat line at the historical mean.
    return [calculate_mean(historical_values)] * periods_ahead
|
||
|
|
|
||
|
|
|
||
|
|
def calculate_cumulative_sum(values: List[float]) -> List[float]:
    """
    Return the running total of the values.

    Args:
        values: List of values

    Returns:
        List of the same length where entry i is the sum of
        values[0..i]; the accumulation starts from 0.0
    """
    running_totals: List[float] = []
    for item in values:
        previous = running_totals[-1] if running_totals else 0.0
        running_totals.append(previous + item)
    return running_totals
|
||
|
|
|
||
|
|
|
||
|
|
def calculate_rolling_sum(
    values: List[float],
    window_size: int
) -> List[float]:
    """
    Calculate the rolling sum over a sliding window.

    Args:
        values: List of values
        window_size: Size of rolling window (must be positive to produce
            any output)

    Returns:
        List of window sums, one per full window, so the result has
        ``len(values) - window_size + 1`` entries. Empty when window_size
        is zero or negative, or when there are fewer values than
        window_size.
    """
    # A non-positive window would otherwise yield sums of empty or
    # wrap-around slices instead of real windows.
    if window_size <= 0 or len(values) < window_size:
        return []

    return [
        sum(values[start:start + window_size])
        for start in range(len(values) - window_size + 1)
    ]
|
||
|
|
|
||
|
|
|
||
|
|
def normalize_values(
    values: List[float],
    method: str = "minmax"
) -> List[float]:
    """
    Rescale values with min-max or z-score normalization.

    Args:
        values: List of values
        method: Normalization method ('minmax' or 'zscore'). Any other
            string returns the input list unchanged.

    Returns:
        List of normalized values. 'minmax' maps onto [0, 1] (all 0.5 when
        every value is equal); 'zscore' centres on the mean and divides by
        the standard deviation (all 0.0 when the deviation is zero).
    """
    if not values:
        return []

    if method == "zscore":
        center = calculate_mean(values)
        spread = calculate_standard_deviation(values)
        if spread == 0:
            return [0.0] * len(values)
        return [(item - center) / spread for item in values]

    if method != "minmax":
        # Unrecognized method: hand the input back as-is.
        return values

    lowest, highest = min(values), max(values)
    if highest == lowest:
        # Flat series has no range to scale against.
        return [0.5] * len(values)

    return [(item - lowest) / (highest - lowest) for item in values]
|
||
|
|
|
||
|
|
|
||
|
|
def detect_outliers(
    values: List[float],
    method: str = "iqr",
    threshold: float = 1.5
) -> List[bool]:
    """
    Flag which values are outliers.

    Args:
        values: List of values
        method: Detection method ('iqr' or 'zscore'). Any other string
            flags nothing.
        threshold: For 'iqr', the multiple of the interquartile range
            added beyond the quartiles; for 'zscore', the absolute z-score
            above which a value is flagged

    Returns:
        List of booleans, parallel to values, True where the value is an
        outlier
    """
    if not values:
        return []

    if method == "iqr":
        first_quartile = calculate_percentile(values, 25)
        third_quartile = calculate_percentile(values, 75)
        quartile_spread = third_quartile - first_quartile

        low_fence = first_quartile - threshold * quartile_spread
        high_fence = third_quartile + threshold * quartile_spread
        return [item < low_fence or item > high_fence for item in values]

    if method == "zscore":
        center = calculate_mean(values)
        spread = calculate_standard_deviation(values)
        if spread == 0:
            # No variation at all, so nothing can stand out.
            return [False] * len(values)
        return [abs((item - center) / spread) > threshold for item in values]

    # Unrecognized method: report no outliers.
    return [False] * len(values)
|
||
|
|
|
||
|
|
|
||
|
|
def interpolate_missing_values(
    values: List[Optional[float]],
    method: str = "linear"
) -> List[float]:
    """
    Interpolate missing (None) values in a time series.

    Every branch runs in a single pass over the data: backward fill
    appends and reverses once instead of inserting at the front, and the
    linear branch precomputes each position's next known neighbour instead
    of rescanning for every gap.

    Args:
        values: List of values with possible None values
        method: Interpolation method ('linear', 'forward', 'backward').
            Any other string is treated as 'linear'.

    Returns:
        New list with every None replaced. 'forward' repeats the last
        known value, 'backward' the next known value, and 'linear' draws a
        straight line between the surrounding known values, falling back
        to the nearest known value at the edges. Positions with no usable
        known value become 0.0.
    """
    if not values:
        return []

    if method == "forward":
        filled: List[float] = []
        last_valid = None
        for value in values:
            if value is not None:
                last_valid = value
            filled.append(0.0 if last_valid is None else last_valid)
        return filled

    if method == "backward":
        filled = []
        next_valid = None
        for value in reversed(values):
            if value is not None:
                next_valid = value
            filled.append(0.0 if next_valid is None else next_valid)
        # Built back-to-front; one reverse restores the original order.
        filled.reverse()
        return filled

    # Linear interpolation (also the fallback for unrecognized methods).
    size = len(values)

    # following[i] is the index of the nearest known value strictly after i.
    following: List[Optional[int]] = [None] * size
    upcoming = None
    for index in range(size - 1, -1, -1):
        following[index] = upcoming
        if values[index] is not None:
            upcoming = index

    filled = []
    prev_idx = None  # index of the nearest known value before the cursor
    for index, value in enumerate(values):
        if value is not None:
            filled.append(value)
            prev_idx = index
            continue

        next_idx = following[index]
        if prev_idx is not None and next_idx is not None:
            # Straight line between the two surrounding known points.
            y0 = values[prev_idx]
            y1 = values[next_idx]
            filled.append(
                y0 + (y1 - y0) * (index - prev_idx) / (next_idx - prev_idx)
            )
        elif prev_idx is not None:
            # Trailing gap: carry the last known value forward.
            filled.append(values[prev_idx])
        elif next_idx is not None:
            # Leading gap: pull the first known value backward.
            filled.append(values[next_idx])
        else:
            # No known values anywhere in the series.
            filled.append(0.0)

    return filled
|