Files
bakery-ia/services/training/app/services/date_alignment_service.py
2025-09-08 21:52:56 +02:00

245 lines
10 KiB
Python

from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple
from dataclasses import dataclass
from enum import Enum
import logging
from datetime import datetime, timedelta, timezone
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class DataSourceType(Enum):
    """Identifiers for the data sources that feed a training run."""

    # Sales history (collected with source "user_upload" in the plan).
    BAKERY_SALES = "bakery_sales"
    # Traffic data (collected with source "madrid_opendata" in the plan).
    MADRID_TRAFFIC = "madrid_traffic"
    # Weather data (collected with source "aemet_api" in the plan).
    WEATHER_FORECAST = "weather_forecast"
@dataclass
class DateRange:
    """A start/end datetime pair tagged with the data source it belongs to."""

    start: datetime
    end: datetime
    source: DataSourceType

    def duration_days(self) -> int:
        """Return the ``days`` component of ``end - start``."""
        span = self.end - self.start
        return span.days

    def overlaps_with(self, other: 'DateRange') -> bool:
        """Return True when this range and *other* intersect (bounds inclusive)."""
        # Two closed intervals intersect when each one starts no later than
        # the other one ends.
        if not (self.start <= other.end):
            return False
        return other.start <= self.end
@dataclass
class AlignedDateRange:
    """Result of date alignment: the final window plus how it was derived."""

    # First instant of the aligned training window.
    start: datetime
    # Last instant of the aligned training window.
    end: datetime
    # Data sources that can be collected for the window.
    available_sources: List[DataSourceType]
    # Human-readable notes keyed by the constraint that caused an adjustment
    # (the service uses "madrid_traffic", "weather" and "minimum_period").
    constraints: Dict[str, str]
class DateAlignmentService:
    """
    Central service for managing and aligning dates across multiple data sources
    for the bakery sales prediction model.

    The service picks a base training window, clips its end to what the
    Madrid traffic and weather sources can provide, pads the start so the
    window reaches the minimum length, and validates the result.
    """

    # Limits are class-level constants; they remain readable (and overridable)
    # through instances exactly as before.
    MAX_TRAINING_RANGE_DAYS = 730  # Maximum training data range
    MIN_TRAINING_RANGE_DAYS = 30  # Minimum viable training data

    @staticmethod
    def _ensure_timezone_aware(dt: datetime) -> datetime:
        """Return *dt* unchanged if it has tzinfo, otherwise tag it as UTC.

        Every availability limit in this class is built from
        ``datetime.now(timezone.utc)``; comparing those against a naive
        datetime raises ``TypeError``, so inputs are normalised first.
        """
        if dt.tzinfo is None:
            return dt.replace(tzinfo=timezone.utc)
        return dt

    def validate_and_align_dates(
        self,
        user_sales_range: DateRange,
        requested_start: Optional[datetime] = None,
        requested_end: Optional[datetime] = None
    ) -> AlignedDateRange:
        """
        Main method to validate and align dates across all data sources.

        Args:
            user_sales_range: Date range of user-provided sales data
            requested_start: Optional explicit start date for training
            requested_end: Optional explicit end date for training

        Returns:
            AlignedDateRange with validated start/end dates and available sources

        Raises:
            ValueError: If any step fails; the original exception is attached
                as ``__cause__``.
        """
        try:
            # Step 1: Determine the base date range
            base_range = self._determine_base_range(
                user_sales_range, requested_start, requested_end
            )
            # Step 2: Apply data source constraints
            aligned_range = self._apply_data_source_constraints(base_range)
            # Step 3: Validate final range
            self._validate_final_range(aligned_range)
            logger.info(f"Date alignment completed: {aligned_range.start} to {aligned_range.end}")
            return aligned_range
        except Exception as e:
            logger.error(f"Date alignment failed: {str(e)}")
            # Chain explicitly so the root cause stays visible in tracebacks.
            raise ValueError(f"Unable to align dates: {str(e)}") from e

    def _determine_base_range(
        self,
        user_sales_range: DateRange,
        requested_start: Optional[datetime],
        requested_end: Optional[datetime]
    ) -> DateRange:
        """Determine the base date range for training.

        When both explicit dates are given they are used as-is (after
        timezone normalisation). Otherwise each missing bound falls back to
        the sales range and the window is capped at MAX_TRAINING_RANGE_DAYS
        by moving the start forward.

        Raises:
            ValueError: If both explicit dates are given and the end is not
                strictly after the start.
        """
        # Use explicit dates if provided
        if requested_start is not None and requested_end is not None:
            explicit_start = self._ensure_timezone_aware(requested_start)
            explicit_end = self._ensure_timezone_aware(requested_end)
            if explicit_end <= explicit_start:
                raise ValueError("End date must be after start date")
            return DateRange(explicit_start, explicit_end, DataSourceType.BAKERY_SALES)

        # Otherwise, use the user's sales data range as the foundation
        start_date = self._ensure_timezone_aware(
            requested_start if requested_start is not None else user_sales_range.start
        )
        end_date = self._ensure_timezone_aware(
            requested_end if requested_end is not None else user_sales_range.end
        )

        # Ensure we don't exceed maximum training range
        if (end_date - start_date).days > self.MAX_TRAINING_RANGE_DAYS:
            start_date = end_date - timedelta(days=self.MAX_TRAINING_RANGE_DAYS)
            logger.warning(f"Limiting training range to {self.MAX_TRAINING_RANGE_DAYS} days")

        return DateRange(start_date, end_date, DataSourceType.BAKERY_SALES)

    def _apply_data_source_constraints(self, base_range: DateRange) -> AlignedDateRange:
        """Apply constraints from each data source and determine final aligned range.

        The end date is clipped to the latest traffic and weather data; the
        start date is then moved back, if needed, so the window spans at
        least MIN_TRAINING_RANGE_DAYS. Each adjustment is recorded in the
        returned ``constraints`` mapping.
        """
        available_sources = [DataSourceType.BAKERY_SALES]  # Always have sales data
        constraints: Dict[str, str] = {}

        # Madrid Traffic Data Constraint
        madrid_end_date = self._get_madrid_traffic_end_date()
        if base_range.end > madrid_end_date:
            # Requested end falls after the last published month: pull it back.
            new_end = madrid_end_date
            constraints["madrid_traffic"] = f"Adjusted end date to {new_end.date()} (latest available traffic data)"
            logger.info(f"Madrid traffic constraint: end date adjusted to {new_end.date()}")
        else:
            new_end = base_range.end
        # NOTE(review): traffic is listed unconditionally because, once the
        # end is clipped, the window ends no later than the traffic limit --
        # confirm this matches what the traffic client expects.
        available_sources.append(DataSourceType.MADRID_TRAFFIC)

        # Weather Forecast Constraint
        # Weather data available from yesterday backward
        weather_end_date = datetime.now(timezone.utc).replace(
            hour=0, minute=0, second=0, microsecond=0
        ) - timedelta(days=1)
        # new_end never exceeds base_range.end, so checking new_end alone is
        # sufficient to decide whether the weather limit applies.
        if new_end > weather_end_date:
            new_end = weather_end_date
            constraints["weather"] = f"Adjusted end date to {new_end.date()} (latest available weather data)"
            logger.info(f"Weather constraint: end date adjusted to {new_end.date()}")
        if new_end >= base_range.start:
            available_sources.append(DataSourceType.WEATHER_FORECAST)

        # Ensure minimum training period
        final_start = base_range.start
        if (new_end - final_start).days < self.MIN_TRAINING_RANGE_DAYS:
            final_start = new_end - timedelta(days=self.MIN_TRAINING_RANGE_DAYS)
            constraints["minimum_period"] = f"Adjusted start date to ensure {self.MIN_TRAINING_RANGE_DAYS} day minimum training period"
            logger.info(f"Minimum period constraint: start date adjusted to {final_start.date()}")

        return AlignedDateRange(
            start=final_start,
            end=new_end,
            available_sources=available_sources,
            constraints=constraints
        )

    def _get_madrid_traffic_end_date(self) -> datetime:
        """
        Get the latest available date for Madrid traffic data.
        Data for current month is not available until the following month.

        Returns:
            Midnight UTC on the last day of the previous month.
        """
        now = datetime.now(timezone.utc)
        # Go to first day of current month, then subtract 1 day to get last day of previous month
        first_of_current_month = now.replace(day=1, hour=0, minute=0, second=0, microsecond=0)
        return first_of_current_month - timedelta(days=1)

    def _validate_final_range(self, aligned_range: AlignedDateRange) -> None:
        """Validate the final aligned date range.

        Raises:
            ValueError: If start is not before end, the duration in days is
                outside [MIN_TRAINING_RANGE_DAYS, MAX_TRAINING_RANGE_DAYS],
                or sales data is not among the available sources.
        """
        if aligned_range.start >= aligned_range.end:
            raise ValueError("Invalid date range: start date must be before end date")

        duration = (aligned_range.end - aligned_range.start).days
        if duration < self.MIN_TRAINING_RANGE_DAYS:
            raise ValueError(f"Insufficient training data: {duration} days (minimum: {self.MIN_TRAINING_RANGE_DAYS})")
        if duration > self.MAX_TRAINING_RANGE_DAYS:
            raise ValueError(f"Training period too long: {duration} days (maximum: {self.MAX_TRAINING_RANGE_DAYS})")

        # Ensure we have at least sales data
        if DataSourceType.BAKERY_SALES not in aligned_range.available_sources:
            raise ValueError("No sales data available for the aligned date range")

    def get_data_collection_plan(self, aligned_range: AlignedDateRange) -> Dict[str, Dict]:
        """
        Generate a data collection plan based on the aligned date range.

        Args:
            aligned_range: Aligned window and the sources available for it

        Returns:
            Dictionary with collection plans for each data source; a source
            only appears if it is listed in ``aligned_range.available_sources``
        """
        plan: Dict[str, Dict] = {}
        sources = aligned_range.available_sources

        # Bakery Sales Data
        if DataSourceType.BAKERY_SALES in sources:
            plan["sales_data"] = {
                "start_date": aligned_range.start,
                "end_date": aligned_range.end,
                "source": "user_upload",
                "required": True
            }

        # Madrid Traffic Data
        if DataSourceType.MADRID_TRAFFIC in sources:
            plan["traffic_data"] = {
                "start_date": aligned_range.start,
                "end_date": aligned_range.end,
                "source": "madrid_opendata",
                "required": False,
                "constraint": "Cannot request current month data"
            }

        # Weather Data
        if DataSourceType.WEATHER_FORECAST in sources:
            plan["weather_data"] = {
                "start_date": aligned_range.start,
                "end_date": aligned_range.end,
                "source": "aemet_api",
                "required": False,
                "constraint": "Available from yesterday backward"
            }

        return plan

    def check_madrid_current_month_constraint(self, end_date: datetime) -> bool:
        """
        Check if the end date violates the Madrid Open Data current month constraint.

        Args:
            end_date: The requested end date; a naive value is treated as UTC

        Returns:
            True if the constraint is violated (end date is on or after the
            first instant of the current UTC month)
        """
        # A naive end_date cannot be compared with the aware month start
        # below (TypeError), so normalise it the same way the base range is.
        end_date = self._ensure_timezone_aware(end_date)
        now = datetime.now(timezone.utc)
        current_month_start = now.replace(day=1, hour=0, minute=0, second=0, microsecond=0)
        # Debug logging
        logger.info(f"🔍 Madrid constraint check: end_date={end_date}, current_month_start={current_month_start}, violation={end_date >= current_month_start}")
        return end_date >= current_month_start