Files
bakery-ia/shared/clients/external_client.py
2025-11-13 16:01:08 +01:00

418 lines
14 KiB
Python

# shared/clients/external_client.py
"""
External Service Client
Handles all API calls to the external service (weather and traffic data)
"""
import httpx
import structlog
from typing import Dict, Any, Optional, List
from .base_service_client import BaseServiceClient
from shared.config.base import BaseServiceSettings
logger = structlog.get_logger()
class ExternalServiceClient(BaseServiceClient):
"""Client for communicating with the external service"""
def __init__(self, config: BaseServiceSettings, calling_service_name: str = "unknown"):
    """
    Set up the client on behalf of a calling service.

    Args:
        config: Settings object; its ``EXTERNAL_SERVICE_URL`` attribute is
            stored on the instance as ``service_url``.
        calling_service_name: Name forwarded to the base client constructor
            (defaults to ``"unknown"``).
    """
    # Base-class initialisation runs first; the URL is recorded afterwards,
    # in the same order as before.
    super().__init__(calling_service_name, config)
    external_url = config.EXTERNAL_SERVICE_URL
    self.service_url = external_url
def get_service_base_path(self) -> str:
    """Return the base path string used by this client (``"/api/v1"``)."""
    base_path = "/api/v1"
    return base_path
# ================================================================
# WEATHER DATA
# ================================================================
async def get_weather_historical(
    self,
    tenant_id: str,
    start_date: str,
    end_date: str,
    latitude: Optional[float] = None,
    longitude: Optional[float] = None
) -> Optional[List[Dict[str, Any]]]:
    """
    Get historical weather data from the v2.0 optimized city-based endpoint.

    Per the original author's notes, this endpoint serves pre-loaded data
    from the database with Redis caching, which is why a short timeout is
    used for the request.

    Args:
        tenant_id: Tenant the request is made for.
        start_date: Start of the range (ISO format datetime string).
        end_date: End of the range (ISO format datetime string).
        latitude: Latitude of the location. Falls back to Madrid (40.4168)
            only when omitted (``None``); ``0.0`` is sent as-is.
        longitude: Longitude of the location. Falls back to Madrid
            (-3.7038) only when omitted (``None``); ``0.0`` is sent as-is.

    Returns:
        The payload returned by ``_make_request`` when it is truthy,
        otherwise an empty list.
    """
    # Compare against None explicitly: 0.0 is a valid coordinate (equator /
    # prime meridian) and must not be replaced by the Madrid defaults, which
    # is what the previous `value or default` form did.
    params = {
        "latitude": 40.4168 if latitude is None else latitude,
        "longitude": -3.7038 if longitude is None else longitude,
        "start_date": start_date,  # ISO format datetime
        "end_date": end_date,  # ISO format datetime
    }
    logger.info(f"Weather request (v2.0 optimized): {params}", tenant_id=tenant_id)

    # GET against the optimized endpoint with a short timeout (data is cached).
    result = await self._make_request(
        "GET",
        "external/operations/historical-weather-optimized",
        tenant_id=tenant_id,
        params=params,
        timeout=10.0,
    )

    if not result:
        logger.warning("No weather data returned from v2.0 endpoint")
        return []

    logger.info(f"Successfully fetched {len(result)} weather records from v2.0 endpoint")
    return result
async def get_current_weather(
    self,
    tenant_id: str,
    latitude: Optional[float] = None,
    longitude: Optional[float] = None
) -> Optional[Dict[str, Any]]:
    """
    Get current weather for a location (real-time data) via the v2.0 endpoint.

    Args:
        tenant_id: Tenant the request is made for.
        latitude: Latitude of the location. Falls back to Madrid (40.4168)
            only when omitted (``None``); ``0.0`` is sent as-is.
        longitude: Longitude of the location. Falls back to Madrid
            (-3.7038) only when omitted (``None``); ``0.0`` is sent as-is.

    Returns:
        The payload returned by ``_make_request`` when it is truthy,
        otherwise ``None``.
    """
    # Explicit None checks: `value or default` would discard a legitimate
    # 0.0 coordinate and silently query Madrid instead.
    params = {
        "latitude": 40.4168 if latitude is None else latitude,
        "longitude": -3.7038 if longitude is None else longitude,
    }
    logger.info(f"Current weather request (v2.0): {params}", tenant_id=tenant_id)

    result = await self._make_request(
        "GET",
        "external/operations/weather/current",
        tenant_id=tenant_id,
        params=params,
        timeout=10.0,
    )

    if not result:
        logger.warning("No current weather data available")
        return None

    logger.info("Successfully fetched current weather")
    return result
async def get_weather_forecast(
    self,
    tenant_id: str,
    days: int = 7,
    latitude: Optional[float] = None,
    longitude: Optional[float] = None
) -> Optional[List[Dict[str, Any]]]:
    """
    Get the weather forecast for a location via the v2.0 endpoint.

    The original docstring states the forecast originates from AEMET; that
    is a property of the remote service and is not visible in this client.

    Args:
        tenant_id: Tenant the request is made for.
        days: Number of forecast days requested (sent as the ``days``
            query parameter).
        latitude: Latitude of the location. Falls back to Madrid (40.4168)
            only when omitted (``None``); ``0.0`` is sent as-is.
        longitude: Longitude of the location. Falls back to Madrid
            (-3.7038) only when omitted (``None``); ``0.0`` is sent as-is.

    Returns:
        The payload returned by ``_make_request`` when it is truthy,
        otherwise an empty list.
    """
    # Explicit None checks: `value or default` would discard a legitimate
    # 0.0 coordinate and silently query Madrid instead.
    params = {
        "latitude": 40.4168 if latitude is None else latitude,
        "longitude": -3.7038 if longitude is None else longitude,
        "days": days,
    }
    logger.info(f"Weather forecast request (v2.0): {params}", tenant_id=tenant_id)

    result = await self._make_request(
        "GET",
        "external/operations/weather/forecast",
        tenant_id=tenant_id,
        params=params,
        timeout=10.0,
    )

    if not result:
        logger.warning("No forecast data available")
        return []

    logger.info(f"Successfully fetched weather forecast for {days} days")
    return result
# ================================================================
# TRAFFIC DATA
# ================================================================
async def get_traffic_data(
    self,
    tenant_id: str,
    start_date: str,
    end_date: str,
    latitude: Optional[float] = None,
    longitude: Optional[float] = None
) -> Optional[List[Dict[str, Any]]]:
    """
    Get historical traffic data from the v2.0 optimized city-based endpoint.

    Per the original author's notes, this endpoint serves pre-loaded data
    from the database with Redis caching, which is why a short timeout is
    used for the request.

    Args:
        tenant_id: Tenant the request is made for.
        start_date: Start of the range (ISO format datetime string).
        end_date: End of the range (ISO format datetime string).
        latitude: Latitude of the location. Falls back to Madrid (40.4168)
            only when omitted (``None``); ``0.0`` is sent as-is.
        longitude: Longitude of the location. Falls back to Madrid
            (-3.7038) only when omitted (``None``); ``0.0`` is sent as-is.

    Returns:
        The payload returned by ``_make_request`` when it is truthy,
        otherwise an empty list.
    """
    # Compare against None explicitly: 0.0 is a valid coordinate (equator /
    # prime meridian) and must not be replaced by the Madrid defaults, which
    # is what the previous `value or default` form did.
    params = {
        "latitude": 40.4168 if latitude is None else latitude,
        "longitude": -3.7038 if longitude is None else longitude,
        "start_date": start_date,  # ISO format datetime
        "end_date": end_date,  # ISO format datetime
    }
    logger.info(f"Traffic request (v2.0 optimized): {params}", tenant_id=tenant_id)

    # GET against the optimized endpoint with a short timeout (data is cached).
    result = await self._make_request(
        "GET",
        "external/operations/historical-traffic-optimized",
        tenant_id=tenant_id,
        params=params,
        timeout=10.0,
    )

    if not result:
        logger.warning("No traffic data returned from v2.0 endpoint")
        return []

    logger.info(f"Successfully fetched {len(result)} traffic records from v2.0 endpoint")
    return result
async def get_stored_traffic_data_for_training(
    self,
    tenant_id: str,
    start_date: str,
    end_date: str,
    latitude: Optional[float] = None,
    longitude: Optional[float] = None
) -> Optional[List[Dict[str, Any]]]:
    """
    Get stored traffic data for model training / re-training.

    This is a thin wrapper: after logging the request it forwards every
    argument unchanged to ``get_traffic_data`` and returns that result.

    Args:
        tenant_id: Tenant the request is made for.
        start_date: Start of the range, forwarded as-is.
        end_date: End of the range, forwarded as-is.
        latitude: Optional latitude, forwarded as-is.
        longitude: Optional longitude, forwarded as-is.

    Returns:
        Whatever ``get_traffic_data`` returns for the same arguments.
    """
    logger.info("Training traffic data request - delegating to optimized endpoint", tenant_id=tenant_id)

    # Collect the arguments once, then hand them to the shared implementation.
    forwarded = {
        "tenant_id": tenant_id,
        "start_date": start_date,
        "end_date": end_date,
        "latitude": latitude,
        "longitude": longitude,
    }
    return await self.get_traffic_data(**forwarded)
async def get_current_traffic(
    self,
    tenant_id: str,
    latitude: Optional[float] = None,
    longitude: Optional[float] = None
) -> Optional[Dict[str, Any]]:
    """
    Get current traffic conditions for a location (real-time data) via the
    v2.0 endpoint.

    Args:
        tenant_id: Tenant the request is made for.
        latitude: Latitude of the location. Falls back to Madrid (40.4168)
            only when omitted (``None``); ``0.0`` is sent as-is.
        longitude: Longitude of the location. Falls back to Madrid
            (-3.7038) only when omitted (``None``); ``0.0`` is sent as-is.

    Returns:
        The payload returned by ``_make_request`` when it is truthy,
        otherwise ``None``.
    """
    # Explicit None checks: `value or default` would discard a legitimate
    # 0.0 coordinate and silently query Madrid instead.
    params = {
        "latitude": 40.4168 if latitude is None else latitude,
        "longitude": -3.7038 if longitude is None else longitude,
    }
    logger.info(f"Current traffic request (v2.0): {params}", tenant_id=tenant_id)

    result = await self._make_request(
        "GET",
        "external/operations/traffic/current",
        tenant_id=tenant_id,
        params=params,
        timeout=10.0,
    )

    if not result:
        logger.warning("No current traffic data available")
        return None

    logger.info("Successfully fetched current traffic")
    return result
# ================================================================
# CALENDAR DATA (School Calendars and Hyperlocal Information)
# ================================================================
async def get_tenant_location_context(
    self,
    tenant_id: str
) -> Optional[Dict[str, Any]]:
    """
    Get the location context stored for a tenant.

    According to the original docstring the context includes the tenant's
    school calendar assignment; the payload shape is not visible here.

    Args:
        tenant_id: Tenant whose location context is requested.

    Returns:
        The payload returned by ``_make_request`` when it is truthy,
        otherwise ``None``.
    """
    logger.info("Fetching tenant location context", tenant_id=tenant_id)

    endpoint = f"external/tenants/{tenant_id}/location-context"
    context = await self._make_request(
        "GET",
        endpoint,
        tenant_id=tenant_id,
        timeout=5.0,
    )

    # Guard clause: a missing context is reported at info level, not as a warning.
    if not context:
        logger.info("No location context found for tenant", tenant_id=tenant_id)
        return None

    logger.info("Successfully fetched tenant location context", tenant_id=tenant_id)
    return context
async def get_school_calendar(
    self,
    calendar_id: str,
    tenant_id: str
) -> Optional[Dict[str, Any]]:
    """
    Get school calendar details by ID.

    Args:
        calendar_id: Identifier of the school calendar, interpolated into
            the request path.
        tenant_id: Tenant ID passed through to ``_make_request``.

    Returns:
        The payload returned by ``_make_request`` when it is truthy,
        otherwise ``None``.
    """
    logger.info("Fetching school calendar", calendar_id=calendar_id, tenant_id=tenant_id)

    endpoint = f"external/operations/school-calendars/{calendar_id}"
    calendar = await self._make_request(
        "GET",
        endpoint,
        tenant_id=tenant_id,
        timeout=5.0,
    )

    if not calendar:
        logger.warning("School calendar not found", calendar_id=calendar_id)
        return None

    logger.info("Successfully fetched school calendar", calendar_id=calendar_id)
    return calendar
async def check_is_school_holiday(
    self,
    calendar_id: str,
    check_date: str,
    tenant_id: str
) -> Optional[Dict[str, Any]]:
    """
    Check whether a specific date is a school holiday in a given calendar.

    Args:
        calendar_id: School calendar UUID, interpolated into the request path.
        check_date: Date to check in ISO format (YYYY-MM-DD); sent as the
            ``check_date`` query parameter.
        tenant_id: Tenant ID for auth.

    Returns:
        The response from ``_make_request``, returned unchanged. Per the
        original docstring it is expected to be a dict with ``is_holiday``,
        ``holiday_name``, etc.
    """
    logger.debug(
        "Checking school holiday status",
        calendar_id=calendar_id,
        date=check_date,
        tenant_id=tenant_id,
    )

    endpoint = f"external/operations/school-calendars/{calendar_id}/is-holiday"
    query = {"check_date": check_date}

    # The response is handed back as-is; no success/failure logging here.
    return await self._make_request(
        "GET",
        endpoint,
        tenant_id=tenant_id,
        params=query,
        timeout=5.0,
    )
async def get_city_school_calendars(
    self,
    city_id: str,
    tenant_id: str,
    school_type: Optional[str] = None,
    academic_year: Optional[str] = None
) -> Optional[Dict[str, Any]]:
    """
    Get all school calendars for a city, optionally filtered.

    Args:
        city_id: City ID (e.g., "madrid"), interpolated into the request path.
        tenant_id: Tenant ID for auth.
        school_type: Optional filter by school type; sent only when truthy.
        academic_year: Optional filter by academic year; sent only when truthy.

    Returns:
        The payload returned by ``_make_request`` when it is truthy,
        otherwise ``None``. Per the original docstring the payload is a dict
        with a calendars list and a total count.
    """
    # Keep only the filters the caller actually supplied (truthy values).
    requested_filters = {"school_type": school_type, "academic_year": academic_year}
    filters = {name: value for name, value in requested_filters.items() if value}

    logger.info(
        "Fetching school calendars for city",
        city_id=city_id,
        tenant_id=tenant_id,
        filters=filters,
    )

    calendars = await self._make_request(
        "GET",
        f"external/operations/cities/{city_id}/school-calendars",
        tenant_id=tenant_id,
        params=filters or None,  # no query string at all when nothing is filtered
        timeout=5.0,
    )

    if not calendars:
        logger.warning("No school calendars found for city", city_id=city_id)
        return None

    logger.info(
        "Successfully fetched school calendars",
        city_id=city_id,
        total=calendars.get("total", 0),
    )
    return calendars
# ================================================================
# POI (POINT OF INTEREST) DATA
# ================================================================
async def get_poi_context(
    self,
    tenant_id: str
) -> Optional[Dict[str, Any]]:
    """
    Get POI (point of interest) context for a tenant.

    Per the original docstring, this retrieves stored POI detection results
    and calculated ML features intended for demand forecasting.

    Args:
        tenant_id: Tenant ID, interpolated directly into the request path.

    Returns:
        The payload returned by ``_make_request`` when it is truthy,
        otherwise ``None``. Per the original docstring the payload includes:
            - ml_features: Dict of POI features for ML models
              (e.g., poi_retail_total_count)
            - poi_detection_results: Full detection results
            - location: Latitude/longitude
            - total_pois_detected: Count of POIs
    """
    logger.info("Fetching POI context for forecasting", tenant_id=tenant_id)

    # The tenant ID is already part of the path, so tenant_id=None is passed
    # to _make_request. The original author's note says this prevents the
    # base client from adding its own tenant prefix.
    # NOTE(review): _make_request is not visible here - confirm the final URL
    # it builds matches the external service's POI-context route.
    endpoint = f"external/operations/poi-context/{tenant_id}"
    poi_context = await self._make_request(
        "GET",
        endpoint,
        tenant_id=None,
        timeout=5.0,
    )

    if not poi_context:
        logger.info("No POI context found for tenant", tenant_id=tenant_id)
        return None

    logger.info(
        "Successfully fetched POI context",
        tenant_id=tenant_id,
        total_pois=poi_context.get("total_pois_detected", 0),
        ml_features_count=len(poi_context.get("ml_features", {})),
    )
    return poi_context