Fix data fetch 5

This commit is contained in:
Urtzi Alfaro
2025-07-27 21:32:29 +02:00
parent a627b566d2
commit 938fd24e3a
3 changed files with 152 additions and 22 deletions

View File

@@ -68,7 +68,6 @@ class ServiceAuthenticator:
def get_request_headers(self, tenant_id: str = None) -> Dict[str, str]:
"""Get standard headers for service requests"""
headers = {
"Content-Type": "application/json",
"X-Service": "training-service",
"User-Agent": "training-service/1.0.0"
}

View File

@@ -43,7 +43,7 @@ class DataServiceClient:
# Make request via gateway
async with httpx.AsyncClient(timeout=self.timeout) as client:
response = await client.post(
response = await client.get(
f"{self.base_url}/api/v1/tenants/{tenant_id}/sales",
headers=headers,
params=params
@@ -87,6 +87,7 @@ class DataServiceClient:
tenant_id=tenant_id)
return []
async def fetch_weather_data(
self,
tenant_id: str,
@@ -107,14 +108,16 @@ class DataServiceClient:
headers["Authorization"] = f"Bearer {token}"
headers["Content-Type"] = "application/json"
# Prepare request payload
# Prepare request payload with proper date handling
payload = {
"start_date": start_date,
"end_date": end_date,
"start_date": start_date, # Already in ISO format from calling code
"end_date": end_date, # Already in ISO format from calling code
"latitude": latitude or 40.4168, # Default Madrid coordinates
"longitude": longitude or -3.7038
}
logger.info(f"Weather request payload: {payload}", tenant_id=tenant_id)
# Make POST request via gateway
async with httpx.AsyncClient(timeout=self.timeout) as client:
response = await client.post(
@@ -128,11 +131,36 @@ class DataServiceClient:
url=response.url)
if response.status_code == 200:
return response.json()
data = response.json()
logger.info(f"Successfully fetched {len(data)} weather records")
return data
elif response.status_code == 400:
error_details = response.text
logger.error(f"Weather API validation error (400): {error_details}")
# Try to parse the error and provide helpful info
try:
error_json = response.json()
if 'detail' in error_json:
detail = error_json['detail']
if 'End date must be after start date' in str(detail):
logger.error(f"Date range issue: start={start_date}, end={end_date}")
elif 'Date range cannot exceed 90 days' in str(detail):
logger.error(f"Date range too large: {start_date} to {end_date}")
except:
pass
return []
elif response.status_code == 401:
logger.error("Authentication failed for weather API")
return []
else:
logger.error(f"Failed to fetch weather data: {response.status_code} - {response.text}")
return []
except httpx.TimeoutException:
logger.error("Timeout when fetching weather data")
return []
except Exception as e:
logger.error(f"Error fetching weather data: {str(e)}")
return []
@@ -159,15 +187,17 @@ class DataServiceClient:
# Prepare request payload
payload = {
"start_date": start_date,
"end_date": end_date,
"start_date": start_date, # Already in ISO format from calling code
"end_date": end_date, # Already in ISO format from calling code
"latitude": latitude or 40.4168, # Default Madrid coordinates
"longitude": longitude or -3.7038
}
logger.info(f"Traffic request payload: {payload}", tenant_id=tenant_id)
# Make POST request via gateway
async with httpx.AsyncClient(timeout=self.timeout) as client:
response = await client.get(
response = await client.post(
f"{self.base_url}/api/v1/tenants/{tenant_id}/traffic/historical",
headers=headers,
json=payload
@@ -178,11 +208,23 @@ class DataServiceClient:
url=response.url)
if response.status_code == 200:
return response.json()
data = response.json()
logger.info(f"Successfully fetched {len(data)} traffic records")
return data
elif response.status_code == 400:
error_details = response.text
logger.error(f"Traffic API validation error (400): {error_details}")
return []
elif response.status_code == 401:
logger.error("Authentication failed for traffic API")
return []
else:
logger.error(f"Failed to fetch traffic data: {response.status_code} - {response.text}")
return []
except httpx.TimeoutException:
logger.error("Timeout when fetching traffic data")
return []
except Exception as e:
logger.error(f"Error fetching traffic data: {str(e)}")
return []

View File

@@ -34,6 +34,75 @@ class TrainingService:
self.ml_trainer = BakeryMLTrainer()
self.data_client = DataServiceClient()
async def _determine_sales_date_range(self, sales_data: List[Dict]) -> tuple[datetime, datetime]:
    """Determine the start and end dates covered by the sales records.

    Each record's ``'date'`` entry may be a ``datetime`` or a string. Strings
    are parsed as ISO-8601 (a trailing ``Z`` is read as ``+00:00``; date-only,
    ``T``-separated and space-separated forms are accepted), falling back to
    ``%Y-%m-%d`` for values such as ``2024-1-5``. Records without a ``'date'``
    key, or whose value cannot be parsed, are skipped (the latter with a
    warning).

    Timezone-aware values are converted to naive UTC so that every collected
    date is mutually comparable; naive values are kept unchanged.
    NOTE(review): naive values are presumably already UTC - confirm.

    Args:
        sales_data: Sales records, each a mapping that may carry a ``'date'``.

    Returns:
        ``(start_date, end_date)`` after being passed through
        ``self._adjust_date_range_for_apis``.

    Raises:
        ValueError: If ``sales_data`` is empty or holds no usable date.
    """
    if not sales_data:
        raise ValueError("No sales data available to determine date range")

    # Local import: only `datetime`/`timedelta` are known to be imported by the file.
    from datetime import timezone

    dates = []
    for record in sales_data:
        if 'date' not in record:
            continue
        raw_value = record['date']
        try:
            if isinstance(raw_value, str):
                date_str = raw_value.replace('Z', '+00:00')
                try:
                    parsed_date = datetime.fromisoformat(date_str)
                except ValueError:
                    # Fallback keeps accepting non-padded dates like "2024-1-5".
                    parsed_date = datetime.strptime(date_str, '%Y-%m-%d')
            elif isinstance(raw_value, datetime):
                parsed_date = raw_value
            else:
                # Unsupported type: ignored silently, as before.
                continue

            # Mixing aware and naive datetimes makes min()/max() raise
            # TypeError, so bring everything to naive UTC first.
            if parsed_date.utcoffset() is not None:
                parsed_date = parsed_date.astimezone(timezone.utc).replace(tzinfo=None)
            dates.append(parsed_date)
        except (ValueError, AttributeError) as e:
            logger.warning(f"Invalid date format in record: {record['date']} - {e}")
            continue

    if not dates:
        raise ValueError("No valid dates found in sales data")

    start_date = min(dates)
    end_date = max(dates)

    # Validate and adjust date range for external APIs
    start_date, end_date = self._adjust_date_range_for_apis(start_date, end_date)

    logger.info(f"Determined and adjusted sales date range: {start_date} to {end_date}")
    return start_date, end_date
def _adjust_date_range_for_apis(self, start_date: datetime, end_date: datetime) -> tuple[datetime, datetime]:
    """Adjust a date range so it complies with the external API limits.

    The range is corrected in this order:

    1. An ``end_date`` in the future is pulled back to today at midnight.
    2. A ``start_date`` in the future is set to 30 days before today at
       midnight.
    3. If ``start_date`` is not strictly before ``end_date``, it is set to
       30 days before ``end_date``.
    4. If the span is longer than 90 days, ``start_date`` is moved forward
       so that the most recent 90 days are kept.

    Clamping happens before the 90-day trim so that a future-dated
    ``end_date`` does not shrink the final window below what the data allows.

    Args:
        start_date: First date of the range.
        end_date: Last date of the range. Must have the same timezone
            awareness as ``start_date``.

    Returns:
        The adjusted ``(start_date, end_date)`` pair.
    """
    # Weather and traffic APIs have a 90-day limit
    MAX_DAYS = 90
    max_span = timedelta(days=MAX_DAYS)

    # Build "now" with the same awareness as the inputs: comparing a naive
    # now() against timezone-aware dates raises TypeError.
    now = datetime.now(end_date.tzinfo)
    today = now.replace(hour=0, minute=0, second=0, microsecond=0)

    # Ensure dates are not in the future
    if end_date > now:
        end_date = today
        logger.info(f"Adjusted end_date to {end_date} (cannot be in future)")

    if start_date > now:
        start_date = today - timedelta(days=30)
        logger.info(f"Adjusted start_date to {start_date} (was in future)")

    # Ensure start_date is before end_date
    if start_date >= end_date:
        start_date = end_date - timedelta(days=30)  # Default to 30 days of data
        logger.warning(f"start_date was not before end_date. Adjusted start_date to {start_date}")

    # Compare full timedeltas: `.days` alone would let "90 days + 5 hours" through.
    current_span = end_date - start_date
    if current_span > max_span:
        logger.warning(f"Date range ({current_span.days} days) exceeds API limit ({MAX_DAYS} days). Adjusting...")
        # Keep the most recent data
        start_date = end_date - max_span
        logger.info(f"Adjusted start_date to {start_date} to fit within {MAX_DAYS} day limit")

    return start_date, end_date
async def execute_training_job_simple(self, job_id: str, tenant_id_str: str, request: TrainingJobRequest):
"""Simple wrapper that creates its own database session"""
try:
@@ -146,24 +215,44 @@ class TrainingService:
# Determine date range from sales data
start_date, end_date = await self._determine_sales_date_range(sales_data)
# Convert dates to ISO format strings for API calls
start_date_str = start_date.isoformat()
end_date_str = end_date.isoformat()
logger.info(f"Using date range for external APIs: {start_date_str} to {end_date_str}")
# Fetch external data if requested using the sales date range
weather_data = []
traffic_data = []
await self._update_job_status(db, job_id, "running", 15, "Fetching weather data")
weather_data = await self.data_client.fetch_weather_data(
tenant_id,
start_date=start_date.isoformat(),
end_date=end_date.isoformat()
)
try:
weather_data = await self.data_client.fetch_weather_data(
tenant_id=tenant_id,
start_date=start_date_str,
end_date=end_date_str,
latitude=40.4168, # Madrid coordinates
longitude=-3.7038
)
logger.info(f"Fetched {len(weather_data)} weather records")
except Exception as e:
logger.warning(f"Failed to fetch weather data: {e}. Continuing without weather data.")
weather_data = []
await self._update_job_status(db, job_id, "running", 25, "Fetching traffic data")
traffic_data = await self.data_client.fetch_traffic_data(
tenant_id,
start_date=start_date.isoformat(),
end_date=end_date.isoformat()
)
try:
traffic_data = await self.data_client.fetch_traffic_data(
tenant_id=tenant_id,
start_date=start_date_str,
end_date=end_date_str,
latitude=40.4168,
longitude=-3.7038
)
logger.info(f"Fetched {len(traffic_data)} traffic records")
except Exception as e:
logger.warning(f"Failed to fetch traffic data: {e}. Continuing without traffic data.")
traffic_data = []
# Execute ML training
await self._update_job_status(db, job_id, "running", 35, "Processing training data")