Files
bakery-ia/services/training/app/services/date_alignment_service.py
2025-10-12 23:16:04 +02:00

239 lines
9.8 KiB
Python

from datetime import datetime, timedelta, timezone
from typing import Dict, List, Optional, Tuple
from dataclasses import dataclass
from enum import Enum
import logging
from app.utils.ml_datetime import ensure_timezone_aware
logger = logging.getLogger(__name__)
class DataSourceType(Enum):
    """Identifiers for the data sources that can feed a training run."""

    # Sales records for the bakery.
    BAKERY_SALES = "bakery_sales"
    # Traffic measurements for Madrid.
    MADRID_TRAFFIC = "madrid_traffic"
    # Weather data.
    WEATHER_FORECAST = "weather_forecast"
@dataclass
class DateRange:
    """A start/end datetime pair tagged with the data source it describes."""

    start: datetime
    end: datetime
    source: DataSourceType

    def duration_days(self) -> int:
        """Return the number of whole days between ``start`` and ``end``."""
        span = self.end - self.start
        return span.days

    def overlaps_with(self, other: 'DateRange') -> bool:
        """Return True when the two ranges share at least one instant.

        Ranges that merely touch at an endpoint count as overlapping.
        """
        # Two ranges are disjoint only if one begins after the other ends.
        disjoint = self.start > other.end or other.start > self.end
        return not disjoint
@dataclass
class AlignedDateRange:
    """Outcome of aligning a requested training window with the data sources."""

    # Final start of the training window.
    start: datetime
    # Final end of the training window.
    end: datetime
    # Sources recorded as usable for the final window.
    available_sources: List[DataSourceType]
    # Human-readable notes keyed by constraint name, one per adjustment made.
    constraints: Dict[str, str]
class DateAlignmentService:
    """
    Central service for managing and aligning dates across multiple data sources
    for the bakery sales prediction model.

    Starting from the date range of the user's sales data (optionally overridden
    by explicit start/end dates), the service trims the range to what the traffic
    and weather sources can cover and validates the result against the
    minimum/maximum training window.
    """

    # Class-level limits: readable without an instance and still overridable
    # per instance by plain attribute assignment.
    MAX_TRAINING_RANGE_DAYS = 730  # Maximum training data range
    MIN_TRAINING_RANGE_DAYS = 30  # Minimum viable training data

    def validate_and_align_dates(
        self,
        user_sales_range: DateRange,
        requested_start: Optional[datetime] = None,
        requested_end: Optional[datetime] = None
    ) -> AlignedDateRange:
        """
        Main method to validate and align dates across all data sources.

        Args:
            user_sales_range: Date range of user-provided sales data
            requested_start: Optional explicit start date for training
            requested_end: Optional explicit end date for training

        Returns:
            AlignedDateRange with validated start/end dates and available sources

        Raises:
            ValueError: If any step fails; the original exception is attached
                as ``__cause__``.
        """
        try:
            # Step 1: Determine the base date range
            base_range = self._determine_base_range(
                user_sales_range, requested_start, requested_end
            )
            # Step 2: Apply data source constraints
            aligned_range = self._apply_data_source_constraints(base_range)
            # Step 3: Validate final range
            self._validate_final_range(aligned_range)
        except Exception as e:
            logger.error(f"Date alignment failed: {str(e)}")
            # Chain the cause so the original traceback is not lost.
            raise ValueError(f"Unable to align dates: {str(e)}") from e

        logger.info(f"Date alignment completed: {aligned_range.start} to {aligned_range.end}")
        return aligned_range

    def _determine_base_range(
        self,
        user_sales_range: DateRange,
        requested_start: Optional[datetime],
        requested_end: Optional[datetime]
    ) -> DateRange:
        """
        Determine the base date range for training.

        When both explicit dates are given they are used as-is (after being
        passed through ``ensure_timezone_aware``); an oversized explicit range
        is not truncated here and is left for ``_validate_final_range`` to
        reject. Otherwise missing bounds fall back to the sales data range and
        the start is pulled forward so the span does not exceed
        ``MAX_TRAINING_RANGE_DAYS``.

        Raises:
            ValueError: If both explicit dates are given and the end is not
                after the start.
        """
        # Use explicit dates if provided
        if requested_start is not None and requested_end is not None:
            requested_start = ensure_timezone_aware(requested_start)
            requested_end = ensure_timezone_aware(requested_end)
            if requested_end <= requested_start:
                raise ValueError("End date must be after start date")
            return DateRange(requested_start, requested_end, DataSourceType.BAKERY_SALES)

        # Otherwise, use the user's sales data range as the foundation
        start_date = ensure_timezone_aware(
            requested_start if requested_start is not None else user_sales_range.start
        )
        end_date = ensure_timezone_aware(
            requested_end if requested_end is not None else user_sales_range.end
        )

        # Ensure we don't exceed maximum training range
        if (end_date - start_date).days > self.MAX_TRAINING_RANGE_DAYS:
            start_date = end_date - timedelta(days=self.MAX_TRAINING_RANGE_DAYS)
            logger.warning(f"Limiting training range to {self.MAX_TRAINING_RANGE_DAYS} days")

        return DateRange(start_date, end_date, DataSourceType.BAKERY_SALES)

    def _apply_data_source_constraints(self, base_range: DateRange) -> AlignedDateRange:
        """
        Apply constraints from each data source and determine final aligned range.

        The end date is clamped to the latest date each source can provide, and
        the start date is moved earlier if needed so the window spans at least
        ``MIN_TRAINING_RANGE_DAYS``. Every adjustment is recorded in the
        returned ``constraints`` mapping.
        """
        available_sources = [DataSourceType.BAKERY_SALES]  # Always have sales data
        constraints = {}

        # Madrid Traffic Data Constraint
        madrid_end_date = self._get_madrid_traffic_end_date()
        if base_range.end > madrid_end_date:
            # Requested end is past the latest available traffic data: clamp it.
            new_end = madrid_end_date
            constraints["madrid_traffic"] = f"Adjusted end date to {new_end.date()} (latest available traffic data)"
            logger.info(f"Madrid traffic constraint: end date adjusted to {new_end.date()}")
        else:
            new_end = base_range.end
            available_sources.append(DataSourceType.MADRID_TRAFFIC)

        # Weather Forecast Constraint
        # Weather data available from yesterday backward
        weather_end_date = (
            datetime.now(timezone.utc).replace(hour=0, minute=0, second=0, microsecond=0)
            - timedelta(days=1)
        )
        # new_end never exceeds base_range.end, so this single comparison covers
        # the case where the requested end is past the weather cutoff. The
        # default traffic cutoff is never later than the weather cutoff, so this
        # mainly guards overridden traffic cutoffs.
        if new_end > weather_end_date:
            new_end = weather_end_date
            constraints["weather"] = f"Adjusted end date to {new_end.date()} (latest available weather data)"
            logger.info(f"Weather constraint: end date adjusted to {new_end.date()}")

        if new_end >= base_range.start:
            available_sources.append(DataSourceType.WEATHER_FORECAST)

        # Ensure minimum training period
        final_start = base_range.start
        if (new_end - final_start).days < self.MIN_TRAINING_RANGE_DAYS:
            final_start = new_end - timedelta(days=self.MIN_TRAINING_RANGE_DAYS)
            constraints["minimum_period"] = f"Adjusted start date to ensure {self.MIN_TRAINING_RANGE_DAYS} day minimum training period"
            logger.info(f"Minimum period constraint: start date adjusted to {final_start.date()}")

        return AlignedDateRange(
            start=final_start,
            end=new_end,
            available_sources=available_sources,
            constraints=constraints
        )

    def _get_madrid_traffic_end_date(self) -> datetime:
        """
        Get the latest available date for Madrid traffic data.

        Data for current month is not available until the following month, so
        the result is midnight UTC on the last day of the previous month.
        """
        now = datetime.now(timezone.utc)
        # Go to first day of current month, then subtract 1 day to get last day of previous month
        first_of_current_month = now.replace(day=1, hour=0, minute=0, second=0, microsecond=0)
        return first_of_current_month - timedelta(days=1)

    def _validate_final_range(self, aligned_range: AlignedDateRange) -> None:
        """
        Validate the final aligned date range.

        Raises:
            ValueError: If the range is empty or inverted, shorter than
                ``MIN_TRAINING_RANGE_DAYS``, longer than
                ``MAX_TRAINING_RANGE_DAYS``, or lacks sales data.
        """
        if aligned_range.start >= aligned_range.end:
            raise ValueError("Invalid date range: start date must be before end date")

        duration = (aligned_range.end - aligned_range.start).days
        if duration < self.MIN_TRAINING_RANGE_DAYS:
            raise ValueError(f"Insufficient training data: {duration} days (minimum: {self.MIN_TRAINING_RANGE_DAYS})")
        if duration > self.MAX_TRAINING_RANGE_DAYS:
            raise ValueError(f"Training period too long: {duration} days (maximum: {self.MAX_TRAINING_RANGE_DAYS})")

        # Ensure we have at least sales data
        if DataSourceType.BAKERY_SALES not in aligned_range.available_sources:
            raise ValueError("No sales data available for the aligned date range")

    def get_data_collection_plan(self, aligned_range: AlignedDateRange) -> Dict[str, Dict]:
        """
        Generate a data collection plan based on the aligned date range.

        Every entry uses the aligned start/end dates; an entry is present only
        if its source is listed in ``aligned_range.available_sources``.

        Returns:
            Dictionary with collection plans for each data source
        """
        plan = {}
        sources = aligned_range.available_sources

        # Bakery Sales Data
        if DataSourceType.BAKERY_SALES in sources:
            plan["sales_data"] = {
                "start_date": aligned_range.start,
                "end_date": aligned_range.end,
                "source": "user_upload",
                "required": True
            }

        # Madrid Traffic Data
        if DataSourceType.MADRID_TRAFFIC in sources:
            plan["traffic_data"] = {
                "start_date": aligned_range.start,
                "end_date": aligned_range.end,
                "source": "madrid_opendata",
                "required": False,
                "constraint": "Cannot request current month data"
            }

        # Weather Data
        if DataSourceType.WEATHER_FORECAST in sources:
            plan["weather_data"] = {
                "start_date": aligned_range.start,
                "end_date": aligned_range.end,
                "source": "aemet_api",
                "required": False,
                "constraint": "Available from yesterday backward"
            }

        return plan

    def check_madrid_current_month_constraint(self, end_date: datetime) -> bool:
        """
        Check if the end date violates the Madrid Open Data current month constraint.

        Args:
            end_date: The requested end date

        Returns:
            True if the constraint is violated (end date is in current month)
        """
        # Normalise first, as _determine_base_range does: comparing a naive
        # datetime with the aware month start below would raise TypeError.
        # NOTE(review): assumes ensure_timezone_aware returns an aware datetime - confirm.
        end_date = ensure_timezone_aware(end_date)

        now = datetime.now(timezone.utc)
        current_month_start = now.replace(day=1, hour=0, minute=0, second=0, microsecond=0)
        violation = end_date >= current_month_start

        # Debug logging
        logger.info(f"🔍 Madrid constraint check: end_date={end_date}, current_month_start={current_month_start}, violation={violation}")
        return violation