Files
bakery-ia/shared/utils/time_series_utils.py

537 lines
12 KiB
Python
Raw Normal View History

2025-10-30 21:08:07 +01:00
"""
Time Series Utilities
Provides utilities for time-series analysis, projection, and calculations
used in forecasting and inventory planning.
"""
import statistics
from datetime import date, datetime, timedelta
from typing import List, Dict, Tuple, Optional
from decimal import Decimal
import math
def generate_date_range(
    start_date: date,
    end_date: date,
    include_end: bool = True
) -> List[date]:
    """
    Build the list of consecutive calendar days from start to end.

    Args:
        start_date: First day of the range
        end_date: Last day of the range
        include_end: When True the end date itself is part of the result

    Returns:
        Dates in ascending order, one per day; empty when start_date is
        after end_date (or equal to it while include_end is False)
    """
    one_day = timedelta(days=1)
    days: List[date] = []
    cursor = start_date
    while True:
        # Stop once the cursor has walked past the end of the range...
        if cursor > end_date:
            break
        # ...or has reached the end date and the caller excluded it.
        if cursor == end_date and not include_end:
            break
        days.append(cursor)
        cursor += one_day
    return days
def generate_future_dates(
    start_date: date,
    num_days: int
) -> List[date]:
    """
    List num_days consecutive dates, the first being start_date itself.

    Args:
        start_date: First date of the returned sequence
        num_days: How many dates to produce

    Returns:
        List of dates, one per day; empty when num_days is zero or negative
    """
    one_day = timedelta(days=1)
    return [start_date + offset * one_day for offset in range(num_days)]
def calculate_moving_average(
    values: List[float],
    window_size: int
) -> List[float]:
    """
    Calculate the simple moving average over a sliding window.

    Args:
        values: List of values
        window_size: Number of consecutive values averaged per window

    Returns:
        One average per full window, in order
        (len(values) - window_size + 1 entries). Empty when window_size
        is not positive or exceeds the number of values.
    """
    # A zero window would divide by zero and a negative one would slice
    # meaningless windows, so both are treated like "no full window fits".
    if window_size <= 0 or len(values) < window_size:
        return []
    return [
        sum(values[start:start + window_size]) / window_size
        for start in range(len(values) - window_size + 1)
    ]
def calculate_standard_deviation(values: List[float]) -> float:
    """
    Return the sample standard deviation of the given values.

    Args:
        values: List of values

    Returns:
        Result of statistics.stdev, or 0.0 when fewer than two values
        are supplied
    """
    has_enough_points = len(values) >= 2
    return statistics.stdev(values) if has_enough_points else 0.0
def calculate_variance(values: List[float]) -> float:
    """
    Return the sample variance of the given values.

    Args:
        values: List of values

    Returns:
        Result of statistics.variance, or 0.0 when fewer than two values
        are supplied
    """
    has_enough_points = len(values) >= 2
    return statistics.variance(values) if has_enough_points else 0.0
def calculate_mean(values: List[float]) -> float:
    """
    Return the arithmetic mean of the given values.

    Args:
        values: List of values

    Returns:
        Result of statistics.mean, or 0.0 when values is empty
    """
    return statistics.mean(values) if values else 0.0
def calculate_median(values: List[float]) -> float:
    """
    Return the median (middle value) of the given values.

    Args:
        values: List of values

    Returns:
        Result of statistics.median, or 0.0 when values is empty
    """
    return statistics.median(values) if values else 0.0
def calculate_percentile(values: List[float], percentile: float) -> float:
    """
    Calculate a percentile using linear interpolation between ranks.

    Args:
        values: List of values (need not be sorted)
        percentile: Percentile to calculate (0-100); anything outside that
            range is clamped to the nearest bound, so e.g. 150 yields the
            maximum and -10 the minimum

    Returns:
        Percentile value, or 0.0 when values is empty
    """
    if not values:
        return 0.0
    # Clamp so the fractional rank below always lands inside the list.
    # Without this a percentile above 100 indexes past the end and a
    # negative one wraps around to entries at the back of the list.
    bounded = min(max(percentile, 0), 100)
    ordered = sorted(values)
    rank = (len(ordered) - 1) * bounded / 100
    lower = math.floor(rank)
    upper = math.ceil(rank)
    if lower == upper:
        # Rank falls exactly on an element; no interpolation needed.
        return ordered[lower]
    # Weight the two neighbouring elements by their distance to the rank.
    return ordered[lower] * (upper - rank) + ordered[upper] * (rank - lower)
def calculate_coefficient_of_variation(values: List[float]) -> float:
    """
    Return the coefficient of variation, i.e. stddev divided by mean.

    Args:
        values: List of values

    Returns:
        Standard deviation over mean, or 0.0 when values is empty or
        the mean is zero (the ratio is undefined there)
    """
    if values:
        average = calculate_mean(values)
        if average != 0:
            return calculate_standard_deviation(values) / average
    return 0.0
def aggregate_by_date(
    data: List[Tuple[date, float]],
    aggregation: str = "sum"
) -> Dict[date, float]:
    """
    Collapse (date, value) observations into a single value per date.

    Args:
        data: List of (date, value) tuples; a date may appear many times
        aggregation: How to combine values sharing a date
            ('sum', 'mean', 'max', 'min'); any other string behaves
            like 'sum'

    Returns:
        Dictionary mapping each distinct date to its aggregated value,
        keyed in order of first appearance in data
    """
    grouped: Dict[date, List[float]] = {}
    for day, amount in data:
        grouped.setdefault(day, []).append(amount)

    # Pick the reducer once instead of re-testing the method per date.
    if aggregation == "mean":
        reducer = calculate_mean
    elif aggregation == "max":
        reducer = max
    elif aggregation == "min":
        reducer = min
    else:
        # Covers "sum" as well as unrecognised method names.
        reducer = sum

    return {day: reducer(amounts) for day, amounts in grouped.items()}
def fill_missing_dates(
    data: Dict[date, float],
    start_date: date,
    end_date: date,
    fill_value: float = 0.0
) -> Dict[date, float]:
    """
    Produce a gap-free daily series between two dates.

    Args:
        data: Dictionary mapping date to value
        start_date: First date of the output series
        end_date: Last date of the output series (included)
        fill_value: Value assigned to dates that are absent from data

    Returns:
        Dictionary with one entry per day from start_date to end_date;
        entries of data outside that span are not carried over
    """
    return {
        day: data.get(day, fill_value)
        for day in generate_date_range(start_date, end_date)
    }
def calculate_trend(
    values: List[float]
) -> Tuple[float, float]:
    """
    Fit a straight line through the values by ordinary least squares.

    The x coordinate of each value is its index in the list (0, 1, 2, ...).

    Args:
        values: List of values

    Returns:
        Tuple of (slope, intercept). With fewer than two values the slope
        is 0.0 and the intercept is the single value (or 0.0 when empty).
    """
    count = len(values)
    if count < 2:
        only_value = values[0] if values else 0.0
        return 0.0, only_value

    # Centre both axes on their means
    x_mean = sum(range(count)) / count
    y_mean = sum(values) / count

    # slope = sum of co-deviations / sum of squared x deviations
    co_deviation = sum(
        (x - x_mean) * (y - y_mean) for x, y in enumerate(values)
    )
    x_spread = sum((x - x_mean) ** 2 for x in range(count))
    if x_spread == 0:
        return 0.0, y_mean

    slope = co_deviation / x_spread
    return slope, y_mean - slope * x_mean
def project_value(
    historical_values: List[float],
    periods_ahead: int,
    method: str = "mean"
) -> List[float]:
    """
    Extrapolate a series a given number of periods into the future.

    Args:
        historical_values: Observed values, oldest first
        periods_ahead: Number of future periods to produce
        method: Projection method ('mean', 'trend', 'last'); any other
            string behaves like 'mean'

    Returns:
        List of projected values, one per future period. All zeros when
        there is no history.
    """
    if not historical_values:
        return [0.0] * periods_ahead

    if method == "last":
        # Repeat the most recent observation
        return [historical_values[-1]] * periods_ahead

    if method == "trend":
        # Continue the fitted line; the first future point sits at the
        # index right after the last historical one.
        slope, intercept = calculate_trend(historical_values)
        first_future_index = len(historical_values)
        return [
            slope * (first_future_index + step) + intercept
            for step in range(periods_ahead)
        ]

    # "mean" and every unrecognised method: repeat the historical average
    return [calculate_mean(historical_values)] * periods_ahead
def calculate_cumulative_sum(values: List[float]) -> List[float]:
    """
    Return the running total of the values.

    Args:
        values: List of values

    Returns:
        List of the same length where entry i is the sum of values[0..i];
        the accumulator starts at 0.0, so the totals are floats
    """
    running_total = 0.0
    partial_sums: List[float] = []
    for amount in values:
        running_total = running_total + amount
        partial_sums.append(running_total)
    return partial_sums
def calculate_rolling_sum(
    values: List[float],
    window_size: int
) -> List[float]:
    """
    Calculate the sum of each sliding window over the values.

    Args:
        values: List of values
        window_size: Number of consecutive values added up per window

    Returns:
        One sum per full window, in order
        (len(values) - window_size + 1 entries). Empty when window_size
        is not positive or exceeds the number of values.
    """
    # A zero or negative window has no meaningful sum; without this guard
    # the slicing below would yield a list of spurious totals.
    if window_size <= 0 or len(values) < window_size:
        return []
    return [
        sum(values[start:start + window_size])
        for start in range(len(values) - window_size + 1)
    ]
def normalize_values(
    values: List[float],
    method: str = "minmax"
) -> List[float]:
    """
    Normalize values to a standard range.

    Args:
        values: List of values
        method: Normalization method ('minmax' or 'zscore'); any other
            string leaves the values unscaled

    Returns:
        A new list of normalized values (the input list is never returned
        or modified):
        - 'minmax': scaled to [0, 1]; all 0.5 when every value is equal
        - 'zscore': (value - mean) / stddev; all 0.0 when stddev is zero
        - otherwise: an unscaled copy of values
    """
    if not values:
        return []

    if method == "minmax":
        low = min(values)
        high = max(values)
        if high == low:
            # No spread to scale by; place every point mid-range.
            return [0.5] * len(values)
        span = high - low
        return [(v - low) / span for v in values]

    if method == "zscore":
        mean = calculate_mean(values)
        stddev = calculate_standard_deviation(values)
        if stddev == 0:
            return [0.0] * len(values)
        return [(v - mean) / stddev for v in values]

    # Unknown method: hand back a copy so that, like the branches above,
    # the caller can mutate the result without touching its input.
    return list(values)
def detect_outliers(
    values: List[float],
    method: str = "iqr",
    threshold: float = 1.5
) -> List[bool]:
    """
    Flag which values are outliers.

    Args:
        values: List of values
        method: Detection method ('iqr' or 'zscore'); any other string
            flags nothing
        threshold: For 'iqr', the multiple of the interquartile range
            added beyond the quartiles; for 'zscore', the absolute
            z-score above which a value is flagged

    Returns:
        List of booleans, aligned with values, True where the value is
        an outlier
    """
    if not values:
        return []

    if method not in ("iqr", "zscore"):
        return [False] * len(values)

    if method == "iqr":
        # Fences sit threshold * IQR outside the first/third quartiles
        first_quartile = calculate_percentile(values, 25)
        third_quartile = calculate_percentile(values, 75)
        spread = third_quartile - first_quartile
        low_fence = first_quartile - threshold * spread
        high_fence = third_quartile + threshold * spread
        return [v > high_fence or v < low_fence for v in values]

    # method == "zscore"
    center = calculate_mean(values)
    stddev = calculate_standard_deviation(values)
    if stddev == 0:
        # Every value is identical, so nothing can stand out
        return [False] * len(values)
    return [abs((v - center) / stddev) > threshold for v in values]
def _forward_fill_values(values: List[Optional[float]]) -> List[float]:
    """Replace each None with the closest earlier value (0.0 if none yet)."""
    filled = []
    last_valid = None
    for value in values:
        if value is not None:
            last_valid = value
        filled.append(last_valid if last_valid is not None else 0.0)
    return filled


def _linear_fill_values(values: List[Optional[float]]) -> List[float]:
    """Replace each None by interpolating between its nearest valid neighbours."""
    count = len(values)

    # next_valid[i] is the index of the nearest non-None entry strictly
    # after i (None if there is none); built in one backward pass so the
    # forward pass below never has to rescan the list.
    next_valid: List[Optional[int]] = [None] * count
    upcoming = None
    for i in range(count - 1, -1, -1):
        next_valid[i] = upcoming
        if values[i] is not None:
            upcoming = i

    result = list(values)
    prev_idx = None
    for i, value in enumerate(values):
        if value is not None:
            prev_idx = i
            continue
        next_idx = next_valid[i]
        if prev_idx is not None and next_idx is not None:
            # Point on the straight line between the two valid neighbours
            y0, y1 = values[prev_idx], values[next_idx]
            result[i] = y0 + (y1 - y0) * (i - prev_idx) / (next_idx - prev_idx)
        elif prev_idx is not None:
            # Trailing gap: nothing after it, so carry the last value forward
            result[i] = values[prev_idx]
        elif next_idx is not None:
            # Leading gap: nothing before it, so pull the next value back
            result[i] = values[next_idx]
        else:
            # No valid values anywhere in the series
            result[i] = 0.0
    return result


def interpolate_missing_values(
    values: List[Optional[float]],
    method: str = "linear"
) -> List[float]:
    """
    Interpolate missing values in a time series.

    Args:
        values: List of values with possible None values
        method: Interpolation method ('linear', 'forward', 'backward');
            any other string behaves like 'linear'

    Returns:
        New list of the same length with every None replaced:
        - 'forward': by the closest earlier value (0.0 for leading gaps)
        - 'backward': by the closest later value (0.0 for trailing gaps)
        - 'linear': by linear interpolation between the surrounding valid
          values; gaps at either edge take the nearest valid value, and
          a series with no valid value at all becomes all 0.0
    """
    if not values:
        return []

    if method == "forward":
        return _forward_fill_values(values)

    if method == "backward":
        # Backward fill is a forward fill over the reversed series.
        filled = _forward_fill_values(list(reversed(values)))
        filled.reverse()
        return filled

    return _linear_fill_values(values)