Files
bakery-ia/services/data/app/external/aemet.py

704 lines
29 KiB
Python
Raw Normal View History

2025-07-18 11:51:43 +02:00
# ================================================================
2025-07-24 16:07:58 +02:00
# services/data/app/external/aemet.py - REFACTORED VERSION
2025-07-18 11:51:43 +02:00
# ================================================================
2025-07-24 16:07:58 +02:00
"""AEMET (Spanish Weather Service) API client with improved modularity"""
2025-07-18 11:51:43 +02:00
import math
2025-07-24 16:07:58 +02:00
from typing import List, Dict, Any, Optional, Tuple
2025-07-18 11:51:43 +02:00
from datetime import datetime, timedelta
2025-07-24 16:07:58 +02:00
from dataclasses import dataclass
from enum import Enum
2025-07-18 11:51:43 +02:00
import structlog
from app.external.base_client import BaseAPIClient
from app.core.config import settings
logger = structlog.get_logger()
2025-07-24 16:07:58 +02:00
class WeatherSource(Enum):
    """Origin of a weather record.

    The string value of a member is what ends up under the ``"source"`` key
    of the weather dictionaries built in this module.
    """

    # Record parsed from an AEMET API payload.
    AEMET = "aemet"
    # Record produced by the synthetic weather generator.
    SYNTHETIC = "synthetic"
    # Hard-coded fallback record used when a payload cannot be parsed.
    DEFAULT = "default"
@dataclass
class WeatherStation:
    """A weather station: identifier, display name and position."""

    # Station identifier; it is interpolated into AEMET endpoint paths.
    id: str
    # Human-readable station name.
    name: str
    # Position in decimal degrees (converted with math.radians when
    # distances are computed).
    latitude: float
    longitude: float
@dataclass
class GeographicBounds:
    """Axis-aligned latitude/longitude rectangle."""

    # Southern and northern limits (inclusive).
    min_lat: float
    max_lat: float
    # Western and eastern limits (inclusive).
    min_lon: float
    max_lon: float

    def contains(self, latitude: float, longitude: float) -> bool:
        """Return True when the point lies inside the rectangle, edges included."""
        # Latitude is tested first; longitude is only compared when it passes.
        if not (self.min_lat <= latitude <= self.max_lat):
            return False
        return self.min_lon <= longitude <= self.max_lon
class AEMETConstants:
    """Static configuration shared by the AEMET client and its helpers."""

    # --- API configuration -------------------------------------------------
    # Used to size the date windows of historical requests.
    MAX_DAYS_PER_REQUEST = 30
    # Municipality code sent to the daily forecast endpoint for Madrid.
    MADRID_MUNICIPALITY_CODE = "28079"

    # --- Geography ---------------------------------------------------------
    # Rectangle inside which coordinates are treated as Madrid.
    MADRID_BOUNDS = GeographicBounds(
        min_lat=40.3,
        max_lat=40.6,
        min_lon=-3.9,
        max_lon=-3.5,
    )

    # Candidate stations for nearest-station lookup.
    MADRID_STATIONS = [
        WeatherStation(id="3195", name="Madrid Centro", latitude=40.4117, longitude=-3.6780),
        WeatherStation(id="3129", name="Madrid Norte", latitude=40.4677, longitude=-3.5552),
        WeatherStation(id="3197", name="Madrid Sur", latitude=40.2987, longitude=-3.7216),
    ]

    # --- Climate simulation parameters (synthetic data) ---------------------
    # Starting temperature of the seasonal model, in the same unit as the
    # generated "temperature" values.
    BASE_TEMPERATURE_SEASONAL = 5.0
    # Temperature change per month step in the seasonal model.
    TEMPERATURE_SEASONAL_MULTIPLIER = 2.5
    # Amplitude of the sinusoidal day/night temperature swing.
    DAILY_TEMPERATURE_AMPLITUDE = 8.0

    # Earth radius used by the haversine distance computation.
    EARTH_RADIUS_KM = 6371.0
class WeatherDataParser:
    """Turns raw AEMET JSON payloads into the flat weather dicts used by the service.

    Three payload shapes are handled: a single current observation, a list of
    daily historical records, and the daily forecast document. Missing values
    are tested with ``is None`` throughout, because ``0.0`` is a legitimate
    temperature, precipitation or wind reading.
    """

    @staticmethod
    def safe_float(value: Any, default: Optional[float] = None) -> Optional[float]:
        """Convert ``value`` to float, returning ``default`` when that is impossible.

        Strings are stripped and a decimal comma is accepted ("12,5" -> 12.5),
        since AEMET climatological values are published in that notation.
        Non-numeric markers such as "Ip" fall back to ``default``.
        """
        if value is None:
            return default
        if isinstance(value, str):
            value = value.strip().replace(",", ".")
        try:
            return float(value)
        except (ValueError, TypeError):
            return default

    @staticmethod
    def extract_temperature_value(temp_data: Any) -> Optional[float]:
        """Extract a temperature from the shapes AEMET uses for it.

        Accepted shapes: a number, a numeric string, a dict with a ``'valor'``
        key, or a non-empty list whose first item is such a dict. Anything
        else yields ``None``.
        """
        if temp_data is None:
            return None

        if isinstance(temp_data, (int, float, str)):
            return WeatherDataParser.safe_float(temp_data)

        if isinstance(temp_data, dict) and 'valor' in temp_data:
            return WeatherDataParser.safe_float(temp_data['valor'])

        if isinstance(temp_data, list) and len(temp_data) > 0:
            first_item = temp_data[0]
            if isinstance(first_item, dict) and 'valor' in first_item:
                return WeatherDataParser.safe_float(first_item['valor'])

        return None

    @staticmethod
    def generate_weather_description(temperature: Optional[float],
                                     precipitation: Optional[float],
                                     humidity: Optional[float]) -> str:
        """Derive a short Spanish description from the measured conditions.

        Precipitation takes precedence over humidity, which takes precedence
        over temperature. ``None`` inputs are skipped; a reading of exactly
        0 is evaluated like any other number (0 degrees is "Frío").
        """
        if precipitation is not None and precipitation > 5.0:
            return "Lluvioso"
        if precipitation is not None and precipitation > 0.1:
            return "Nuboso con lluvia"
        if humidity is not None and humidity > 80:
            return "Nuboso"
        if temperature is not None and temperature > 25:
            return "Soleado y cálido"
        if temperature is not None and temperature < 5:
            return "Frío"
        return "Variable"

    def parse_current_weather(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Parse one AEMET observation record into a current-weather dict.

        Reads the keys ``ta``, ``prec``, ``hr``, ``vv``, ``pres`` and
        ``descripcion``; each missing or unparsable value is replaced by a
        fixed default. A non-dict input or an unexpected error yields the
        default weather structure instead of raising.
        """
        if not isinstance(data, dict):
            logger.warning("Weather data is not a dictionary", data_type=type(data))
            return self._get_default_weather_data()

        try:
            return {
                "date": datetime.now(),
                "temperature": self.safe_float(data.get("ta"), 15.0),
                "precipitation": self.safe_float(data.get("prec"), 0.0),
                "humidity": self.safe_float(data.get("hr"), 50.0),
                "wind_speed": self.safe_float(data.get("vv"), 10.0),
                "pressure": self.safe_float(data.get("pres"), 1013.0),
                "description": str(data.get("descripcion", "Partly cloudy")),
                "source": WeatherSource.AEMET.value
            }
        except Exception as e:
            logger.error("Error parsing weather data", error=str(e), data=data)
            return self._get_default_weather_data()

    def parse_historical_data(self, data: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Parse a list of AEMET daily historical records.

        Non-dict entries and records without a usable date are skipped. A
        failure on one record is logged and does not discard the records that
        follow it. If ``data`` itself cannot be iterated, whatever was parsed
        so far (possibly nothing) is returned.
        """
        parsed_data = []
        try:
            for record in data:
                if not isinstance(record, dict):
                    continue
                try:
                    parsed_record = self._parse_single_historical_record(record)
                except Exception as e:
                    logger.error("Error parsing historical weather data", error=str(e))
                    continue
                if parsed_record:
                    parsed_data.append(parsed_record)
        except Exception as e:
            # Raised by the iteration itself (e.g. ``data`` is None).
            logger.error("Error parsing historical weather data", error=str(e))

        return parsed_data

    def parse_forecast_data(self, data: List[Dict[str, Any]], days: int) -> List[Dict[str, Any]]:
        """Parse the AEMET daily forecast document into ``days`` forecast dicts.

        The days are read from ``data[0]['prediccion']['dia']``. When AEMET
        provides fewer than ``days`` entries the remainder is filled with
        synthetic days. Returns an empty list when ``data`` is not a list or
        parsing fails, so the caller can fall back to another source.
        """
        forecast = []
        base_date = datetime.now().date()

        if not isinstance(data, list):
            logger.warning("Forecast data is not a list", data_type=type(data))
            return []

        try:
            if len(data) > 0 and isinstance(data[0], dict):
                aemet_data = data[0]
                dias = aemet_data.get('prediccion', {}).get('dia', [])

                if isinstance(dias, list) and len(dias) > 0:
                    forecast = self._parse_forecast_days(dias, days, base_date)

            # Fill remaining days with synthetic data if needed
            forecast = self._ensure_forecast_completeness(forecast, days)

        except Exception as e:
            logger.error("Error parsing AEMET forecast data", error=str(e))
            forecast = []

        return forecast

    def _parse_single_historical_record(self, record: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Parse one daily historical record; return None if its date is unusable.

        The date comes from ``fecha`` (``YYYY-MM-DD``). The temperature is the
        average of ``tmax``/``tmin`` (or an estimate when only one is
        present); the remaining fields are read from ``prec``, ``hr``,
        ``velmedia`` and ``presMax``/``presMin``.
        """
        fecha_str = record.get('fecha')
        if not fecha_str:
            return None

        try:
            record_date = datetime.strptime(fecha_str, '%Y-%m-%d')
        except (ValueError, TypeError):
            # TypeError covers a non-string ``fecha`` value.
            logger.warning("Invalid date format in historical data", fecha=fecha_str)
            return None

        # Extract and calculate temperature
        temp_max = self.safe_float(record.get('tmax'))
        temp_min = self.safe_float(record.get('tmin'))
        temperature = self._calculate_average_temperature(temp_max, temp_min)

        # Extract other weather parameters
        precipitation = self.safe_float(record.get('prec'), 0.0)
        humidity = self.safe_float(record.get('hr'))
        wind_speed = self.safe_float(record.get('velmedia'))
        pressure = self._extract_pressure(record)

        return {
            "date": record_date,
            "temperature": temperature,
            "precipitation": precipitation,
            "humidity": humidity,
            "wind_speed": wind_speed,
            "pressure": pressure,
            "description": self.generate_weather_description(temperature, precipitation, humidity),
            "source": WeatherSource.AEMET.value
        }

    def _calculate_average_temperature(self, temp_max: Optional[float], temp_min: Optional[float]) -> Optional[float]:
        """Return the mean of max and min, or an estimate when one is missing.

        With only one bound available the average is estimated as 5 degrees
        below the maximum or 5 degrees above the minimum. Returns None when
        both are missing. A bound of exactly 0.0 counts as present.
        """
        if temp_max is not None and temp_min is not None:
            return (temp_max + temp_min) / 2
        if temp_max is not None:
            return temp_max - 5  # Estimate average from max
        if temp_min is not None:
            return temp_min + 5  # Estimate average from min
        return None

    def _extract_pressure(self, record: Dict[str, Any]) -> Optional[float]:
        """Return ``presMax`` from the record, falling back to ``presMin``."""
        pressure = self.safe_float(record.get('presMax'))
        if pressure is None:
            pressure = self.safe_float(record.get('presMin'))
        return pressure

    def _parse_forecast_days(self, dias: List[Dict[str, Any]], days: int, base_date: datetime.date) -> List[Dict[str, Any]]:
        """Parse at most ``days`` entries of the forecast ``dia`` list.

        Entry ``i`` is dated ``base_date + i`` days; non-dict entries are
        skipped (their date is not reused by the next entry).
        """
        forecast = []
        for i, dia in enumerate(dias[:days]):
            if not isinstance(dia, dict):
                continue

            forecast_date = base_date + timedelta(days=i)
            forecast.append(self._parse_single_forecast_day(dia, forecast_date, i))

        return forecast

    def _parse_single_forecast_day(self, dia: Dict[str, Any], forecast_date: datetime.date, day_index: int) -> Dict[str, Any]:
        """Build the forecast dict for one AEMET ``dia`` entry.

        Temperature, precipitation probability and wind speed are read from
        ``temperatura``, ``probPrecipitacion`` and ``viento``. Humidity is not
        read from the payload; it is a placeholder derived from the day index.
        """
        avg_temp = self._extract_forecast_temperature(dia.get('temperatura', {}))
        precip_prob = self._extract_precipitation_probability(dia.get('probPrecipitacion', []))
        wind_speed = self._extract_wind_speed(dia.get('viento', []))
        description = self._generate_forecast_description(precip_prob)

        return {
            "forecast_date": datetime.combine(forecast_date, datetime.min.time()),
            "generated_at": datetime.now(),
            "temperature": round(avg_temp, 1),
            "precipitation": precip_prob / 10,  # Convert percentage to mm estimate
            "humidity": 50.0 + (day_index % 20),  # Estimate
            "wind_speed": round(wind_speed, 1),
            "description": description,
            "source": WeatherSource.AEMET.value
        }

    def _extract_forecast_temperature(self, temp_data: Dict[str, Any]) -> float:
        """Return the mean of ``maxima`` and ``minima``, or 15.0 if either is missing.

        A bound of exactly 0 is a valid reading and is averaged normally.
        """
        if isinstance(temp_data, dict):
            temp_max = self.extract_temperature_value(temp_data.get('maxima'))
            temp_min = self.extract_temperature_value(temp_data.get('minima'))
            if temp_max is not None and temp_min is not None:
                return (temp_max + temp_min) / 2
        return 15.0

    def _extract_precipitation_probability(self, precip_data: List[Dict[str, Any]]) -> float:
        """Return the highest ``value`` found in the precipitation entries (0.0 if none)."""
        precip_prob = 0.0
        if isinstance(precip_data, list):
            for precip_item in precip_data:
                if isinstance(precip_item, dict) and 'value' in precip_item:
                    precip_prob = max(precip_prob, self.safe_float(precip_item.get('value'), 0.0))
        return precip_prob

    def _extract_wind_speed(self, viento_data: List[Dict[str, Any]]) -> float:
        """Return the first usable ``velocidad`` from the wind entries, else 10.0.

        ``velocidad`` may be a scalar or a list; for a list its first element
        is used. Entries whose speed cannot be converted to a number are
        skipped.
        """
        if isinstance(viento_data, list):
            for viento_item in viento_data:
                if not isinstance(viento_item, dict) or 'velocidad' not in viento_item:
                    continue
                speed = viento_item['velocidad']
                if isinstance(speed, list):
                    if not speed:
                        continue
                    speed = speed[0]
                parsed_speed = self.safe_float(speed)
                if parsed_speed is not None:
                    return parsed_speed
        return 10.0

    def _generate_forecast_description(self, precip_prob: float) -> str:
        """Map a precipitation probability (percent) to a Spanish description."""
        if precip_prob > 70:
            return "Lluvioso"
        if precip_prob > 30:
            return "Parcialmente nublado"
        return "Soleado"

    def _ensure_forecast_completeness(self, forecast: List[Dict[str, Any]], days: int) -> List[Dict[str, Any]]:
        """Pad ``forecast`` with synthetic days up to ``days`` entries, then truncate.

        The synthetic days start at offset ``len(forecast)`` so that they
        continue after the parsed ones. The input list is extended in place.
        """
        if len(forecast) < days:
            remaining_days = days - len(forecast)
            synthetic_generator = SyntheticWeatherGenerator()
            synthetic_forecast = synthetic_generator.generate_forecast_sync(remaining_days, len(forecast))
            forecast.extend(synthetic_forecast)

        return forecast[:days]

    def _get_default_weather_data(self) -> Dict[str, Any]:
        """Return the fixed fallback record used when a payload cannot be parsed."""
        return {
            "date": datetime.now(),
            "temperature": 15.0,
            "precipitation": 0.0,
            "humidity": 50.0,
            "wind_speed": 10.0,
            "pressure": 1013.0,
            "description": "Data not available",
            "source": WeatherSource.DEFAULT.value
        }
2025-07-24 16:07:58 +02:00
class SyntheticWeatherGenerator:
    """Generates deterministic synthetic weather data for Madrid.

    Used as a fallback when AEMET data is unavailable. All values are derived
    arithmetically from the date (and hour), so the same inputs give the same
    output in every process.
    """

    def generate_current_weather(self) -> Dict[str, Any]:
        """Build a synthetic current-weather dict for the present moment."""
        now = datetime.now()
        month = now.month
        hour = now.hour

        # Madrid climate simulation
        temperature = self._calculate_current_temperature(month, hour)
        precipitation = self._calculate_current_precipitation(now, month)

        return {
            "date": now,
            "temperature": round(temperature, 1),
            "precipitation": precipitation,
            "humidity": 45 + (month % 6) * 5,
            "wind_speed": 8 + (hour % 12),
            "pressure": 1013 + math.sin(now.day * 0.2) * 15,
            "description": "Lluvioso" if precipitation > 0 else "Soleado",
            "source": WeatherSource.SYNTHETIC.value
        }

    def generate_forecast_sync(self, days: int, start_offset: int = 0) -> List[Dict[str, Any]]:
        """Generate ``days`` synthetic forecast entries.

        The first entry is dated today plus ``start_offset`` days, which lets
        callers append synthetic days after real ones. A non-positive ``days``
        yields an empty list.
        """
        base_date = datetime.now().date()
        return [
            self._generate_forecast_day(base_date + timedelta(days=offset), offset)
            for offset in range(start_offset, start_offset + days)
        ]

    async def generate_forecast(self, days: int) -> List[Dict[str, Any]]:
        """Async wrapper around :meth:`generate_forecast_sync` starting today."""
        return self.generate_forecast_sync(days, 0)

    def generate_historical_data(self, start_date: datetime, end_date: datetime) -> List[Dict[str, Any]]:
        """Generate one synthetic record per day from ``start_date`` to ``end_date`` inclusive.

        Returns an empty list when ``start_date`` is after ``end_date``.
        """
        historical_data = []
        current_date = start_date

        while current_date <= end_date:
            historical_data.append(self._generate_historical_day(current_date))
            current_date += timedelta(days=1)

        return historical_data

    @staticmethod
    def _seasonal_base_temperature(month: int) -> float:
        """Return the seasonal base temperature for a calendar month (1-12).

        The base rises by one multiplier step per month from January up to a
        July peak and falls symmetrically afterwards, so late-year months are
        cooler than summer instead of growing without bound.
        """
        months_from_peak = abs(month - 7)  # 0 in July, 6 in January
        return (AEMETConstants.BASE_TEMPERATURE_SEASONAL
                + (6 - months_from_peak) * AEMETConstants.TEMPERATURE_SEASONAL_MULTIPLIER)

    def _calculate_current_temperature(self, month: int, hour: int) -> float:
        """Combine the seasonal base with a sinusoidal daily swing.

        The sine term is zero at 06:00 and 18:00 and peaks at 12:00.
        """
        daily_swing = math.sin((hour - 6) * math.pi / 12) * AEMETConstants.DAILY_TEMPERATURE_AMPLITUDE
        return self._seasonal_base_temperature(month) + daily_swing

    def _calculate_current_precipitation(self, now: datetime, month: int) -> float:
        """Return 2.5 on "rainy" days and 0.0 otherwise.

        A day is rainy with a 30% share in November-March and 10% otherwise.
        The decision is derived from the date's ordinal rather than ``hash()``,
        because built-in hashing of dates is salted per process and would make
        different workers disagree about the same day.
        """
        rain_percent = 30 if month in (11, 12, 1, 2, 3) else 10
        # 37 is coprime to 100, so consecutive days are spread over all
        # residues instead of forming long rainy or dry runs.
        day_bucket = (now.toordinal() * 37) % 100
        return 2.5 if day_bucket < rain_percent else 0.0

    def _generate_forecast_day(self, forecast_date: datetime.date, day_offset: int) -> Dict[str, Any]:
        """Build the synthetic forecast dict for one date.

        ``day_offset`` (days from today) drives a weekly temperature pattern
        and marks every fifth day as rainy.
        """
        base_temp = self._seasonal_base_temperature(forecast_date.month)
        temp_variation = (day_offset % 7 - 3) * 2  # Weekly variation
        is_rainy = day_offset % 5 == 0

        return {
            "forecast_date": datetime.combine(forecast_date, datetime.min.time()),
            "generated_at": datetime.now(),
            "temperature": round(base_temp + temp_variation, 1),
            "precipitation": 2.0 if is_rainy else 0.0,
            "humidity": 50 + (day_offset % 30),
            "wind_speed": 10 + (day_offset % 15),
            "description": "Lluvioso" if is_rainy else "Soleado",
            "source": WeatherSource.SYNTHETIC.value
        }

    def _generate_historical_day(self, date: datetime) -> Dict[str, Any]:
        """Build the synthetic historical record for one day.

        All variation is derived from the day of the month.
        """
        base_temp = self._seasonal_base_temperature(date.month)
        temp_variation = math.sin(date.day * 0.3) * 5

        return {
            "date": date,
            "temperature": round(base_temp + temp_variation, 1),
            "precipitation": 1.5 if date.day % 7 == 0 else 0.0,
            "humidity": 45 + (date.day % 40),
            "wind_speed": 8 + (date.day % 20),
            "pressure": 1013 + math.sin(date.day * 0.2) * 20,
            "description": "Variable",
            "source": WeatherSource.SYNTHETIC.value
        }
class LocationService:
    """Maps coordinates to the AEMET station and municipality used for lookups."""

    @staticmethod
    def find_nearest_station(latitude: float, longitude: float) -> Optional[str]:
        """Return the id of the configured station closest to the coordinates.

        Returns None when the coordinates are outside the valid
        latitude/longitude range, lie more than 1000 km from central Madrid,
        or when the lookup fails unexpectedly (the error is logged).
        """
        try:
            # Check if coordinates are reasonable (not extreme values)
            if not (-90 <= latitude <= 90 and -180 <= longitude <= 180):
                logger.warning("Invalid coordinate range", lat=latitude, lon=longitude)
                return None

            # Check if coordinates are too far from Madrid area (more than 1000km away)
            madrid_lat, madrid_lon = 40.4168, -3.7038
            distance_to_madrid = LocationService.calculate_distance(
                latitude, longitude, madrid_lat, madrid_lon
            )

            if distance_to_madrid > 1000:  # More than 1000km from Madrid
                logger.warning("Coordinates too far from Madrid",
                               lat=latitude, lon=longitude, distance_km=distance_to_madrid)
                return None

            # min() keeps the first station on ties, like a strict "<" scan.
            nearest = min(
                AEMETConstants.MADRID_STATIONS,
                key=lambda station: LocationService.calculate_distance(
                    latitude, longitude, station.latitude, station.longitude
                ),
                default=None,
            )
            return nearest.id if nearest is not None else None

        except Exception as e:
            logger.error("Failed to find nearest station", error=str(e))
            return None

    @staticmethod
    def get_municipality_code(latitude: float, longitude: float) -> Optional[str]:
        """Return Madrid's municipality code if the point is inside the Madrid bounds, else None."""
        if AEMETConstants.MADRID_BOUNDS.contains(latitude, longitude):
            return AEMETConstants.MADRID_MUNICIPALITY_CODE
        return None

    @staticmethod
    def calculate_distance(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
        """Return the great-circle distance in kilometres (haversine formula).

        All four arguments are in decimal degrees.
        """
        dlat = math.radians(lat2 - lat1)
        dlon = math.radians(lon2 - lon1)

        a = (math.sin(dlat / 2) ** 2 +
             math.cos(math.radians(lat1)) * math.cos(math.radians(lat2)) *
             math.sin(dlon / 2) ** 2)
        # Rounding can push ``a`` marginally outside [0, 1] for identical or
        # near-antipodal points, which would make sqrt(1 - a) raise.
        a = min(1.0, max(0.0, a))

        c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
        return AEMETConstants.EARTH_RADIUS_KM * c
class AEMETClient(BaseAPIClient):
    """AEMET (Spanish Weather Service) API client.

    Every public method falls back to synthetic data when no station or
    municipality matches the coordinates, when AEMET returns nothing usable,
    or when an exception occurs, so callers always receive data.

    AEMET endpoints answer in two steps: the first response is a small JSON
    envelope whose ``datos`` field is a URL, and the actual payload is
    fetched from that URL.

    NOTE(review): ``_get`` and ``_fetch_url_directly`` are not defined in this
    class; they are presumably inherited from BaseAPIClient - confirm.
    """

    def __init__(self):
        super().__init__(
            base_url="https://opendata.aemet.es/opendata/api",
            api_key=settings.AEMET_API_KEY
        )
        self.parser = WeatherDataParser()
        self.synthetic_generator = SyntheticWeatherGenerator()
        self.location_service = LocationService()

    async def get_current_weather(self, latitude: float, longitude: float) -> Optional[Dict[str, Any]]:
        """Return current weather for the coordinates.

        Uses the latest observation of the nearest configured station, and
        synthetic data when that is not available.
        """
        try:
            station_id = self.location_service.find_nearest_station(latitude, longitude)
            if not station_id:
                logger.warning("No weather station found", lat=latitude, lon=longitude)
                return await self._get_synthetic_current_weather()

            weather_data = await self._fetch_current_weather_data(station_id)
            if weather_data:
                return self.parser.parse_current_weather(weather_data)

            logger.info("Falling back to synthetic weather data", reason="invalid_weather_data")
            return await self._get_synthetic_current_weather()

        except Exception as e:
            logger.error("Failed to get current weather", error=str(e))
            return await self._get_synthetic_current_weather()

    async def get_forecast(self, latitude: float, longitude: float, days: int = 7) -> List[Dict[str, Any]]:
        """Return a ``days``-long forecast for the coordinates.

        Real data is only requested when the point maps to a known
        municipality; otherwise, or on any failure, a synthetic forecast is
        returned.
        """
        try:
            municipality_code = self.location_service.get_municipality_code(latitude, longitude)
            if not municipality_code:
                logger.info("No municipality code found, using synthetic data")
                return await self.synthetic_generator.generate_forecast(days)

            forecast_data = await self._fetch_forecast_data(municipality_code)
            if forecast_data:
                parsed_forecast = self.parser.parse_forecast_data(forecast_data, days)
                if parsed_forecast:
                    return parsed_forecast

            logger.info("Falling back to synthetic forecast data", reason="invalid_forecast_data")
            return await self.synthetic_generator.generate_forecast(days)

        except Exception as e:
            logger.error("Failed to get weather forecast", error=str(e))
            return await self.synthetic_generator.generate_forecast(days)

    async def get_historical_weather(self,
                                     latitude: float,
                                     longitude: float,
                                     start_date: datetime,
                                     end_date: datetime) -> List[Dict[str, Any]]:
        """Return daily historical weather between the two dates (inclusive).

        Data comes from the nearest configured station, fetched in chunks. If
        nothing at all could be fetched, or on any failure, synthetic records
        for the whole range are returned instead.
        """
        try:
            logger.debug("Getting historical weather from AEMET API",
                         lat=latitude, lon=longitude,
                         start=start_date, end=end_date)

            station_id = self.location_service.find_nearest_station(latitude, longitude)
            if not station_id:
                logger.warning("No weather station found for historical data",
                               lat=latitude, lon=longitude)
                return self.synthetic_generator.generate_historical_data(start_date, end_date)

            historical_data = await self._fetch_historical_data_in_chunks(
                station_id, start_date, end_date
            )

            if historical_data:
                logger.debug("Successfully fetched historical weather data",
                             total_count=len(historical_data))
                return historical_data

            logger.info("No real historical data available, using synthetic data")
            return self.synthetic_generator.generate_historical_data(start_date, end_date)

        except Exception as e:
            logger.error("Failed to get historical weather from AEMET API", error=str(e))
            return self.synthetic_generator.generate_historical_data(start_date, end_date)

    async def _fetch_current_weather_data(self, station_id: str) -> Optional[Dict[str, Any]]:
        """Fetch the most recent observation record for a station, or None."""
        endpoint = f"/observacion/convencional/datos/estacion/{station_id}"
        initial_response = await self._get(endpoint)

        if not self._is_valid_initial_response(initial_response):
            return None

        observations = await self._fetch_from_url(initial_response.get("datos"))

        if not observations or not isinstance(observations, list):
            return None

        return self._select_latest_observation(observations)

    @staticmethod
    def _select_latest_observation(observations: List[Any]) -> Any:
        """Pick the record with the greatest ``fint`` timestamp.

        The observation feed contains several records for the station, so the
        first element is not necessarily the current one. ``fint`` values are
        compared as strings, which orders ISO-8601 timestamps
        chronologically. When no record carries ``fint`` the first record is
        returned.
        """
        def observed_at(record: Any) -> str:
            if isinstance(record, dict):
                return str(record.get("fint") or "")
            return ""

        return max(observations, key=observed_at)

    async def _fetch_forecast_data(self, municipality_code: str) -> Optional[List[Dict[str, Any]]]:
        """Fetch the raw daily forecast document for a municipality, or None."""
        endpoint = f"/prediccion/especifica/municipio/diaria/{municipality_code}"
        initial_response = await self._get(endpoint)

        if not self._is_valid_initial_response(initial_response):
            return None

        return await self._fetch_from_url(initial_response.get("datos"))

    async def _fetch_historical_data_in_chunks(self,
                                               station_id: str,
                                               start_date: datetime,
                                               end_date: datetime) -> List[Dict[str, Any]]:
        """Fetch historical data as consecutive windows of limited length.

        Each window covers at most ``MAX_DAYS_PER_REQUEST`` calendar days
        (both ends inclusive). Windows that return nothing are skipped, so
        the result may have gaps.
        """
        historical_data = []
        # Both ends of a window are requested, so a span of N days is
        # "start + (N - 1) days".
        window = timedelta(days=max(AEMETConstants.MAX_DAYS_PER_REQUEST - 1, 0))
        current_date = start_date

        while current_date <= end_date:
            chunk_end_date = min(current_date + window, end_date)

            chunk_data = await self._fetch_historical_chunk(
                station_id, current_date, chunk_end_date
            )
            if chunk_data:
                historical_data.extend(chunk_data)

            current_date = chunk_end_date + timedelta(days=1)

        return historical_data

    async def _fetch_historical_chunk(self,
                                      station_id: str,
                                      start_date: datetime,
                                      end_date: datetime) -> List[Dict[str, Any]]:
        """Fetch and parse one window of daily climatological values.

        Returns an empty list when AEMET gives no usable response.
        """
        start_str = start_date.strftime("%Y-%m-%dT00:00:00UTC")
        end_str = end_date.strftime("%Y-%m-%dT23:59:59UTC")

        endpoint = f"/valores/climatologicos/diarios/datos/fechaini/{start_str}/fechafin/{end_str}/estacion/{station_id}"
        initial_response = await self._get(endpoint)

        # The validity check also guarantees a non-empty "datos" URL.
        if not self._is_valid_initial_response(initial_response):
            logger.warning("Invalid initial response from AEMET historical API",
                           start=start_str, end=end_str)
            return []

        actual_historical_data = await self._fetch_from_url(initial_response.get("datos"))

        if actual_historical_data and isinstance(actual_historical_data, list):
            chunk_data = self.parser.parse_historical_data(actual_historical_data)
            logger.debug("Fetched historical data chunk",
                         count=len(chunk_data), start=start_str, end=end_str)
            return chunk_data

        logger.warning("No valid historical data received for chunk",
                       start=start_str, end=end_str)
        return []

    async def _fetch_from_url(self, url: str) -> Optional[List[Dict[str, Any]]]:
        """Fetch the payload behind a ``datos`` URL.

        Returns the payload only when it is a non-empty list; any other
        result, or an exception, is logged and yields None.
        """
        try:
            data = await self._fetch_url_directly(url)
        except Exception as e:
            logger.error("Failed to fetch from datos URL", url=url, error=str(e))
            return None

        if data and isinstance(data, list):
            return data

        logger.warning("Expected list from datos URL", data_type=type(data))
        return None

    def _is_valid_initial_response(self, response: Any) -> bool:
        """Return True when ``response`` is a dict with a non-empty string ``datos``."""
        if not isinstance(response, dict):
            return False
        datos = response.get("datos")
        return isinstance(datos, str) and bool(datos)

    async def _get_synthetic_current_weather(self) -> Dict[str, Any]:
        """Return synthetic current weather (async so call sites can await it)."""
        return self.synthetic_generator.generate_current_weather()