Fix data fetch 2
This commit is contained in:
@@ -43,7 +43,7 @@ class DataServiceClient:
|
||||
|
||||
# Make request via gateway
|
||||
async with httpx.AsyncClient(timeout=self.timeout) as client:
|
||||
response = await client.get(
|
||||
response = await client.post(
|
||||
f"{self.base_url}/api/v1/tenants/{tenant_id}/sales",
|
||||
headers=headers,
|
||||
params=params
|
||||
@@ -87,58 +87,102 @@ class DataServiceClient:
|
||||
tenant_id=tenant_id)
|
||||
return []
|
||||
|
||||
async def fetch_weather_data(self, tenant_id: str) -> List[Dict[str, Any]]:
|
||||
"""Fetch weather data via API Gateway"""
|
||||
async def fetch_weather_data(
|
||||
self,
|
||||
tenant_id: str,
|
||||
start_date: str,
|
||||
end_date: str,
|
||||
latitude: Optional[float] = None,
|
||||
longitude: Optional[float] = None
|
||||
) -> List[Dict[str, Any]]:
|
||||
"""
|
||||
Fetch historical weather data for training via API Gateway using POST
|
||||
"""
|
||||
try:
|
||||
# Get service token
|
||||
token = await service_auth.get_service_token()
|
||||
|
||||
# Prepare headers
|
||||
headers = service_auth.get_request_headers(tenant_id)
|
||||
headers["Authorization"] = f"Bearer {token}"
|
||||
headers["Content-Type"] = "application/json"
|
||||
|
||||
params = {strta date, end date, lat, long }
|
||||
# Prepare request payload
|
||||
payload = {
|
||||
"start_date": start_date,
|
||||
"end_date": end_date,
|
||||
"latitude": latitude or 40.4168, # Default Madrid coordinates
|
||||
"longitude": longitude or -3.7038
|
||||
}
|
||||
|
||||
# Make POST request via gateway
|
||||
async with httpx.AsyncClient(timeout=self.timeout) as client:
|
||||
response = await client.get(
|
||||
f"{self.base_url}/api/v1/tenants/{tenant_id}/weather/history",
|
||||
response = await client.post(
|
||||
f"{self.base_url}/api/v1/tenants/{tenant_id}/weather/historical",
|
||||
headers=headers,
|
||||
params=params,
|
||||
timeout=30.0
|
||||
json=payload
|
||||
)
|
||||
|
||||
logger.info(f"Weather data request: {response.status_code}",
|
||||
tenant_id=tenant_id,
|
||||
url=response.url)
|
||||
|
||||
if response.status_code == 200:
|
||||
data = response.json()
|
||||
logger.info(f"Fetched {len(data)} weather records", tenant_id=tenant_id)
|
||||
return data
|
||||
return response.json()
|
||||
else:
|
||||
logger.warning(f"Weather data fetch failed: {response.status_code}",
|
||||
tenant_id=tenant_id)
|
||||
logger.error(f"Failed to fetch weather data: {response.status_code} - {response.text}")
|
||||
return []
|
||||
|
||||
except Exception as e:
|
||||
logger.warning(f"Error fetching weather data: {e}", tenant_id=tenant_id)
|
||||
logger.error(f"Error fetching weather data: {str(e)}")
|
||||
return []
|
||||
|
||||
async def fetch_traffic_data(self, tenant_id: str) -> List[Dict[str, Any]]:
|
||||
"""Fetch traffic data via API Gateway"""
|
||||
|
||||
async def fetch_traffic_data(
|
||||
self,
|
||||
tenant_id: str,
|
||||
start_date: str,
|
||||
end_date: str,
|
||||
latitude: Optional[float] = None,
|
||||
longitude: Optional[float] = None
|
||||
) -> List[Dict[str, Any]]:
|
||||
"""
|
||||
Fetch historical traffic data for training via API Gateway using POST
|
||||
"""
|
||||
try:
|
||||
# Get service token
|
||||
token = await service_auth.get_service_token()
|
||||
|
||||
# Prepare headers
|
||||
headers = service_auth.get_request_headers(tenant_id)
|
||||
headers["Authorization"] = f"Bearer {token}"
|
||||
headers["Content-Type"] = "application/json"
|
||||
|
||||
# Prepare request payload
|
||||
payload = {
|
||||
"start_date": start_date,
|
||||
"end_date": end_date,
|
||||
"latitude": latitude or 40.4168, # Default Madrid coordinates
|
||||
"longitude": longitude or -3.7038
|
||||
}
|
||||
|
||||
# Make POST request via gateway
|
||||
async with httpx.AsyncClient(timeout=self.timeout) as client:
|
||||
response = await client.get(
|
||||
f"{self.base_url}/api/v1/tenants/{tenant_id}/traffic/historical",
|
||||
headers=headers
|
||||
headers=headers,
|
||||
json=payload
|
||||
)
|
||||
|
||||
logger.info(f"Traffic data request: {response.status_code}",
|
||||
tenant_id=tenant_id,
|
||||
url=response.url)
|
||||
|
||||
if response.status_code == 200:
|
||||
data = response.json()
|
||||
logger.info(f"Fetched {len(data)} traffic records", tenant_id=tenant_id)
|
||||
return data
|
||||
return response.json()
|
||||
else:
|
||||
logger.warning(f"Traffic data fetch failed: {response.status_code}",
|
||||
tenant_id=tenant_id)
|
||||
logger.error(f"Failed to fetch traffic data: {response.status_code} - {response.text}")
|
||||
return []
|
||||
|
||||
except Exception as e:
|
||||
logger.warning(f"Error fetching traffic data: {e}", tenant_id=tenant_id)
|
||||
logger.error(f"Error fetching traffic data: {str(e)}")
|
||||
return []
|
||||
@@ -140,17 +140,29 @@ class TrainingService:
|
||||
# Fetch sales data from data service
|
||||
sales_data = await self.data_client.fetch_sales_data(tenant_id)
|
||||
|
||||
# Fetch external data if requested
|
||||
if not sales_data:
|
||||
raise ValueError("No sales data found for training")
|
||||
|
||||
# Determine date range from sales data
|
||||
start_date, end_date = await self._determine_sales_date_range(sales_data)
|
||||
|
||||
# Fetch external data if requested using the sales date range
|
||||
weather_data = []
|
||||
traffic_data = []
|
||||
|
||||
if request.include_weather:
|
||||
await self._update_job_status(db, job_id, "running", 15, "Fetching weather data")
|
||||
weather_data = await self.data_client.fetch_weather_data(tenant_id)
|
||||
await self._update_job_status(db, job_id, "running", 15, "Fetching weather data")
|
||||
weather_data = await self.data_client.fetch_weather_data(
|
||||
tenant_id,
|
||||
start_date=start_date.isoformat(),
|
||||
end_date=end_date.isoformat()
|
||||
)
|
||||
|
||||
if request.include_traffic:
|
||||
await self._update_job_status(db, job_id, "running", 25, "Fetching traffic data")
|
||||
traffic_data = await self.data_client.fetch_traffic_data(tenant_id)
|
||||
await self._update_job_status(db, job_id, "running", 25, "Fetching traffic data")
|
||||
traffic_data = await self.data_client.fetch_traffic_data(
|
||||
tenant_id,
|
||||
start_date=start_date.isoformat(),
|
||||
end_date=end_date.isoformat()
|
||||
)
|
||||
|
||||
# Execute ML training
|
||||
await self._update_job_status(db, job_id, "running", 35, "Processing training data")
|
||||
@@ -668,4 +680,26 @@ class TrainingService:
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to get training logs: {str(e)}")
|
||||
return None
|
||||
return None
|
||||
|
||||
async def _determine_sales_date_range(self, sales_data: List[Dict]) -> tuple[datetime, datetime]:
|
||||
"""Determine start and end dates from sales data"""
|
||||
if not sales_data:
|
||||
raise ValueError("No sales data available to determine date range")
|
||||
|
||||
dates = []
|
||||
for record in sales_data:
|
||||
if 'date' in record:
|
||||
if isinstance(record['date'], str):
|
||||
dates.append(datetime.fromisoformat(record['date'].replace('Z', '+00:00')))
|
||||
elif isinstance(record['date'], datetime):
|
||||
dates.append(record['date'])
|
||||
|
||||
if not dates:
|
||||
raise ValueError("No valid dates found in sales data")
|
||||
|
||||
start_date = min(dates)
|
||||
end_date = max(dates)
|
||||
|
||||
logger.info(f"Determined sales date range: {start_date} to {end_date}")
|
||||
return start_date, end_date
|
||||
Reference in New Issue
Block a user