# services/training/app/services/training_orchestrator.py
"""
Training Data Orchestrator - Enhanced Integration Layer
Orchestrates data collection , date alignment , and preparation for ML training
"""
from datetime import datetime , timedelta
from typing import Dict , List , Optional , Any , Tuple
from dataclasses import dataclass
import asyncio
2025-08-10 17:31:38 +02:00
import structlog
2025-07-28 19:28:39 +02:00
from concurrent . futures import ThreadPoolExecutor
2025-07-28 20:20:54 +02:00
from datetime import timezone
import pandas as pd
2025-07-28 19:28:39 +02:00
2025-07-29 15:08:55 +02:00
from app . services . data_client import DataClient
2025-07-28 19:28:39 +02:00
from app . services . date_alignment_service import DateAlignmentService , DateRange , DataSourceType , AlignedDateRange
2025-11-12 15:34:10 +01:00
from app . ml . poi_feature_integrator import POIFeatureIntegrator
2025-10-09 14:11:02 +02:00
from app . services . training_events import publish_training_failed
2025-07-29 21:33:57 +02:00
2025-08-10 17:31:38 +02:00
logger = structlog . get_logger ( )


@dataclass
class TrainingDataSet:
    """Container for all training data with metadata"""
    sales_data: List[Dict[str, Any]]
    weather_data: List[Dict[str, Any]]
    traffic_data: List[Dict[str, Any]]
    poi_features: Dict[str, Any]  # POI features for location-based forecasting
    date_range: AlignedDateRange
    metadata: Dict[str, Any]
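

def _example_training_dataset(aligned_range: AlignedDateRange) -> TrainingDataSet:
    """
    Illustrative only: a minimal TrainingDataSet showing the record shapes the
    validation helpers in this module expect. All field values below are
    hypothetical; the keys mirror those checked in _validate_sales_record,
    _validate_weather_data, and _validate_traffic_data_enhanced.
    """
    return TrainingDataSet(
        sales_data=[{"date": "2025-01-02T00:00:00+00:00",
                     "inventory_product_id": "prod-001", "quantity": 42}],
        weather_data=[{"date": "2025-01-02", "temperature": 9.5, "precipitation": 0.0}],
        traffic_data=[{"date": "2025-01-02", "traffic_volume": 1200, "city": "madrid"}],
        poi_features={"poi_count_500m": 17},  # hypothetical feature name
        date_range=aligned_range,
        metadata={"tenant_id": "tenant-123"},
    )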


class TrainingDataOrchestrator:
    """
    Enhanced orchestrator for data collection from multiple sources.
    Ensures date alignment, handles data source constraints, and prepares data for ML training.
    Uses the new abstracted traffic service layer for multi-city support.
    """

    def __init__(self,
                 date_alignment_service: Optional[DateAlignmentService] = None,
                 poi_feature_integrator: Optional[POIFeatureIntegrator] = None):
        self.data_client = DataClient()
        self.date_alignment_service = date_alignment_service or DateAlignmentService()
        self.poi_feature_integrator = poi_feature_integrator or POIFeatureIntegrator()
        self.max_concurrent_requests = 5  # Increased for better performance

    async def prepare_training_data(
        self,
        tenant_id: str,
        bakery_location: Tuple[float, float],  # (lat, lon)
        requested_start: Optional[datetime] = None,
        requested_end: Optional[datetime] = None,
        job_id: Optional[str] = None
    ) -> TrainingDataSet:
        """
        Main method to prepare all training data with comprehensive date alignment.
        Args:
            tenant_id: Tenant identifier
            bakery_location: Bakery coordinates (lat, lon)
            requested_start: Optional explicit start date
            requested_end: Optional explicit end date
            job_id: Training job identifier for logging
        Returns:
            TrainingDataSet with all aligned and validated data
        """
        logger.info(f"Starting comprehensive training data preparation for tenant {tenant_id}, job {job_id}")
        try:
            # Step 1: Fetch and validate sales data (unified approach)
            sales_data = await self.data_client.fetch_sales_data(tenant_id, fetch_all=True)

            if not sales_data:
                error_msg = f"No sales data available for tenant {tenant_id}. Please import sales data before starting training."
                logger.error("Training aborted - no sales data", tenant_id=tenant_id, job_id=job_id)
                raise ValueError(error_msg)

            # Debug: Analyze the sales data structure to understand product distribution
            sales_df_debug = pd.DataFrame(sales_data)
            if 'inventory_product_id' in sales_df_debug.columns:
                unique_products_found = sales_df_debug['inventory_product_id'].unique()
                product_counts = sales_df_debug['inventory_product_id'].value_counts().to_dict()
                logger.info("Sales data analysis (moved from pre-flight)",
                            tenant_id=tenant_id,
                            job_id=job_id,
                            total_sales_records=len(sales_data),
                            unique_products_count=len(unique_products_found),
                            unique_products=unique_products_found.tolist(),
                            records_per_product=product_counts)
                if len(unique_products_found) == 1:
                    logger.warning("POTENTIAL ISSUE: Only ONE unique product found in all sales data",
                                   tenant_id=tenant_id,
                                   single_product=unique_products_found[0],
                                   record_count=len(sales_data))
            else:
                logger.warning("No 'inventory_product_id' column found in sales data",
                               tenant_id=tenant_id,
                               columns=list(sales_df_debug.columns))
            logger.info(f"Sales data validation passed: {len(sales_data)} sales records found",
                        tenant_id=tenant_id, job_id=job_id)
            # Step 2: Extract and validate sales data date range
            sales_date_range = self._extract_sales_date_range(sales_data)
            logger.info(f"Sales data range detected: {sales_date_range.start} to {sales_date_range.end}")

            # Step 3: Apply date alignment across all data sources
            aligned_range = self.date_alignment_service.validate_and_align_dates(
                user_sales_range=sales_date_range,
                requested_start=requested_start,
                requested_end=requested_end
            )
            logger.info(f"Date alignment completed: {aligned_range.start} to {aligned_range.end}")
            if aligned_range.constraints:
                logger.info(f"Applied constraints: {aligned_range.constraints}")

            # Step 4: Filter sales data to aligned date range
            filtered_sales = self._filter_sales_data(sales_data, aligned_range)

            # Step 5: Collect external data sources concurrently
            logger.info("Collecting external data sources...")
            weather_data, traffic_data, poi_features = await self._collect_external_data(
                aligned_range, bakery_location, tenant_id
            )

            # Step 6: Validate data quality
            data_quality_results = self._validate_data_sources(
                filtered_sales, weather_data, traffic_data, aligned_range
            )

            # Step 7: Create comprehensive training dataset
            training_dataset = TrainingDataSet(
                sales_data=filtered_sales,
                weather_data=weather_data,
                traffic_data=traffic_data,
                poi_features=poi_features or {},  # POI features (static, location-based)
                date_range=aligned_range,
                metadata={
                    "tenant_id": tenant_id,
                    "job_id": job_id,
                    "bakery_location": bakery_location,
                    "data_sources_used": aligned_range.available_sources,
                    "constraints_applied": aligned_range.constraints,
                    "data_quality": data_quality_results,
                    "preparation_timestamp": datetime.now().isoformat(),
                    "original_sales_range": {
                        "start": sales_date_range.start.isoformat(),
                        "end": sales_date_range.end.isoformat()
                    },
                    "poi_features_count": len(poi_features) if poi_features else 0
                }
            )

            # Step 8: Final validation
            final_validation = self.validate_training_data_quality(training_dataset)
            training_dataset.metadata["final_validation"] = final_validation
            logger.info("Training data preparation completed successfully:")
            logger.info(f" - Sales records: {len(filtered_sales)}")
            logger.info(f" - Weather records: {len(weather_data)}")
            logger.info(f" - Traffic records: {len(traffic_data)}")
            logger.info(f" - POI features: {len(poi_features) if poi_features else 0}")
            logger.info(f" - Data quality score: {final_validation.get('data_quality_score', 'N/A')}")
            return training_dataset
        except Exception as e:
            if job_id and tenant_id:
                await publish_training_failed(job_id, tenant_id, str(e))
            logger.error(f"Training data preparation failed: {str(e)}")
            raise ValueError(f"Failed to prepare training data: {str(e)}")

    @staticmethod
    def extract_sales_date_range_utc_localize(sales_data_df: pd.DataFrame) -> 'DateRange':
        """
        Extract the UTC-aware date range from a sales DataFrame using tz_localize.
        Args:
            sales_data_df: A pandas DataFrame containing a 'date' column.
        Returns:
            DateRange with timezone-aware start and end dates in UTC.
        """
        if 'date' not in sales_data_df.columns:
            raise ValueError("DataFrame does not contain a 'date' column.")
        # Convert the 'date' column to datetime objects
        sales_data_df['date'] = pd.to_datetime(sales_data_df['date'])
        # Localize the naive datetime objects to UTC
        sales_data_df['date'] = sales_data_df['date'].tz_localize('UTC')
        # Find the minimum and maximum dates
        start_date = sales_data_df['date'].min()
        end_date = sales_data_df['date'].max()
        return DateRange(start_date, end_date, DataSourceType.BAKERY_SALES)
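
    # Illustrative usage (hypothetical data), assuming naive dates in the frame:
    #
    #   df = pd.DataFrame({"date": ["2025-01-01", "2025-01-31"], "quantity": [10, 12]})
    #   rng = TrainingDataOrchestrator.extract_sales_date_range_utc_localize(df)
    #   # rng.start == Timestamp("2025-01-01 00:00:00+00:00")
    #
    # Note: tz_localize raises on already tz-aware values; _extract_sales_date_range
    # below uses pd.to_datetime(..., utc=True), which handles naive and aware input.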

    def _extract_sales_date_range(self, sales_data: List[Dict[str, Any]]) -> 'DateRange':
        """
        Extract date range from sales data with proper date parsing
        Args:
            sales_data: List of sales records
        Returns:
            DateRange object with timezone-aware start and end dates
        """
        if not sales_data:
            raise ValueError("No sales data provided for date range extraction")

        # Convert to DataFrame for easier processing
        sales_df = pd.DataFrame(sales_data)

        if 'date' not in sales_df.columns:
            raise ValueError("Sales data does not contain a 'date' column")
        # Convert dates to datetime with proper parsing
        # This will use the improved date parsing from the data import service
        sales_df['date'] = pd.to_datetime(sales_df['date'], utc=True, errors='coerce')

        # Remove any rows with invalid dates
        sales_df = sales_df.dropna(subset=['date'])
        if len(sales_df) == 0:
            raise ValueError("No valid dates found in sales data")

        # Find the minimum and maximum dates
        start_date = sales_df['date'].min()
        end_date = sales_df['date'].max()
        logger.info(f"Extracted sales date range: {start_date} to {end_date}")

        return DateRange(start_date, end_date, DataSourceType.BAKERY_SALES)

    def _filter_sales_data(
        self,
        sales_data: List[Dict[str, Any]],
        aligned_range: AlignedDateRange
    ) -> List[Dict[str, Any]]:
        """Filter sales data to the aligned date range with enhanced validation"""
        filtered_data = []
        filtered_count = 0
        for record in sales_data:
            try:
                if 'date' in record:
                    record_date = record['date']

                    # Parse dates with full timezone info intact; truncating to
                    # the date part here previously caused records to be dropped.
                    if isinstance(record_date, str):
                        if 'T' in record_date:
                            record_date = record_date.replace('Z', '+00:00')
                        # Parse the complete datetime, not just the date part
                        parsed_date = datetime.fromisoformat(record_date)
                        # Ensure timezone-aware
                        if parsed_date.tzinfo is None:
                            parsed_date = parsed_date.replace(tzinfo=timezone.utc)
                        record_date = parsed_date
                    elif isinstance(record_date, datetime):
                        # Ensure timezone-aware
                        if record_date.tzinfo is None:
                            record_date = record_date.replace(tzinfo=timezone.utc)

                    # Keep the actual datetime for filtering; normalize to start
                    # of day only if needed for daily aggregation downstream.

                    # Ensure aligned_range bounds are timezone-aware for comparison
                    aligned_start = aligned_range.start
                    aligned_end = aligned_range.end
                    if aligned_start.tzinfo is None:
                        aligned_start = aligned_start.replace(tzinfo=timezone.utc)
                    if aligned_end.tzinfo is None:
                        aligned_end = aligned_end.replace(tzinfo=timezone.utc)
                    # Check if date falls within aligned range (both timezone-aware)
                    if aligned_start <= record_date <= aligned_end:
                        # Validate that record has required fields
                        if self._validate_sales_record(record):
                            filtered_data.append(record)
                        else:
                            filtered_count += 1
                    else:
                        # Record outside date range
                        filtered_count += 1
            except Exception as e:
                logger.warning(f"Error processing sales record: {str(e)}")
                filtered_count += 1
                continue
        logger.info(f"Filtered sales data: {len(filtered_data)} records in aligned range")
        if filtered_count > 0:
            logger.warning(f"Filtered out {filtered_count} invalid records")
        return filtered_data

    def _validate_sales_record(self, record: Dict[str, Any]) -> bool:
        """Validate individual sales record"""
        required_fields = ['date', 'inventory_product_id']
        quantity_fields = ['quantity', 'quantity_sold', 'sales', 'units_sold']
        # Check required fields
        for field in required_fields:
            if field not in record or record[field] is None:
                return False
        # Check at least one quantity field exists
        has_quantity = any(field in record and record[field] is not None for field in quantity_fields)
        if not has_quantity:
            return False
        # Validate the first present quantity field is numeric and non-negative
        for field in quantity_fields:
            if field in record and record[field] is not None:
                try:
                    quantity = float(record[field])
                    if quantity < 0:
                        return False
                except (ValueError, TypeError):
                    return False
                break
        return True
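
    # Illustrative examples (hypothetical records) for _validate_sales_record:
    #
    #   {"date": "2025-01-02", "inventory_product_id": "p1", "quantity": 5}    -> True
    #   {"date": "2025-01-02", "inventory_product_id": "p1", "units_sold": 0}  -> True
    #   {"date": "2025-01-02", "inventory_product_id": "p1"}                   -> False (no quantity)
    #   {"date": "2025-01-02", "inventory_product_id": "p1", "quantity": -3}   -> False (negative)
    #
    # Only the first quantity field found (in quantity_fields order) is checked.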

    async def _collect_external_data(
        self,
        aligned_range: AlignedDateRange,
        bakery_location: Tuple[float, float],
        tenant_id: str
    ) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]], Dict[str, Any]]:
        """Collect weather, traffic, and POI data concurrently with enhanced error handling"""
        lat, lon = bakery_location

        # Create collection tasks with timeout
        tasks = []

        # Weather data collection
        if DataSourceType.WEATHER_FORECAST in aligned_range.available_sources:
            weather_task = asyncio.create_task(
                self._collect_weather_data_with_timeout(lat, lon, aligned_range, tenant_id)
            )
            tasks.append(("weather", weather_task))

        # Enhanced traffic data collection (supports multiple cities)
        if DataSourceType.MADRID_TRAFFIC in aligned_range.available_sources:
            logger.info(f"🚛 Traffic data source available for multiple cities, creating collection task for date range: {aligned_range.start} to {aligned_range.end}")
            traffic_task = asyncio.create_task(
                self._collect_traffic_data_with_timeout_enhanced(lat, lon, aligned_range, tenant_id)
            )
            tasks.append(("traffic", traffic_task))
        else:
            logger.warning(f"🚫 Traffic data source NOT available in sources: {[s.value for s in aligned_range.available_sources]}")

        # POI features collection (static, location-based)
        poi_task = asyncio.create_task(
            self._collect_poi_features(lat, lon, tenant_id)
        )
        tasks.append(("poi", poi_task))

        # Execute tasks concurrently with proper error handling
        results = {}
        if tasks:
            try:
                completed_tasks = await asyncio.gather(
                    *[task for _, task in tasks],
                    return_exceptions=True
                )

                for i, (task_name, _) in enumerate(tasks):
                    result = completed_tasks[i]
                    if isinstance(result, Exception):
                        logger.warning(f"{task_name} data collection failed: {result}")
                        results[task_name] = [] if task_name != "poi" else {}
                    else:
                        results[task_name] = result
                        if task_name == "poi":
                            logger.info(f"{task_name} features collected: {len(result) if result else 0} features")
                        else:
                            logger.info(f"{task_name} data collection completed: {len(result)} records")
            except Exception as e:
                logger.error(f"Error in concurrent data collection: {str(e)}")
                results = {"weather": [], "traffic": [], "poi": {}}

        weather_data = results.get("weather", [])
        traffic_data = results.get("traffic", [])
        poi_features = results.get("poi", {})
        return weather_data, traffic_data, poi_features

    async def _collect_poi_features(
        self,
        lat: float,
        lon: float,
        tenant_id: str
    ) -> Dict[str, Any]:
        """
        Collect POI features for bakery location.
        POI features are static (location-based, not time-varying).
        """
        try:
            logger.info(
                "Collecting POI features",
                tenant_id=tenant_id,
                location=(lat, lon)
            )
            poi_features = await self.poi_feature_integrator.fetch_poi_features(
                tenant_id=tenant_id,
                latitude=lat,
                longitude=lon,
                force_refresh=False
            )
            if poi_features:
                logger.info(
                    "POI features collected successfully",
                    tenant_id=tenant_id,
                    feature_count=len(poi_features)
                )
            else:
                logger.warning(
                    "No POI features collected (service may be unavailable)",
                    tenant_id=tenant_id
                )
            return poi_features or {}
        except Exception as e:
            logger.error(
                "Failed to collect POI features, continuing without them",
                tenant_id=tenant_id,
                error=str(e),
                exc_info=True
            )
            return {}

    async def _collect_weather_data_with_timeout(
        self,
        lat: float,
        lon: float,
        aligned_range: AlignedDateRange,
        tenant_id: str
    ) -> List[Dict[str, Any]]:
        """Collect weather data with timeout and fallback"""
        try:
            start_date_str = aligned_range.start.isoformat()
            end_date_str = aligned_range.end.isoformat()

            weather_data = await self.data_client.fetch_weather_data(
                tenant_id=tenant_id,
                start_date=start_date_str,
                end_date=end_date_str,
                latitude=lat,
                longitude=lon)
            # Validate weather data
            if self._validate_weather_data(weather_data):
                logger.info(f"Collected {len(weather_data)} valid weather records")
                return weather_data
            else:
                logger.warning("Invalid weather data received, using synthetic data")
                return self._generate_synthetic_weather_data(aligned_range)
        except asyncio.TimeoutError:
            logger.warning("Weather data collection timed out, using synthetic data")
            return self._generate_synthetic_weather_data(aligned_range)
        except Exception as e:
            logger.warning(f"Weather data collection failed: {e}, using synthetic data")
            return self._generate_synthetic_weather_data(aligned_range)

    async def _collect_traffic_data_with_timeout_enhanced(
        self,
        lat: float,
        lon: float,
        aligned_range: AlignedDateRange,
        tenant_id: str
    ) -> List[Dict[str, Any]]:
        """
        Enhanced traffic data collection with multi-city support and improved storage.
        Uses the new abstracted traffic service layer.
        """
        try:
            # Double-check constraints before making request
            constraint_violated = self.date_alignment_service.check_madrid_current_month_constraint(aligned_range.end)
            if constraint_violated:
                logger.warning(f"🚫 Current month constraint violation: end_date={aligned_range.end}, no traffic data available")
                return []
            else:
                logger.info(f"✅ Date constraints passed: end_date={aligned_range.end}, proceeding with traffic data request")

            start_date_str = aligned_range.start.isoformat()
            end_date_str = aligned_range.end.isoformat()
            # Enhanced: Fetch traffic data using unified cache-first method
            # This automatically detects the appropriate city and uses the right client
            traffic_data = await self.data_client.fetch_traffic_data_unified(
                tenant_id=tenant_id,
                start_date=start_date_str,
                end_date=end_date_str,
                latitude=lat,
                longitude=lon,
                force_refresh=False  # Use cache-first strategy
            )

            # Enhanced validation including pedestrian inference data
            if self._validate_traffic_data_enhanced(traffic_data):
                logger.info(f"Collected and stored {len(traffic_data)} valid enhanced traffic records for re-training")

                # Log storage success with enhanced metadata
                self._log_enhanced_traffic_data_storage(lat, lon, aligned_range, len(traffic_data), traffic_data)

                return traffic_data
            else:
                logger.warning("Invalid enhanced traffic data received")
                return []
        except asyncio.TimeoutError:
            logger.warning("Enhanced traffic data collection timed out")
            return []
        except Exception as e:
            logger.warning(f"Enhanced traffic data collection failed: {e}")
            return []

    def _log_enhanced_traffic_data_storage(self,
                                           lat: float,
                                           lon: float,
                                           aligned_range: AlignedDateRange,
                                           record_count: int,
                                           traffic_data: List[Dict[str, Any]]):
        """Enhanced logging for traffic data storage with detailed metadata"""
        cities_detected = set()
        has_pedestrian_data = 0
        data_sources = set()
        districts_covered = set()

        for record in traffic_data:
            if 'city' in record and record['city']:
                cities_detected.add(record['city'])
            if 'pedestrian_count' in record and record['pedestrian_count'] is not None:
                has_pedestrian_data += 1
            if 'source' in record and record['source']:
                data_sources.add(record['source'])
            if 'district' in record and record['district']:
                districts_covered.add(record['district'])

        logger.info(
            "Enhanced traffic data stored for re-training",
            location=f"{lat:.4f},{lon:.4f}",
            date_range=f"{aligned_range.start.isoformat()} to {aligned_range.end.isoformat()}",
            records_stored=record_count,
            cities_detected=list(cities_detected),
            pedestrian_inference_coverage=f"{has_pedestrian_data}/{record_count}",
            data_sources=list(data_sources),
            districts_covered=list(districts_covered),
            storage_timestamp=datetime.now().isoformat(),
            purpose="model_training_and_retraining"
        )

    def _validate_weather_data(self, weather_data: List[Dict[str, Any]]) -> bool:
        """Validate weather data quality"""
        if not weather_data:
            return False
        required_fields = ['date']
        weather_fields = ['temperature', 'temp', 'temperatura', 'precipitation', 'rain', 'lluvia']
        valid_records = 0
        for record in weather_data:
            # Check required fields
            if not all(field in record for field in required_fields):
                continue
            # Check at least one weather field exists
            if any(field in record and record[field] is not None for field in weather_fields):
                valid_records += 1
        # Consider valid if at least 50% of records are valid
        validity_threshold = 0.5
        is_valid = (valid_records / len(weather_data)) >= validity_threshold
        if not is_valid:
            logger.warning(f"Weather data validation failed: {valid_records}/{len(weather_data)} valid records")
        return is_valid
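
    # Illustrative check (hypothetical records): with the 0.5 threshold, a batch
    # of [{"date": "2025-01-02", "temperature": 9.1}, {"date": "2025-01-03"}]
    # yields 1/2 valid records (0.5 >= 0.5) and passes; drop the temperature
    # field from the first record and the batch fails.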

    def _validate_traffic_data_enhanced(self, traffic_data: List[Dict[str, Any]]) -> bool:
        """Enhanced validation for traffic data including pedestrian inference and city-specific fields"""
        if not traffic_data:
            return False
        required_fields = ['date']
        traffic_fields = ['traffic_volume', 'traffic_intensity', 'intensidad', 'trafico']
        enhanced_fields = ['pedestrian_count', 'congestion_level', 'source']
        city_specific_fields = ['city', 'measurement_point_id', 'district']
        valid_records = 0
        enhanced_records = 0
        city_aware_records = 0
        for record in traffic_data:
            record_score = 0
            # Check required fields
            if all(field in record and record[field] is not None for field in required_fields):
                record_score += 1

            # Check traffic data fields
            if any(field in record and record[field] is not None for field in traffic_fields):
                record_score += 1
            # Check enhanced fields (pedestrian inference, etc.)
            enhanced_count = sum(1 for field in enhanced_fields
                                 if field in record and record[field] is not None)
            if enhanced_count >= 2:  # At least 2 enhanced fields
                enhanced_records += 1
                record_score += 1
            # Check city-specific awareness
            city_count = sum(1 for field in city_specific_fields
                             if field in record and record[field] is not None)
            if city_count >= 1:  # At least some city awareness
                city_aware_records += 1
            # A record is valid if it has either a date or any traffic field
            # (deliberately lenient: score >= 1 rather than >= 2)
            if record_score >= 1:
                valid_records += 1
        total_records = len(traffic_data)
        validity_threshold = 0.1  # Lenient: accept the batch if 10% of records are valid
        enhancement_threshold = 0.1  # Lenient threshold for enhanced features
        basic_validity = (valid_records / total_records) >= validity_threshold
        has_enhancements = (enhanced_records / total_records) >= enhancement_threshold
        has_city_awareness = (city_aware_records / total_records) >= enhancement_threshold
        logger.info("Enhanced traffic data validation results",
                    total_records=total_records,
                    valid_records=valid_records,
                    enhanced_records=enhanced_records,
                    city_aware_records=city_aware_records,
                    basic_validity=basic_validity,
                    has_enhancements=has_enhancements,
                    has_city_awareness=has_city_awareness)
        if not basic_validity:
            logger.warning(f"Traffic data basic validation failed: {valid_records}/{total_records} valid records")
        return basic_validity

    def _validate_traffic_data(self, traffic_data: List[Dict[str, Any]]) -> bool:
        """Legacy validation method - redirects to enhanced version"""
        return self._validate_traffic_data_enhanced(traffic_data)

    def _validate_data_sources(
        self,
        sales_data: List[Dict[str, Any]],
        weather_data: List[Dict[str, Any]],
        traffic_data: List[Dict[str, Any]],
        aligned_range: AlignedDateRange
    ) -> Dict[str, Any]:
        """Validate all data sources and provide quality metrics"""
        # Guard against zero-day ranges (single-day data) to avoid division by zero
        coverage_days = max(1, (aligned_range.end - aligned_range.start).days)
        validation_results = {
            "sales_data": {
                "record_count": len(sales_data),
                "is_valid": len(sales_data) > 0,
                "coverage_days": coverage_days,
                "quality_score": 0.0
            },
            "weather_data": {
                "record_count": len(weather_data),
                "is_valid": self._validate_weather_data(weather_data) if weather_data else False,
                "quality_score": 0.0
            },
            "traffic_data": {
                "record_count": len(traffic_data),
                "is_valid": self._validate_traffic_data(traffic_data) if traffic_data else False,
                "quality_score": 0.0
            },
            "overall_quality_score": 0.0
        }
        # Calculate quality scores
        # Sales data quality (most important)
        if validation_results["sales_data"]["record_count"] > 0:
            coverage_ratio = min(1.0, validation_results["sales_data"]["record_count"] / coverage_days)
            validation_results["sales_data"]["quality_score"] = coverage_ratio * 100
        # Weather data quality (roughly one record expected per day)
        if validation_results["weather_data"]["record_count"] > 0:
            coverage_ratio = min(1.0, validation_results["weather_data"]["record_count"] / coverage_days)
            validation_results["weather_data"]["quality_score"] = coverage_ratio * 100
        # Traffic data quality (roughly one record expected per day)
        if validation_results["traffic_data"]["record_count"] > 0:
            coverage_ratio = min(1.0, validation_results["traffic_data"]["record_count"] / coverage_days)
            validation_results["traffic_data"]["quality_score"] = coverage_ratio * 100
        # Overall quality score (weighted by importance)
        weights = {"sales_data": 0.7, "weather_data": 0.2, "traffic_data": 0.1}
        overall_score = sum(
            validation_results[source]["quality_score"] * weight
            for source, weight in weights.items()
        )
        validation_results["overall_quality_score"] = round(overall_score, 2)
        return validation_results
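
    # Worked example (hypothetical scores) of the weighted overall score: with
    # per-source quality scores of sales=100, weather=80, traffic=50,
    #   overall = 100 * 0.7 + 80 * 0.2 + 50 * 0.1 = 91.0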

    def _generate_synthetic_weather_data(
        self,
        aligned_range: AlignedDateRange
    ) -> List[Dict[str, Any]]:
        """Generate realistic synthetic weather data for Madrid"""
        synthetic_data = []
        current_date = aligned_range.start
        # Madrid seasonal temperature patterns (monthly mean, °C)
        seasonal_temps = {
            1: 9, 2: 11, 3: 15, 4: 17, 5: 21, 6: 26,
            7: 29, 8: 28, 9: 24, 10: 18, 11: 12, 12: 9
        }
        while current_date <= aligned_range.end:
            month = current_date.month
            base_temp = seasonal_temps.get(month, 15)
            # Add some realistic variation
            temp_variation = random.gauss(0, 3)  # ±3°C variation
            temperature = max(0, base_temp + temp_variation)
            # Precipitation patterns (Madrid is relatively dry)
            precipitation = 0.0
            if random.random() < 0.15:  # 15% chance of rain
                precipitation = random.uniform(0.1, 15.0)
            synthetic_data.append({
                "date": current_date,
                "temperature": round(temperature, 1),
                "precipitation": round(precipitation, 1),
                "humidity": round(random.uniform(40, 80), 1),
                "wind_speed": round(random.uniform(2, 15), 1),
                "pressure": round(random.uniform(1005, 1025), 1),
                "source": "synthetic_madrid_pattern"
            })
            current_date = current_date + timedelta(days=1)
        logger.info(f"Generated {len(synthetic_data)} synthetic weather records with Madrid patterns")
        return synthetic_data

    def validate_training_data_quality(self, dataset: TrainingDataSet) -> Dict[str, Any]:
        """Enhanced validation of training data quality"""
        validation_results = {
            "is_valid": True,
            "warnings": [],
            "errors": [],
            "data_quality_score": 100.0,
            "recommendations": []
        }
        # Check sales data completeness
        sales_count = len(dataset.sales_data)
        if sales_count < 30:
            validation_results["warnings"].append(
                f"Limited sales data: {sales_count} records (recommended: 30+)"
            )
            validation_results["data_quality_score"] -= 20
            validation_results["recommendations"].append("Consider collecting more historical sales data")
        elif sales_count < 90:
            validation_results["warnings"].append(
                f"Moderate sales data: {sales_count} records (optimal: 90+)"
            )
            validation_results["data_quality_score"] -= 10
        # Check date coverage
        date_coverage = (dataset.date_range.end - dataset.date_range.start).days
        if date_coverage < 90:
            validation_results["warnings"].append(
                f"Limited date coverage: {date_coverage} days (recommended: 90+)"
            )
            validation_results["data_quality_score"] -= 15
            validation_results["recommendations"].append("Extend date range for better seasonality detection")
        # Check external data availability
        if not dataset.weather_data:
            validation_results["warnings"].append("No weather data available")
            validation_results["data_quality_score"] -= 10
            validation_results["recommendations"].append("Weather data improves forecast accuracy")
        elif len(dataset.weather_data) < date_coverage * 0.5:
            validation_results["warnings"].append("Sparse weather data coverage")
            validation_results["data_quality_score"] -= 5
        if not dataset.traffic_data:
            validation_results["warnings"].append("No traffic data available")
            validation_results["data_quality_score"] -= 5
            validation_results["recommendations"].append("Traffic data can help with location-based patterns")
        # Check data consistency
        unique_products = set()
        for record in dataset.sales_data:
            if 'inventory_product_id' in record:
                unique_products.add(record['inventory_product_id'])
        if len(unique_products) == 0:
            validation_results["errors"].append("No product identifiers found in sales data")
            validation_results["is_valid"] = False
        elif len(unique_products) > 50:
            validation_results["warnings"].append(
                f"Many products detected ({len(unique_products)}). Consider training models in batches."
            )
            validation_results["recommendations"].append("Group similar products for better training efficiency")
        # Check for data source constraints
        if dataset.date_range.constraints:
            constraint_info = []
            for constraint_type, message in dataset.date_range.constraints.items():
                constraint_info.append(f"{constraint_type}: {message}")
            validation_results["warnings"].append(
                f"Data source constraints applied: {'; '.join(constraint_info)}"
            )
        # Final validation
        if validation_results["errors"]:
            validation_results["is_valid"] = False
            validation_results["data_quality_score"] = 0.0
        # Ensure score doesn't go below 0
        validation_results["data_quality_score"] = max(0.0, validation_results["data_quality_score"])
        # Add quality assessment
        score = validation_results["data_quality_score"]
        if score >= 80:
            validation_results["quality_assessment"] = "Excellent"
        elif score >= 60:
            validation_results["quality_assessment"] = "Good"
        elif score >= 40:
            validation_results["quality_assessment"] = "Fair"
        else:
            validation_results["quality_assessment"] = "Poor"
        return validation_results

    def get_data_collection_plan(self, aligned_range: AlignedDateRange) -> Dict[str, Dict]:
        """
        Generate an enhanced data collection plan based on the aligned date range.
        """
        plan = {
            "collection_summary": {
                "start_date": aligned_range.start.isoformat(),
                "end_date": aligned_range.end.isoformat(),
                "duration_days": (aligned_range.end - aligned_range.start).days,
                "available_sources": [source.value for source in aligned_range.available_sources],
                "constraints": aligned_range.constraints
            },
            "data_sources": {}
        }
        # Bakery Sales Data
        if DataSourceType.BAKERY_SALES in aligned_range.available_sources:
            plan["data_sources"]["sales_data"] = {
                "start_date": aligned_range.start.isoformat(),
                "end_date": aligned_range.end.isoformat(),
                "source": "user_upload",
                "required": True,
                "priority": "high",
                "expected_records": "variable",
                "data_points": ["date", "inventory_product_id", "quantity"],
                "validation": "required_fields_check"
            }
        # Madrid Traffic Data
        if DataSourceType.MADRID_TRAFFIC in aligned_range.available_sources:
            plan["data_sources"]["traffic_data"] = {
                "start_date": aligned_range.start.isoformat(),
                "end_date": aligned_range.end.isoformat(),
                "source": "madrid_opendata",
                "required": False,
                "priority": "medium",
                "expected_records": (aligned_range.end - aligned_range.start).days,
                "constraint": "Cannot request current month data",
                "data_points": ["date", "traffic_volume", "congestion_level"],
                "validation": "date_constraint_check"
            }
        # Weather Data
        if DataSourceType.WEATHER_FORECAST in aligned_range.available_sources:
            plan["data_sources"]["weather_data"] = {
                "start_date": aligned_range.start.isoformat(),
                "end_date": aligned_range.end.isoformat(),
                "source": "aemet_api",
                "required": False,
                "priority": "high",
                "expected_records": (aligned_range.end - aligned_range.start).days,
                "constraint": "Available from yesterday backward",
                "data_points": ["date", "temperature", "precipitation", "humidity"],
                "validation": "temporal_constraint_check",
                "fallback": "synthetic_madrid_weather"
            }
        return plan

    def get_orchestration_summary(self, dataset: TrainingDataSet) -> Dict[str, Any]:
        """
        Generate a comprehensive summary of the orchestration process.
        """
        return {
            "tenant_id": dataset.metadata.get("tenant_id"),
            "job_id": dataset.metadata.get("job_id"),
            "orchestration_completed_at": dataset.metadata.get("preparation_timestamp"),
            "data_alignment": {
                "original_range": dataset.metadata.get("original_sales_range"),
                "aligned_range": {
                    "start": dataset.date_range.start.isoformat(),
                    "end": dataset.date_range.end.isoformat(),
                    "duration_days": (dataset.date_range.end - dataset.date_range.start).days
                },
                "constraints_applied": dataset.date_range.constraints,
                "available_sources": [source.value for source in dataset.date_range.available_sources]
            },
            "data_collection_results": {
                "sales_records": len(dataset.sales_data),
                "weather_records": len(dataset.weather_data),
                "traffic_records": len(dataset.traffic_data),
                "total_records": len(dataset.sales_data) + len(dataset.weather_data) + len(dataset.traffic_data)
            },
            "data_quality": dataset.metadata.get("data_quality", {}),
            "validation_results": dataset.metadata.get("final_validation", {}),
            "processing_metadata": {
                "bakery_location": dataset.metadata.get("bakery_location"),
                "data_sources_requested": len(dataset.date_range.available_sources),
                "data_sources_successful": sum([
                    1 if len(dataset.sales_data) > 0 else 0,
                    1 if len(dataset.weather_data) > 0 else 0,
                    1 if len(dataset.traffic_data) > 0 else 0
                ])
            }
        }
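

if __name__ == "__main__":
    # Minimal local sketch of how this orchestrator is driven. The tenant id,
    # job id, and coordinates below are hypothetical, and running this requires
    # the surrounding services (data service, POI service) to be reachable.
    async def _demo() -> None:
        orchestrator = TrainingDataOrchestrator()
        dataset = await orchestrator.prepare_training_data(
            tenant_id="tenant-123",              # hypothetical tenant
            bakery_location=(40.4168, -3.7038),  # Madrid city centre
            job_id="job-demo-001",               # hypothetical job id
        )
        print(orchestrator.get_orchestration_summary(dataset))

    asyncio.run(_demo())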