From 938fd24e3a4f8cdf346583f924e37fb12f559c20 Mon Sep 17 00:00:00 2001 From: Urtzi Alfaro Date: Sun, 27 Jul 2025 21:32:29 +0200 Subject: [PATCH] Fix data fetch 5 --- services/training/app/core/service_auth.py | 1 - services/training/app/services/data_client.py | 60 ++++++++-- .../training/app/services/training_service.py | 113 ++++++++++++++++-- 3 files changed, 152 insertions(+), 22 deletions(-) diff --git a/services/training/app/core/service_auth.py b/services/training/app/core/service_auth.py index 127e958a..e07ad6b4 100644 --- a/services/training/app/core/service_auth.py +++ b/services/training/app/core/service_auth.py @@ -68,7 +68,6 @@ class ServiceAuthenticator: def get_request_headers(self, tenant_id: str = None) -> Dict[str, str]: """Get standard headers for service requests""" headers = { - "Content-Type": "application/json", "X-Service": "training-service", "User-Agent": "training-service/1.0.0" } diff --git a/services/training/app/services/data_client.py b/services/training/app/services/data_client.py index 2777dc12..dc053308 100644 --- a/services/training/app/services/data_client.py +++ b/services/training/app/services/data_client.py @@ -43,7 +43,7 @@ class DataServiceClient: # Make request via gateway async with httpx.AsyncClient(timeout=self.timeout) as client: - response = await client.post( + response = await client.get( f"{self.base_url}/api/v1/tenants/{tenant_id}/sales", headers=headers, params=params @@ -87,6 +87,7 @@ class DataServiceClient: tenant_id=tenant_id) return [] + async def fetch_weather_data( self, tenant_id: str, @@ -107,14 +108,16 @@ class DataServiceClient: headers["Authorization"] = f"Bearer {token}" headers["Content-Type"] = "application/json" - # Prepare request payload + # Prepare request payload with proper date handling payload = { - "start_date": start_date, - "end_date": end_date, + "start_date": start_date, # Already in ISO format from calling code + "end_date": end_date, # Already in ISO format from calling code "latitude": latitude or 40.4168, # Default Madrid coordinates "longitude": longitude or -3.7038 } + logger.info(f"Weather request payload: {payload}", tenant_id=tenant_id) + # Make POST request via gateway async with httpx.AsyncClient(timeout=self.timeout) as client: response = await client.post( @@ -128,11 +131,36 @@ class DataServiceClient: url=response.url) if response.status_code == 200: - return response.json() + data = response.json() + logger.info(f"Successfully fetched {len(data)} weather records") + return data + elif response.status_code == 400: + error_details = response.text + logger.error(f"Weather API validation error (400): {error_details}") + + # Try to parse the error and provide helpful info + try: + error_json = response.json() + if 'detail' in error_json: + detail = error_json['detail'] + if 'End date must be after start date' in str(detail): + logger.error(f"Date range issue: start={start_date}, end={end_date}") + elif 'Date range cannot exceed 90 days' in str(detail): + logger.error(f"Date range too large: {start_date} to {end_date}") + except: + pass + + return [] + elif response.status_code == 401: + logger.error("Authentication failed for weather API") + return [] else: logger.error(f"Failed to fetch weather data: {response.status_code} - {response.text}") return [] + except httpx.TimeoutException: + logger.error("Timeout when fetching weather data") + return [] except Exception as e: logger.error(f"Error fetching weather data: {str(e)}") return [] @@ -159,15 +187,17 @@ class DataServiceClient: # Prepare request payload payload = { - "start_date": start_date, - "end_date": end_date, + "start_date": start_date, # Already in ISO format from calling code + "end_date": end_date, # Already in ISO format from calling code "latitude": latitude or 40.4168, # Default Madrid coordinates "longitude": longitude or -3.7038 } + logger.info(f"Traffic request payload: {payload}", tenant_id=tenant_id) + # Make POST request via gateway async with httpx.AsyncClient(timeout=self.timeout) as client: - response = await client.get( + response = await client.post( f"{self.base_url}/api/v1/tenants/{tenant_id}/traffic/historical", headers=headers, json=payload @@ -178,11 +208,23 @@ class DataServiceClient: url=response.url) if response.status_code == 200: - return response.json() + data = response.json() + logger.info(f"Successfully fetched {len(data)} traffic records") + return data + elif response.status_code == 400: + error_details = response.text + logger.error(f"Traffic API validation error (400): {error_details}") + return [] + elif response.status_code == 401: + logger.error("Authentication failed for traffic API") + return [] else: logger.error(f"Failed to fetch traffic data: {response.status_code} - {response.text}") return [] + except httpx.TimeoutException: + logger.error("Timeout when fetching traffic data") + return [] except Exception as e: logger.error(f"Error fetching traffic data: {str(e)}") return [] \ No newline at end of file diff --git a/services/training/app/services/training_service.py b/services/training/app/services/training_service.py index 52e29653..ff4bfd5d 100644 --- a/services/training/app/services/training_service.py +++ b/services/training/app/services/training_service.py @@ -34,6 +34,75 @@ class TrainingService: self.ml_trainer = BakeryMLTrainer() self.data_client = DataServiceClient() + async def _determine_sales_date_range(self, sales_data: List[Dict]) -> tuple[datetime, datetime]: + """Determine start and end dates from sales data with validation""" + if not sales_data: + raise ValueError("No sales data available to determine date range") + + dates = [] + for record in sales_data: + if 'date' in record: + try: + if isinstance(record['date'], str): + # Handle various date string formats + date_str = record['date'].replace('Z', '+00:00') + if 'T' in date_str: + parsed_date = datetime.fromisoformat(date_str) + else: + # Handle date-only strings + parsed_date = datetime.strptime(date_str, '%Y-%m-%d') + dates.append(parsed_date) + elif isinstance(record['date'], datetime): + dates.append(record['date']) + except (ValueError, AttributeError) as e: + logger.warning(f"Invalid date format in record: {record['date']} - {e}") + continue + + if not dates: + raise ValueError("No valid dates found in sales data") + + start_date = min(dates) + end_date = max(dates) + + # Validate and adjust date range for external APIs + start_date, end_date = self._adjust_date_range_for_apis(start_date, end_date) + + logger.info(f"Determined and adjusted sales date range: {start_date} to {end_date}") + return start_date, end_date + + def _adjust_date_range_for_apis(self, start_date: datetime, end_date: datetime) -> tuple[datetime, datetime]: + """Adjust date range to comply with external API limits""" + + # Weather and traffic APIs have a 90-day limit + MAX_DAYS = 90 + + # Calculate current range + current_range = (end_date - start_date).days + + if current_range > MAX_DAYS: + logger.warning(f"Date range ({current_range} days) exceeds API limit ({MAX_DAYS} days). Adjusting...") + + # Keep the most recent data + start_date = end_date - timedelta(days=MAX_DAYS) + logger.info(f"Adjusted start_date to {start_date} to fit within {MAX_DAYS} day limit") + + # Ensure dates are not in the future + now = datetime.now() + if end_date > now: + end_date = now.replace(hour=0, minute=0, second=0, microsecond=0) + logger.info(f"Adjusted end_date to {end_date} (cannot be in future)") + + if start_date > now: + start_date = now.replace(hour=0, minute=0, second=0, microsecond=0) - timedelta(days=30) + logger.info(f"Adjusted start_date to {start_date} (was in future)") + + # Ensure start_date is before end_date + if start_date >= end_date: + start_date = end_date - timedelta(days=30) # Default to 30 days of data + logger.warning(f"start_date was not before end_date. Adjusted start_date to {start_date}") + + return start_date, end_date + async def execute_training_job_simple(self, job_id: str, tenant_id_str: str, request: TrainingJobRequest): """Simple wrapper that creates its own database session""" try: @@ -146,24 +215,44 @@ class TrainingService: # Determine date range from sales data start_date, end_date = await self._determine_sales_date_range(sales_data) + # Convert dates to ISO format strings for API calls + start_date_str = start_date.isoformat() + end_date_str = end_date.isoformat() + + logger.info(f"Using date range for external APIs: {start_date_str} to {end_date_str}") + # Fetch external data if requested using the sales date range weather_data = [] traffic_data = [] await self._update_job_status(db, job_id, "running", 15, "Fetching weather data") - weather_data = await self.data_client.fetch_weather_data( - tenant_id, - start_date=start_date.isoformat(), - end_date=end_date.isoformat() - ) - + try: + weather_data = await self.data_client.fetch_weather_data( + tenant_id=tenant_id, + start_date=start_date_str, + end_date=end_date_str, + latitude=40.4168, # Madrid coordinates + longitude=-3.7038 + ) + logger.info(f"Fetched {len(weather_data)} weather records") + except Exception as e: + logger.warning(f"Failed to fetch weather data: {e}. Continuing without weather data.") + weather_data = [] + await self._update_job_status(db, job_id, "running", 25, "Fetching traffic data") - traffic_data = await self.data_client.fetch_traffic_data( - tenant_id, - start_date=start_date.isoformat(), - end_date=end_date.isoformat() - ) - + try: + traffic_data = await self.data_client.fetch_traffic_data( + tenant_id=tenant_id, + start_date=start_date_str, + end_date=end_date_str, + latitude=40.4168, + longitude=-3.7038 + ) + logger.info(f"Fetched {len(traffic_data)} traffic records") + except Exception as e: + logger.warning(f"Failed to fetch traffic data: {e}. Continuing without traffic data.") + traffic_data = [] + # Execute ML training await self._update_job_status(db, job_id, "running", 35, "Processing training data")