Improve the dahboard with the weather info 2
This commit is contained in:
4
services/external/app/api/traffic.py
vendored
4
services/external/app/api/traffic.py
vendored
@@ -55,8 +55,8 @@ async def get_current_traffic(
|
|||||||
# Continue processing - event publishing failure shouldn't break the API
|
# Continue processing - event publishing failure shouldn't break the API
|
||||||
|
|
||||||
logger.debug("Successfully returning traffic data",
|
logger.debug("Successfully returning traffic data",
|
||||||
volume=traffic.traffic_volume,
|
volume=traffic.get('traffic_volume') if isinstance(traffic, dict) else getattr(traffic, 'traffic_volume', None),
|
||||||
congestion=traffic.congestion_level)
|
congestion=traffic.get('congestion_level') if isinstance(traffic, dict) else getattr(traffic, 'congestion_level', None))
|
||||||
return traffic
|
return traffic
|
||||||
|
|
||||||
except HTTPException:
|
except HTTPException:
|
||||||
|
|||||||
45
services/external/app/api/weather.py
vendored
45
services/external/app/api/weather.py
vendored
@@ -155,3 +155,48 @@ async def get_weather_forecast(
|
|||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error("Failed to get weather forecast", error=str(e))
|
logger.error("Failed to get weather forecast", error=str(e))
|
||||||
raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")
|
raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")
|
||||||
|
|
||||||
|
@router.get("/weather/status")
|
||||||
|
async def get_weather_status():
|
||||||
|
"""Get AEMET API status and diagnostics"""
|
||||||
|
try:
|
||||||
|
from app.core.config import settings
|
||||||
|
|
||||||
|
# Test AEMET API connectivity
|
||||||
|
aemet_status = "unknown"
|
||||||
|
aemet_message = "Not tested"
|
||||||
|
|
||||||
|
try:
|
||||||
|
# Quick test of AEMET API
|
||||||
|
test_weather = await weather_service.get_current_weather(40.4168, -3.7038)
|
||||||
|
if test_weather and hasattr(test_weather, 'source') and test_weather.source == "aemet":
|
||||||
|
aemet_status = "healthy"
|
||||||
|
aemet_message = "AEMET API responding correctly"
|
||||||
|
elif test_weather and hasattr(test_weather, 'source') and test_weather.source == "synthetic":
|
||||||
|
aemet_status = "degraded"
|
||||||
|
aemet_message = "AEMET API unavailable - using synthetic data"
|
||||||
|
else:
|
||||||
|
aemet_status = "degraded"
|
||||||
|
aemet_message = "Weather service returned unexpected data format"
|
||||||
|
except Exception as e:
|
||||||
|
aemet_status = "unhealthy"
|
||||||
|
aemet_message = f"AEMET API error: {str(e)}"
|
||||||
|
|
||||||
|
return {
|
||||||
|
"status": "ok",
|
||||||
|
"aemet": {
|
||||||
|
"status": aemet_status,
|
||||||
|
"message": aemet_message,
|
||||||
|
"api_key_configured": bool(settings.AEMET_API_KEY),
|
||||||
|
"enabled": getattr(settings, 'AEMET_ENABLED', True),
|
||||||
|
"base_url": settings.AEMET_BASE_URL,
|
||||||
|
"timeout": settings.AEMET_TIMEOUT, # Now correctly shows 60 from config
|
||||||
|
"retry_attempts": settings.AEMET_RETRY_ATTEMPTS
|
||||||
|
},
|
||||||
|
"timestamp": datetime.utcnow().isoformat(),
|
||||||
|
"service": "external-weather-service"
|
||||||
|
}
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logger.error("Failed to get weather status", error=str(e))
|
||||||
|
raise HTTPException(status_code=500, detail=f"Status check failed: {str(e)}")
|
||||||
|
|||||||
3
services/external/app/core/config.py
vendored
3
services/external/app/core/config.py
vendored
@@ -25,8 +25,9 @@ class DataSettings(BaseServiceSettings):
|
|||||||
# External API Configuration
|
# External API Configuration
|
||||||
AEMET_API_KEY: str = os.getenv("AEMET_API_KEY", "")
|
AEMET_API_KEY: str = os.getenv("AEMET_API_KEY", "")
|
||||||
AEMET_BASE_URL: str = "https://opendata.aemet.es/opendata"
|
AEMET_BASE_URL: str = "https://opendata.aemet.es/opendata"
|
||||||
AEMET_TIMEOUT: int = int(os.getenv("AEMET_TIMEOUT", "30"))
|
AEMET_TIMEOUT: int = int(os.getenv("AEMET_TIMEOUT", "60")) # Increased default
|
||||||
AEMET_RETRY_ATTEMPTS: int = int(os.getenv("AEMET_RETRY_ATTEMPTS", "3"))
|
AEMET_RETRY_ATTEMPTS: int = int(os.getenv("AEMET_RETRY_ATTEMPTS", "3"))
|
||||||
|
AEMET_ENABLED: bool = os.getenv("AEMET_ENABLED", "true").lower() == "true" # Allow disabling AEMET
|
||||||
|
|
||||||
MADRID_OPENDATA_API_KEY: str = os.getenv("MADRID_OPENDATA_API_KEY", "")
|
MADRID_OPENDATA_API_KEY: str = os.getenv("MADRID_OPENDATA_API_KEY", "")
|
||||||
MADRID_OPENDATA_BASE_URL: str = "https://datos.madrid.es"
|
MADRID_OPENDATA_BASE_URL: str = "https://datos.madrid.es"
|
||||||
|
|||||||
13
services/external/app/external/aemet.py
vendored
13
services/external/app/external/aemet.py
vendored
@@ -515,6 +515,10 @@ class AEMETClient(BaseAPIClient):
|
|||||||
base_url="https://opendata.aemet.es/opendata/api",
|
base_url="https://opendata.aemet.es/opendata/api",
|
||||||
api_key=settings.AEMET_API_KEY
|
api_key=settings.AEMET_API_KEY
|
||||||
)
|
)
|
||||||
|
# Override timeout with settings value
|
||||||
|
import httpx
|
||||||
|
self.timeout = httpx.Timeout(float(settings.AEMET_TIMEOUT))
|
||||||
|
self.retries = settings.AEMET_RETRY_ATTEMPTS
|
||||||
self.parser = WeatherDataParser()
|
self.parser = WeatherDataParser()
|
||||||
self.synthetic_generator = SyntheticWeatherGenerator()
|
self.synthetic_generator = SyntheticWeatherGenerator()
|
||||||
self.location_service = LocationService()
|
self.location_service = LocationService()
|
||||||
@@ -541,15 +545,18 @@ class AEMETClient(BaseAPIClient):
|
|||||||
|
|
||||||
weather_data = await self._fetch_current_weather_data(station_id)
|
weather_data = await self._fetch_current_weather_data(station_id)
|
||||||
if weather_data:
|
if weather_data:
|
||||||
logger.info("✅ Successfully fetched AEMET weather data", station_id=station_id)
|
logger.info("🎉 SUCCESS: Real AEMET weather data retrieved!", station_id=station_id)
|
||||||
parsed_data = self.parser.parse_current_weather(weather_data)
|
parsed_data = self.parser.parse_current_weather(weather_data)
|
||||||
# Ensure the source is set to AEMET for successful API calls
|
# Ensure the source is set to AEMET for successful API calls
|
||||||
if parsed_data and isinstance(parsed_data, dict):
|
if parsed_data and isinstance(parsed_data, dict):
|
||||||
parsed_data["source"] = WeatherSource.AEMET.value
|
parsed_data["source"] = WeatherSource.AEMET.value
|
||||||
|
logger.info("📡 AEMET data confirmed - source set to 'aemet'",
|
||||||
|
temperature=parsed_data.get("temperature"),
|
||||||
|
description=parsed_data.get("description"))
|
||||||
return parsed_data
|
return parsed_data
|
||||||
|
|
||||||
logger.warning("❌ AEMET API returned no data - falling back to synthetic",
|
logger.warning("⚠️ AEMET API connectivity issues - using synthetic data",
|
||||||
station_id=station_id, reason="invalid_weather_data")
|
station_id=station_id, reason="aemet_api_unreachable")
|
||||||
return await self._get_synthetic_current_weather()
|
return await self._get_synthetic_current_weather()
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
|||||||
@@ -205,18 +205,30 @@ class MadridTrafficClient(BaseTrafficClient, BaseAPIClient):
|
|||||||
traffic_record, location_context
|
traffic_record, location_context
|
||||||
)
|
)
|
||||||
|
|
||||||
# Build enhanced response
|
# Calculate average speed based on congestion level
|
||||||
|
if congestion_level == 'high':
|
||||||
|
average_speed = 15.0
|
||||||
|
elif congestion_level == 'medium':
|
||||||
|
average_speed = 35.0
|
||||||
|
elif congestion_level == 'low':
|
||||||
|
average_speed = 50.0
|
||||||
|
else:
|
||||||
|
average_speed = 30.0 # default
|
||||||
|
|
||||||
|
# Build enhanced response with required API fields
|
||||||
enhanced_data = {
|
enhanced_data = {
|
||||||
|
'date': datetime.now(timezone.utc), # Required API field
|
||||||
'timestamp': datetime.now(timezone.utc),
|
'timestamp': datetime.now(timezone.utc),
|
||||||
'latitude': traffic_point.get('latitude'),
|
'latitude': traffic_point.get('latitude'),
|
||||||
'longitude': traffic_point.get('longitude'),
|
'longitude': traffic_point.get('longitude'),
|
||||||
'measurement_point_id': traffic_point.get('measurement_point_id'),
|
'measurement_point_id': traffic_point.get('measurement_point_id'),
|
||||||
'measurement_point_name': traffic_point.get('measurement_point_name'),
|
'measurement_point_name': traffic_point.get('measurement_point_name'),
|
||||||
'traffic_volume': traffic_point.get('intensidad', 0),
|
'traffic_volume': traffic_point.get('intensidad', 0),
|
||||||
|
'pedestrian_count': pedestrian_count,
|
||||||
|
'congestion_level': congestion_level,
|
||||||
|
'average_speed': average_speed, # Required API field
|
||||||
'occupation_percentage': int(traffic_point.get('ocupacion', 0)),
|
'occupation_percentage': int(traffic_point.get('ocupacion', 0)),
|
||||||
'load_percentage': traffic_point.get('carga', 0),
|
'load_percentage': traffic_point.get('carga', 0),
|
||||||
'congestion_level': congestion_level,
|
|
||||||
'pedestrian_count': pedestrian_count,
|
|
||||||
'road_type': road_type,
|
'road_type': road_type,
|
||||||
'distance_km': distance_km,
|
'distance_km': distance_km,
|
||||||
'source': 'madrid_current_xml',
|
'source': 'madrid_current_xml',
|
||||||
|
|||||||
89
services/external/app/external/base_client.py
vendored
89
services/external/app/external/base_client.py
vendored
@@ -15,44 +15,61 @@ class BaseAPIClient:
|
|||||||
def __init__(self, base_url: str, api_key: Optional[str] = None):
|
def __init__(self, base_url: str, api_key: Optional[str] = None):
|
||||||
self.base_url = base_url
|
self.base_url = base_url
|
||||||
self.api_key = api_key
|
self.api_key = api_key
|
||||||
self.timeout = httpx.Timeout(30.0)
|
# Increase timeout and add connection retries for unstable AEMET API
|
||||||
|
self.timeout = httpx.Timeout(60.0) # Increased from 30s
|
||||||
|
self.retries = 3
|
||||||
|
|
||||||
async def _get(self, endpoint: str, params: Optional[Dict] = None, headers: Optional[Dict] = None) -> Optional[Dict[str, Any]]:
|
async def _get(self, endpoint: str, params: Optional[Dict] = None, headers: Optional[Dict] = None) -> Optional[Dict[str, Any]]:
|
||||||
"""Make GET request"""
|
"""Make GET request with retry logic for unstable APIs"""
|
||||||
try:
|
url = f"{self.base_url}{endpoint}"
|
||||||
url = f"{self.base_url}{endpoint}"
|
|
||||||
|
# Add API key to params for AEMET (not headers)
|
||||||
# Add API key to params for AEMET (not headers)
|
request_params = params or {}
|
||||||
request_params = params or {}
|
if self.api_key:
|
||||||
if self.api_key:
|
request_params["api_key"] = self.api_key
|
||||||
request_params["api_key"] = self.api_key
|
|
||||||
|
# Add headers if provided
|
||||||
# Add headers if provided
|
request_headers = headers or {}
|
||||||
request_headers = headers or {}
|
|
||||||
|
logger.debug("Making API request", url=url, params=request_params)
|
||||||
logger.debug("Making API request", url=url, params=request_params)
|
|
||||||
|
# Retry logic for unstable AEMET API
|
||||||
async with httpx.AsyncClient(timeout=self.timeout) as client:
|
for attempt in range(self.retries):
|
||||||
response = await client.get(url, params=request_params, headers=request_headers)
|
try:
|
||||||
response.raise_for_status()
|
async with httpx.AsyncClient(timeout=self.timeout) as client:
|
||||||
|
response = await client.get(url, params=request_params, headers=request_headers)
|
||||||
# Log response for debugging
|
response.raise_for_status()
|
||||||
response_data = response.json()
|
|
||||||
logger.debug("API response received",
|
# Log response for debugging
|
||||||
status_code=response.status_code,
|
response_data = response.json()
|
||||||
response_keys=list(response_data.keys()) if isinstance(response_data, dict) else "non-dict")
|
logger.debug("API response received",
|
||||||
|
status_code=response.status_code,
|
||||||
return response_data
|
response_keys=list(response_data.keys()) if isinstance(response_data, dict) else "non-dict",
|
||||||
|
attempt=attempt + 1)
|
||||||
except httpx.HTTPStatusError as e:
|
|
||||||
logger.error("HTTP error", status_code=e.response.status_code, url=url, response_text=e.response.text[:200])
|
return response_data
|
||||||
return None
|
|
||||||
except httpx.RequestError as e:
|
except httpx.HTTPStatusError as e:
|
||||||
logger.error("Request error", error=str(e), url=url)
|
logger.error("HTTP error", status_code=e.response.status_code, url=url,
|
||||||
return None
|
response_text=e.response.text[:200], attempt=attempt + 1)
|
||||||
except Exception as e:
|
if attempt == self.retries - 1: # Last attempt
|
||||||
logger.error("Unexpected error", error=str(e), url=url)
|
return None
|
||||||
return None
|
except httpx.RequestError as e:
|
||||||
|
logger.error("Request error", error=str(e), url=url, attempt=attempt + 1)
|
||||||
|
if attempt == self.retries - 1: # Last attempt
|
||||||
|
return None
|
||||||
|
|
||||||
|
# Wait before retry (exponential backoff)
|
||||||
|
import asyncio
|
||||||
|
wait_time = 2 ** attempt # 1s, 2s, 4s
|
||||||
|
logger.info(f"Retrying AEMET API in {wait_time}s", attempt=attempt + 1, max_attempts=self.retries)
|
||||||
|
await asyncio.sleep(wait_time)
|
||||||
|
except Exception as e:
|
||||||
|
logger.error("Unexpected error", error=str(e), url=url, attempt=attempt + 1)
|
||||||
|
if attempt == self.retries - 1: # Last attempt
|
||||||
|
return None
|
||||||
|
|
||||||
|
return None
|
||||||
|
|
||||||
async def _fetch_url_directly(self, url: str, headers: Optional[Dict] = None) -> Optional[Dict[str, Any]]:
|
async def _fetch_url_directly(self, url: str, headers: Optional[Dict] = None) -> Optional[Dict[str, Any]]:
|
||||||
"""Fetch data directly from a full URL (for AEMET datos URLs)"""
|
"""Fetch data directly from a full URL (for AEMET datos URLs)"""
|
||||||
|
|||||||
@@ -25,9 +25,8 @@ class MadridTrafficDataProcessor:
|
|||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.logger = structlog.get_logger()
|
self.logger = structlog.get_logger()
|
||||||
# UTM Zone 30N (Madrid's coordinate system)
|
# UTM Zone 30N (Madrid's coordinate system) - using modern pyproj API
|
||||||
self.utm_proj = pyproj.Proj(proj='utm', zone=30, ellps='WGS84', datum='WGS84')
|
self.transformer = pyproj.Transformer.from_crs("EPSG:25830", "EPSG:4326", always_xy=True)
|
||||||
self.wgs84_proj = pyproj.Proj(proj='latlong', ellps='WGS84', datum='WGS84')
|
|
||||||
|
|
||||||
def safe_int(self, value: str) -> int:
|
def safe_int(self, value: str) -> int:
|
||||||
"""Safely convert string to int"""
|
"""Safely convert string to int"""
|
||||||
@@ -68,8 +67,8 @@ class MadridTrafficDataProcessor:
|
|||||||
utm_x_float = float(utm_x.replace(',', '.'))
|
utm_x_float = float(utm_x.replace(',', '.'))
|
||||||
utm_y_float = float(utm_y.replace(',', '.'))
|
utm_y_float = float(utm_y.replace(',', '.'))
|
||||||
|
|
||||||
# Convert from UTM Zone 30N to WGS84
|
# Convert from UTM Zone 30N to WGS84 using modern pyproj API
|
||||||
longitude, latitude = pyproj.transform(self.utm_proj, self.wgs84_proj, utm_x_float, utm_y_float)
|
longitude, latitude = self.transformer.transform(utm_x_float, utm_y_float)
|
||||||
|
|
||||||
# Validate coordinates are in Madrid area
|
# Validate coordinates are in Madrid area
|
||||||
if 40.3 <= latitude <= 40.6 and -3.8 <= longitude <= -3.5:
|
if 40.3 <= latitude <= 40.6 and -3.8 <= longitude <= -3.5:
|
||||||
@@ -455,9 +454,25 @@ class MadridTrafficDataProcessor:
|
|||||||
carga = self.safe_int(row.get('carga', '0'))
|
carga = self.safe_int(row.get('carga', '0'))
|
||||||
vmed = self.safe_int(row.get('vmed', '0'))
|
vmed = self.safe_int(row.get('vmed', '0'))
|
||||||
|
|
||||||
# Build basic result (business logic will be applied elsewhere)
|
# Calculate average speed (vmed is in km/h, use it if available)
|
||||||
|
average_speed = float(vmed) if vmed > 0 else 30.0 # Default speed
|
||||||
|
|
||||||
|
# Determine congestion level based on occupation percentage
|
||||||
|
if ocupacion > 75:
|
||||||
|
congestion_level = 'high'
|
||||||
|
elif ocupacion > 40:
|
||||||
|
congestion_level = 'medium'
|
||||||
|
else:
|
||||||
|
congestion_level = 'low'
|
||||||
|
|
||||||
|
# Build result with API-compatible fields
|
||||||
result = {
|
result = {
|
||||||
'date': date_obj,
|
'date': date_obj, # Required API field
|
||||||
|
'traffic_volume': intensidad, # Required API field
|
||||||
|
'pedestrian_count': max(1, int(intensidad * 0.1)), # Estimated pedestrian count
|
||||||
|
'congestion_level': congestion_level, # Required API field
|
||||||
|
'average_speed': average_speed, # Required API field
|
||||||
|
'source': 'madrid_historical_csv', # Required API field
|
||||||
'measurement_point_id': measurement_point_id,
|
'measurement_point_id': measurement_point_id,
|
||||||
'point_data': point_data,
|
'point_data': point_data,
|
||||||
'distance_km': distance_km,
|
'distance_km': distance_km,
|
||||||
|
|||||||
24
services/external/app/schemas/traffic.py
vendored
24
services/external/app/schemas/traffic.py
vendored
@@ -31,8 +31,8 @@ class TrafficDataUpdate(BaseModel):
|
|||||||
average_speed: Optional[float] = Field(None, ge=0, le=200)
|
average_speed: Optional[float] = Field(None, ge=0, le=200)
|
||||||
raw_data: Optional[str] = None
|
raw_data: Optional[str] = None
|
||||||
|
|
||||||
class TrafficDataResponse(TrafficDataBase):
|
class TrafficDataResponseDB(TrafficDataBase):
|
||||||
"""Schema for traffic data responses"""
|
"""Schema for traffic data responses from database"""
|
||||||
id: str = Field(..., description="Unique identifier")
|
id: str = Field(..., description="Unique identifier")
|
||||||
created_at: datetime = Field(..., description="Creation timestamp")
|
created_at: datetime = Field(..., description="Creation timestamp")
|
||||||
updated_at: datetime = Field(..., description="Last update timestamp")
|
updated_at: datetime = Field(..., description="Last update timestamp")
|
||||||
@@ -52,7 +52,7 @@ class TrafficDataResponse(TrafficDataBase):
|
|||||||
|
|
||||||
class TrafficDataList(BaseModel):
|
class TrafficDataList(BaseModel):
|
||||||
"""Schema for paginated traffic data responses"""
|
"""Schema for paginated traffic data responses"""
|
||||||
data: List[TrafficDataResponse]
|
data: List[TrafficDataResponseDB]
|
||||||
total: int = Field(..., description="Total number of records")
|
total: int = Field(..., description="Total number of records")
|
||||||
page: int = Field(..., description="Current page number")
|
page: int = Field(..., description="Current page number")
|
||||||
per_page: int = Field(..., description="Records per page")
|
per_page: int = Field(..., description="Records per page")
|
||||||
@@ -72,12 +72,18 @@ class TrafficAnalytics(BaseModel):
|
|||||||
avg_speed: Optional[float] = None
|
avg_speed: Optional[float] = None
|
||||||
|
|
||||||
class TrafficDataResponse(BaseModel):
|
class TrafficDataResponse(BaseModel):
|
||||||
date: datetime
|
"""Schema for API traffic data responses"""
|
||||||
traffic_volume: Optional[int]
|
date: datetime = Field(..., description="Date and time of traffic measurement")
|
||||||
pedestrian_count: Optional[int]
|
traffic_volume: Optional[int] = Field(None, ge=0, description="Vehicles per hour")
|
||||||
congestion_level: Optional[str]
|
pedestrian_count: Optional[int] = Field(None, ge=0, description="Pedestrians per hour")
|
||||||
average_speed: Optional[float]
|
congestion_level: Optional[str] = Field(None, pattern="^(low|medium|high)$", description="Traffic congestion level")
|
||||||
source: str
|
average_speed: Optional[float] = Field(None, ge=0, le=200, description="Average speed in km/h")
|
||||||
|
source: str = Field(..., description="Data source")
|
||||||
|
|
||||||
|
class Config:
|
||||||
|
json_encoders = {
|
||||||
|
datetime: lambda v: v.isoformat()
|
||||||
|
}
|
||||||
|
|
||||||
class LocationRequest(BaseModel):
|
class LocationRequest(BaseModel):
|
||||||
latitude: float
|
latitude: float
|
||||||
|
|||||||
Reference in New Issue
Block a user