Files
bakery-ia/services/external/app/external/aemet.py

1005 lines
42 KiB
Python
Raw Normal View History

2025-07-18 11:51:43 +02:00
# ================================================================
2025-07-24 16:07:58 +02:00
# services/external/app/external/aemet.py - REFACTORED VERSION
2025-07-18 11:51:43 +02:00
# ================================================================
2025-07-24 16:07:58 +02:00
"""AEMET (Spanish Weather Service) API client with improved modularity"""
2025-07-18 11:51:43 +02:00
import math
2025-07-24 16:07:58 +02:00
from typing import List, Dict, Any, Optional, Tuple
2025-07-18 11:51:43 +02:00
from datetime import datetime, timedelta
2025-07-24 16:07:58 +02:00
from dataclasses import dataclass
from enum import Enum
2025-07-18 11:51:43 +02:00
import structlog
from app.external.base_client import BaseAPIClient
from app.core.config import settings
logger = structlog.get_logger()
2025-07-24 16:07:58 +02:00
class WeatherSource(Enum):
    """Origin tag stored in the ``source`` field of every weather record."""

    # Data obtained from the AEMET API.
    AEMET = "aemet"
    # Generated (non-measured) data.
    SYNTHETIC = "synthetic"
    # Hard-coded placeholder values used when nothing else is available.
    DEFAULT = "default"
@dataclass
class WeatherStation:
    """A weather station: identifier, display name and position."""

    id: str  # station identifier
    name: str  # human-readable station name
    latitude: float  # decimal degrees
    longitude: float  # decimal degrees
@dataclass
class GeographicBounds:
    """Axis-aligned latitude/longitude rectangle (edges inclusive)."""

    min_lat: float
    max_lat: float
    min_lon: float
    max_lon: float

    def contains(self, latitude: float, longitude: float) -> bool:
        """Return True when the point lies inside the box or on its edge."""
        lat_in_range = self.min_lat <= latitude <= self.max_lat
        lon_in_range = self.min_lon <= longitude <= self.max_lon
        return lat_in_range and lon_in_range
class AEMETConstants:
    """Static configuration shared by the AEMET client and its helpers."""

    # --- API configuration -------------------------------------------------
    # Maximum span (in days) requested per historical-data call.
    MAX_DAYS_PER_REQUEST = 30
    # Municipality code returned for any point inside MADRID_BOUNDS.
    MADRID_MUNICIPALITY_CODE = "28079"

    # --- Madrid geographic bounds ------------------------------------------
    MADRID_BOUNDS = GeographicBounds(40.3, 40.6, -3.9, -3.5)

    # --- Weather stations in the Madrid area -------------------------------
    MADRID_STATIONS = [
        WeatherStation(id="3195", name="Madrid Centro", latitude=40.4117, longitude=-3.6780),
        WeatherStation(id="3129", name="Madrid Norte", latitude=40.4677, longitude=-3.5552),
        WeatherStation(id="3197", name="Madrid Sur", latitude=40.2987, longitude=-3.7216),
    ]

    # --- Climate simulation parameters -------------------------------------
    BASE_TEMPERATURE_SEASONAL = 5.0
    TEMPERATURE_SEASONAL_MULTIPLIER = 2.5
    DAILY_TEMPERATURE_AMPLITUDE = 8.0
    # Earth radius used by the haversine distance calculation.
    EARTH_RADIUS_KM = 6371.0
class WeatherDataParser:
    """Turn raw AEMET payloads into the flat weather dictionaries used by the service.

    Every public ``parse_*`` method is defensive: malformed input is logged and
    results in default data or an empty list instead of an exception.
    """

    @staticmethod
    def safe_float(value: Any, default: Optional[float] = None) -> Optional[float]:
        """Convert ``value`` to ``float``; return ``default`` when that is impossible.

        Strings may use a decimal comma ("12,5"), which is how AEMET publishes
        climatological values. Non-numeric markers such as "Ip" yield ``default``.
        """
        if value is None:
            return default
        if isinstance(value, str):
            # Normalise the Spanish decimal separator before converting.
            value = value.replace(",", ".")
        try:
            return float(value)
        except (ValueError, TypeError, OverflowError):
            return default

    @staticmethod
    def extract_temperature_value(temp_data: Any) -> Optional[float]:
        """Extract a temperature from a number, a numeric string, a ``{'valor': ..}``
        dict, or a list whose first item is such a dict. Returns None otherwise."""
        if temp_data is None:
            return None

        if isinstance(temp_data, (int, float)):
            return float(temp_data)

        if isinstance(temp_data, str):
            # Same tolerant conversion as everywhere else (decimal comma included).
            return WeatherDataParser.safe_float(temp_data)

        if isinstance(temp_data, dict) and 'valor' in temp_data:
            return WeatherDataParser.safe_float(temp_data['valor'])

        if isinstance(temp_data, list) and len(temp_data) > 0:
            first_item = temp_data[0]
            if isinstance(first_item, dict) and 'valor' in first_item:
                return WeatherDataParser.safe_float(first_item['valor'])

        return None

    @staticmethod
    def generate_weather_description(temperature: Optional[float],
                                     precipitation: Optional[float],
                                     humidity: Optional[float]) -> str:
        """Build a short Spanish description from the given conditions.

        ``None`` means "unknown"; 0.0 is a real reading (so 0 degrees is "Frío").
        """
        if precipitation is not None and precipitation > 5.0:
            return "Lluvioso"
        if precipitation is not None and precipitation > 0.1:
            return "Nuboso con lluvia"
        if humidity is not None and humidity > 80:
            return "Nuboso"
        if temperature is not None and temperature > 25:
            return "Soleado y cálido"
        if temperature is not None and temperature < 5:
            return "Frío"
        return "Variable"

    def parse_current_weather(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Parse one AEMET observation record into a current-weather dict.

        AEMET short keys (``ta``, ``prec``, ``hr``, ``vv``, ``pres``) take
        precedence over the long English keys. Missing values are replaced by
        fixed fallbacks; any failure yields the default weather structure.
        """
        if not isinstance(data, dict):
            logger.warning("Weather data is not a dictionary", data_type=type(data))
            return self._get_default_weather_data()

        try:
            # Log the raw keys to help understand the payload structure.
            logger.debug("Parsing current weather data", data_keys=list(data.keys()))

            result = {
                "date": datetime.now(),
                "temperature": self.safe_float(data.get("ta"), self.safe_float(data.get("temperature"))),
                "precipitation": self.safe_float(data.get("prec"), self.safe_float(data.get("precipitation"), 0.0)),
                "humidity": self.safe_float(data.get("hr"), self.safe_float(data.get("humidity"))),
                "wind_speed": self.safe_float(data.get("vv"), self.safe_float(data.get("wind_speed"))),
                "pressure": self.safe_float(data.get("pres"), self.safe_float(data.get("pressure"))),
                "description": str(data.get("descripcion", data.get("description", "Partly cloudy"))),
                "source": WeatherSource.AEMET.value
            }

            # Fallback values for required fields that are still None.
            if result["temperature"] is None:
                result["temperature"] = 15.0
            if result["humidity"] is None:
                result["humidity"] = 50.0
            if result["wind_speed"] is None:
                result["wind_speed"] = 10.0
            if result["pressure"] is None:
                result["pressure"] = 1013.0

            logger.debug("Parsed current weather successfully",
                         temperature=result["temperature"],
                         source=result["source"])
            return result
        except Exception as e:
            logger.error("Error parsing weather data", error=str(e), data=data)
            return self._get_default_weather_data()

    def parse_historical_data(self, data: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Parse a list of AEMET daily historical records, skipping unusable ones."""
        parsed_data = []
        try:
            for record in data:
                if not isinstance(record, dict):
                    continue
                parsed_record = self._parse_single_historical_record(record)
                if parsed_record:
                    parsed_data.append(parsed_record)
        except Exception as e:
            logger.error("Error parsing historical weather data", error=str(e))
        return parsed_data

    def parse_forecast_data(self, data: List[Dict[str, Any]], days: int) -> List[Dict[str, Any]]:
        """Parse an AEMET daily forecast payload into at most ``days`` entries."""
        forecast = []
        base_date = datetime.now().date()

        if not isinstance(data, list):
            logger.warning("Forecast data is not a list", data_type=type(data))
            return []

        try:
            if len(data) > 0 and isinstance(data[0], dict):
                aemet_data = data[0]
                dias = aemet_data.get('prediccion', {}).get('dia', [])

                if isinstance(dias, list) and len(dias) > 0:
                    forecast = self._parse_forecast_days(dias, days, base_date)

            # Currently only truncates to ``days``; no synthetic filling.
            forecast = self._ensure_forecast_completeness(forecast, days)
        except Exception as e:
            logger.error("Error parsing AEMET forecast data", error=str(e))
            forecast = []

        return forecast

    def parse_hourly_forecast_data(self, data: List[Dict[str, Any]], hours: int) -> List[Dict[str, Any]]:
        """Parse an AEMET hourly forecast payload into at most ``hours`` entries."""
        hourly_forecast = []
        base_datetime = datetime.now()

        if not isinstance(data, list):
            logger.warning("Hourly forecast data is not a list", data_type=type(data))
            return []

        try:
            logger.debug("Processing AEMET hourly forecast data", data_length=len(data))

            if len(data) > 0 and isinstance(data[0], dict):
                aemet_data = data[0]
                prediccion = aemet_data.get('prediccion', {})
                # Structure: prediccion -> dia[] -> per-day hourly arrays.
                dias = prediccion.get('dia', [])
                if isinstance(dias, list):
                    hourly_forecast = self._parse_real_hourly_forecast_days(dias, hours, base_datetime)
                    logger.info("Successfully parsed AEMET hourly forecast", count=len(hourly_forecast))

            if len(hourly_forecast) == 0:
                logger.warning("No hourly data parsed from AEMET response")
        except Exception as e:
            logger.error("Error parsing AEMET hourly forecast data", error=str(e))
            hourly_forecast = []

        return hourly_forecast[:hours]

    def _parse_single_historical_record(self, record: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Parse one daily historical record; return None when its date is unusable."""
        fecha_str = record.get('fecha')
        if not fecha_str:
            return None

        try:
            record_date = datetime.strptime(fecha_str, '%Y-%m-%d')
        except (ValueError, TypeError):
            # TypeError covers a non-string ``fecha``; without it one bad record
            # would abort the whole batch in ``parse_historical_data``.
            logger.warning("Invalid date format in historical data", fecha=fecha_str)
            return None

        # Temperature is derived from the daily max/min.
        temp_max = self.safe_float(record.get('tmax'))
        temp_min = self.safe_float(record.get('tmin'))
        temperature = self._calculate_average_temperature(temp_max, temp_min)

        # Remaining parameters.
        precipitation = self.safe_float(record.get('prec'), 0.0)
        humidity = self.safe_float(record.get('hr'))
        wind_speed = self.safe_float(record.get('velmedia'))
        pressure = self._extract_pressure(record)

        return {
            "date": record_date,
            "temperature": temperature,
            "precipitation": precipitation,
            "humidity": humidity,
            "wind_speed": wind_speed,
            "pressure": pressure,
            "description": self.generate_weather_description(temperature, precipitation, humidity),
            "source": WeatherSource.AEMET.value
        }

    def _calculate_average_temperature(self, temp_max: Optional[float], temp_min: Optional[float]) -> Optional[float]:
        """Average of max and min; with only one of them, estimate with a 5-degree offset.

        ``None`` means missing. A reading of 0.0 is valid and takes part in the average.
        """
        if temp_max is not None and temp_min is not None:
            return (temp_max + temp_min) / 2
        if temp_max is not None:
            return temp_max - 5  # Estimate average from max
        if temp_min is not None:
            return temp_min + 5  # Estimate average from min
        return None

    def _extract_pressure(self, record: Dict[str, Any]) -> Optional[float]:
        """Return ``presMax`` if parseable, otherwise ``presMin``."""
        pressure = self.safe_float(record.get('presMax'))
        if pressure is None:
            pressure = self.safe_float(record.get('presMin'))
        return pressure

    def _parse_forecast_days(self, dias: List[Dict[str, Any]], days: int, base_date: datetime.date) -> List[Dict[str, Any]]:
        """Parse up to ``days`` daily entries; entry ``i`` is dated ``base_date + i`` days."""
        forecast = []
        for i, dia in enumerate(dias[:days]):
            if not isinstance(dia, dict):
                continue

            forecast_date = base_date + timedelta(days=i)
            forecast_day = self._parse_single_forecast_day(dia, forecast_date, i)
            forecast.append(forecast_day)

        return forecast

    def _parse_single_forecast_day(self, dia: Dict[str, Any], forecast_date: datetime.date, day_index: int) -> Dict[str, Any]:
        """Build the forecast dict for one day."""
        temp_data = dia.get('temperatura', {})
        avg_temp = self._extract_forecast_temperature(temp_data)

        precip_prob = self._extract_precipitation_probability(dia.get('probPrecipitacion', []))
        wind_speed = self._extract_wind_speed(dia.get('viento', []))
        description = self._generate_forecast_description(precip_prob)

        return {
            "forecast_date": datetime.combine(forecast_date, datetime.min.time()),
            "generated_at": datetime.now(),
            "temperature": round(avg_temp, 1),
            "precipitation": precip_prob / 10,  # Convert percentage to mm estimate
            "humidity": 50.0 + (day_index % 20),  # Estimate
            "wind_speed": round(wind_speed, 1),
            "description": description,
            "source": WeatherSource.AEMET.value
        }

    def _extract_forecast_temperature(self, temp_data: Dict[str, Any]) -> float:
        """Return the mean of ``maxima``/``minima``, or 15.0 when either is missing.

        A 0-degree minimum or maximum is a valid value, not a missing one.
        """
        if isinstance(temp_data, dict):
            temp_max = self.extract_temperature_value(temp_data.get('maxima'))
            temp_min = self.extract_temperature_value(temp_data.get('minima'))
            if temp_max is not None and temp_min is not None:
                return (temp_max + temp_min) / 2
        return 15.0

    def _extract_precipitation_probability(self, precip_data: List[Dict[str, Any]]) -> float:
        """Return the highest ``value`` among the period entries (0.0 if none)."""
        precip_prob = 0.0
        if isinstance(precip_data, list):
            for precip_item in precip_data:
                if isinstance(precip_item, dict) and 'value' in precip_item:
                    precip_prob = max(precip_prob, self.safe_float(precip_item.get('value'), 0.0))
        return precip_prob

    def _extract_wind_speed(self, viento_data: List[Dict[str, Any]]) -> float:
        """Return the first list-style ``velocidad`` reading, defaulting to 10.0.

        NOTE(review): only list-valued ``velocidad`` entries are used; if the
        daily payload carries scalar speeds they are ignored — confirm against
        the API schema.
        """
        wind_speed = 10.0
        if isinstance(viento_data, list):
            for viento_item in viento_data:
                if isinstance(viento_item, dict) and 'velocidad' in viento_item:
                    speed_values = viento_item.get('velocidad', [])
                    if isinstance(speed_values, list) and len(speed_values) > 0:
                        wind_speed = self.safe_float(speed_values[0], 10.0)
                        break
        return wind_speed

    def _generate_forecast_description(self, precip_prob: float) -> str:
        """Map a precipitation probability (percent) to a Spanish description."""
        if precip_prob > 70:
            return "Lluvioso"
        if precip_prob > 30:
            return "Parcialmente nublado"
        return "Soleado"

    def _parse_real_hourly_forecast_days(self, dias: List[Dict[str, Any]], hours: int, base_datetime: datetime) -> List[Dict[str, Any]]:
        """Concatenate per-day hourly entries until ``hours`` entries are collected."""
        hourly_forecast = []
        current_hour = 0

        for day_index, day_data in enumerate(dias):
            if current_hour >= hours:
                break
            if not isinstance(day_data, dict):
                continue

            day_hourly = self._parse_real_aemet_hourly_day(day_data, base_datetime, day_index, current_hour, hours)
            hourly_forecast.extend(day_hourly)
            current_hour += len(day_hourly)

        return hourly_forecast[:hours]

    def _parse_real_aemet_hourly_day(self, day_data: Dict[str, Any], base_datetime: datetime, day_index: int, start_hour: int, max_hours: int) -> List[Dict[str, Any]]:
        """Parse the hourly arrays of one forecast day.

        Entry ``i`` of the arrays is stamped ``day_date + i`` hours.
        NOTE(review): array items may carry a ``periodo`` hour label that is not
        used here; confirm whether index 0 always corresponds to 00:00.
        """
        hourly_data = []

        # Day start: taken from ``fecha`` when parseable, else base date + day index.
        fallback_date = base_datetime.replace(hour=0, minute=0, second=0, microsecond=0) + timedelta(days=day_index)
        fecha_str = day_data.get('fecha', '')
        day_date = fallback_date
        if fecha_str:
            try:
                # Only the leading YYYY-MM-DD is used so that values with a time
                # part ("2025-08-18T00:00:00") are accepted as well.
                day_date = datetime.strptime(str(fecha_str)[:10], '%Y-%m-%d')
            except ValueError:
                day_date = fallback_date

        temperatura = day_data.get('temperatura', [])
        precipitacion = day_data.get('precipitacion', [])
        humedadRelativa = day_data.get('humedadRelativa', [])
        viento = day_data.get('viento', [])

        # Bounded by the shortest required array, one day, and the remaining quota.
        max_available_hours = min(
            len(temperatura) if isinstance(temperatura, list) else 0,
            len(precipitacion) if isinstance(precipitacion, list) else 0,
            24,  # Max 24 hours per day
            max_hours - start_hour
        )

        if max_available_hours == 0:
            logger.warning("No hourly data arrays found in day data", fecha=fecha_str)
            return []

        for hour_idx in range(max_available_hours):
            forecast_datetime = day_date + timedelta(hours=hour_idx)

            temp = self._extract_aemet_hourly_value(temperatura, hour_idx)
            precip = self._extract_aemet_hourly_value(precipitacion, hour_idx, default=0.0)
            humidity = self._extract_aemet_hourly_value(humedadRelativa, hour_idx)
            wind_data = viento[hour_idx] if isinstance(viento, list) and hour_idx < len(viento) else {}
            wind_speed = self._extract_wind_from_aemet_data(wind_data)

            hourly_data.append({
                "forecast_datetime": forecast_datetime,
                "generated_at": datetime.now(),
                "temperature": temp if temp is not None else 15.0,
                "precipitation": precip if precip is not None else 0.0,
                "humidity": humidity if humidity is not None else 50.0,
                "wind_speed": wind_speed if wind_speed is not None else 10.0,
                "description": self._generate_hourly_description(temp, precip),
                "source": WeatherSource.AEMET.value,
                "hour": forecast_datetime.hour
            })

        logger.debug("Parsed AEMET hourly day", fecha=fecha_str, hours_parsed=len(hourly_data))
        return hourly_data

    def _extract_aemet_hourly_value(self, data_array: Any, hour_idx: int, default: Optional[float] = None) -> Optional[float]:
        """Read entry ``hour_idx`` of an hourly array as a float.

        Entries may be numbers, numeric strings, or dicts keyed by ``value``,
        ``valor`` or ``v``. Anything else yields ``default``.
        """
        if not isinstance(data_array, list) or hour_idx >= len(data_array):
            return default

        value_data = data_array[hour_idx]

        if isinstance(value_data, (int, float)):
            return float(value_data)
        elif isinstance(value_data, dict):
            for field in ['value', 'valor', 'v']:
                if field in value_data:
                    return self.safe_float(value_data[field], default)
        elif isinstance(value_data, str):
            return self.safe_float(value_data, default)

        return default

    def _extract_wind_from_aemet_data(self, wind_data: Any) -> Optional[float]:
        """Extract a wind speed from a dict (``velocidad``/``speed``/``v``) or a number."""
        if isinstance(wind_data, dict):
            for field in ['velocidad', 'speed', 'v']:
                if field in wind_data:
                    velocidad = wind_data[field]
                    # The speed may be wrapped in a list; use its first element.
                    if isinstance(velocidad, list) and len(velocidad) > 0:
                        return self.safe_float(velocidad[0])
                    else:
                        return self.safe_float(velocidad)
        elif isinstance(wind_data, (int, float)):
            return float(wind_data)
        return None

    def _parse_hourly_forecast_days(self, dias: List[Dict[str, Any]], hours: int, base_datetime: datetime) -> List[Dict[str, Any]]:
        """Parse hourly forecast days from AEMET data (legacy method)."""
        hourly_forecast = []
        current_hour = 0

        for day_data in dias:
            if current_hour >= hours:
                break
            if not isinstance(day_data, dict):
                continue

            day_hourly = self._parse_single_day_hourly_forecast(day_data, base_datetime, current_hour, hours)
            hourly_forecast.extend(day_hourly)
            current_hour += len(day_hourly)

        return hourly_forecast[:hours]

    def _parse_single_day_hourly_forecast(self, day_data: Dict[str, Any], base_datetime: datetime, start_hour: int, max_hours: int) -> List[Dict[str, Any]]:
        """Legacy per-day hourly parser: always emits up to 24 entries, filling
        missing readings with index-based placeholder values."""
        hourly_data = []

        temperatura = day_data.get('temperatura', [])
        if not isinstance(temperatura, list):
            temperatura = []

        precipitacion = day_data.get('precipitacion', [])
        if not isinstance(precipitacion, list):
            precipitacion = []

        viento = day_data.get('viento', [])
        if not isinstance(viento, list):
            viento = []

        humedadRelativa = day_data.get('humedadRelativa', [])
        if not isinstance(humedadRelativa, list):
            humedadRelativa = []

        # Process up to 24 hours for this day, limited by the remaining quota.
        hours_in_day = min(24, max_hours - start_hour)

        for hour in range(hours_in_day):
            forecast_datetime = base_datetime + timedelta(hours=start_hour + hour)

            temp = self._extract_hourly_value(temperatura, hour)
            precip = self._extract_hourly_value(precipitacion, hour)
            wind = self._extract_hourly_value(viento, hour, 'velocidad')
            humidity = self._extract_hourly_value(humedadRelativa, hour)

            hourly_data.append({
                "forecast_datetime": forecast_datetime,
                "generated_at": datetime.now(),
                "temperature": temp if temp is not None else 15.0 + hour * 0.5,
                "precipitation": precip if precip is not None else 0.0,
                "humidity": humidity if humidity is not None else 50.0 + (hour % 20),
                "wind_speed": wind if wind is not None else 10.0 + (hour % 10),
                "description": self._generate_hourly_description(temp, precip),
                "source": WeatherSource.AEMET.value,
                "hour": forecast_datetime.hour
            })

        return hourly_data

    def _extract_hourly_value(self, data: List[Dict[str, Any]], hour: int, key: str = 'value') -> Optional[float]:
        """Legacy accessor: read ``key`` from the dict at index ``hour``.

        For ``key == 'velocidad'`` a non-empty list value is unwrapped to its
        first element.
        """
        if not data or hour >= len(data):
            return None

        hour_data = data[hour]
        if not isinstance(hour_data, dict):
            return None

        if key == 'velocidad' and 'velocidad' in hour_data:
            velocidad_list = hour_data.get('velocidad', [])
            if isinstance(velocidad_list, list) and len(velocidad_list) > 0:
                return self.safe_float(velocidad_list[0])

        return self.safe_float(hour_data.get(key))

    def _generate_hourly_description(self, temp: Optional[float], precip: Optional[float]) -> str:
        """Short Spanish description for one forecast hour.

        ``None`` means unknown; 0.0 is a real reading (0 degrees is "Frío").
        """
        if precip is not None and precip > 0.5:
            return "Lluvia"
        if precip is not None and precip > 0.1:
            return "Llovizna"
        if temp is not None and temp > 25:
            return "Soleado"
        if temp is not None and temp < 5:
            return "Frío"
        return "Variable"

    def _ensure_forecast_completeness(self, forecast: List[Dict[str, Any]], days: int) -> List[Dict[str, Any]]:
        """Return forecast as is - no synthetic data filling (truncated to ``days``)."""
        return forecast[:days]

    def _ensure_hourly_forecast_completeness(self, forecast: List[Dict[str, Any]], hours: int) -> List[Dict[str, Any]]:
        """Return hourly forecast as is - no synthetic data filling (truncated to ``hours``)."""
        return forecast[:hours]

    def _get_default_weather_data(self) -> Dict[str, Any]:
        """Placeholder weather record, tagged with the DEFAULT source."""
        return {
            "date": datetime.now(),
            "temperature": 15.0,
            "precipitation": 0.0,
            "humidity": 50.0,
            "wind_speed": 10.0,
            "pressure": 1013.0,
            "description": "Data not available",
            "source": WeatherSource.DEFAULT.value
        }
2025-07-24 16:07:58 +02:00
class LocationService:
    """Location helpers: nearest-station lookup, municipality lookup, distances."""

    @staticmethod
    def find_nearest_station(latitude: float, longitude: float) -> Optional[str]:
        """Return the id of the closest configured Madrid station.

        Returns None when the coordinates are out of range, more than 1000 km
        from Madrid's centre, or when any error occurs (errors are logged).
        """
        try:
            # Reject values that are not valid latitude/longitude degrees.
            if not (-90 <= latitude <= 90 and -180 <= longitude <= 180):
                logger.warning("Invalid coordinate range", lat=latitude, lon=longitude)
                return None

            # Only stations around Madrid are configured, so far-away points
            # have no meaningful "nearest" station.
            madrid_center = (40.4168, -3.7038)
            distance_to_madrid = LocationService.calculate_distance(
                latitude, longitude, madrid_center[0], madrid_center[1]
            )

            if distance_to_madrid > 1000:  # More than 1000km from Madrid
                logger.warning("Coordinates too far from Madrid",
                               lat=latitude, lon=longitude, distance_km=distance_to_madrid)
                return None

            # ``min`` keeps the first station on ties, like a strict ``<`` scan.
            nearest = min(
                AEMETConstants.MADRID_STATIONS,
                key=lambda station: LocationService.calculate_distance(
                    latitude, longitude, station.latitude, station.longitude
                ),
                default=None,
            )
            return nearest.id if nearest is not None else None
        except Exception as e:
            logger.error("Failed to find nearest station", error=str(e))
            return None

    @staticmethod
    def get_municipality_code(latitude: float, longitude: float) -> Optional[str]:
        """Return Madrid's municipality code for points inside the Madrid bounds, else None."""
        if AEMETConstants.MADRID_BOUNDS.contains(latitude, longitude):
            return AEMETConstants.MADRID_MUNICIPALITY_CODE
        return None

    @staticmethod
    def calculate_distance(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
        """Great-circle distance in kilometres between two points (haversine formula)."""
        dlat = math.radians(lat2 - lat1)
        dlon = math.radians(lon2 - lon1)

        a = (math.sin(dlat / 2) ** 2 +
             math.cos(math.radians(lat1)) * math.cos(math.radians(lat2)) *
             math.sin(dlon / 2) ** 2)
        # Rounding can push ``a`` marginally outside [0, 1] for (near-)antipodal
        # points, which would make ``sqrt(1 - a)`` raise ValueError.
        a = min(1.0, max(0.0, a))

        c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
        return AEMETConstants.EARTH_RADIUS_KM * c
class AEMETClient(BaseAPIClient):
"""AEMET (Spanish Weather Service) API client with improved modularity"""
def __init__(self):
    """Configure the base client for AEMET OpenData and attach the helper objects."""
    super().__init__(
        base_url="https://opendata.aemet.es/opendata/api",
        api_key=settings.AEMET_API_KEY,
    )

    # Timeout and retry count come from settings and replace the base values.
    import httpx
    timeout_seconds = float(settings.AEMET_TIMEOUT)
    self.timeout = httpx.Timeout(timeout_seconds)
    self.retries = settings.AEMET_RETRY_ATTEMPTS

    # Collaborators used by the public methods.
    self.parser = WeatherDataParser()
    self.location_service = LocationService()
async def get_current_weather(self, latitude: float, longitude: float) -> Optional[Dict[str, Any]]:
    """Return the parsed current weather for the given coordinates.

    Returns None when the API key is missing, no station matches, the fetch
    yields nothing, or any exception is raised (all cases are logged).
    """
    try:
        logger.info("🌤️ Getting current weather from AEMET",
                    lat=latitude, lon=longitude, api_key_configured=bool(self.api_key))

        # Without an API key no request can succeed.
        if not self.api_key:
            logger.error("❌ AEMET API key not configured")
            return None

        station_id = self.location_service.find_nearest_station(latitude, longitude)
        if not station_id:
            bounds = AEMETConstants.MADRID_BOUNDS
            logger.warning("❌ No weather station found for coordinates",
                           lat=latitude, lon=longitude,
                           madrid_bounds=f"{bounds.min_lat}-{bounds.max_lat}")
            return None

        logger.info("✅ Found nearest weather station", station_id=station_id)

        weather_data = await self._fetch_current_weather_data(station_id)
        if not weather_data:
            logger.warning("⚠️ AEMET API connectivity issues",
                           station_id=station_id, reason="aemet_api_unreachable")
            return None

        logger.info("🎉 SUCCESS: Real AEMET weather data retrieved!", station_id=station_id)
        parsed_data = self.parser.parse_current_weather(weather_data)

        # The fetch succeeded, so the record is tagged as AEMET data even if
        # the parser fell back to defaults.
        if parsed_data and isinstance(parsed_data, dict):
            parsed_data["source"] = WeatherSource.AEMET.value
            logger.info("📡 AEMET data confirmed - source set to 'aemet'",
                        temperature=parsed_data.get("temperature"),
                        description=parsed_data.get("description"))
        return parsed_data
    except Exception as e:
        logger.error("❌ AEMET API failed",
                     error=str(e), error_type=type(e).__name__)
        return None
2025-07-24 16:07:58 +02:00
async def get_forecast(self, latitude: float, longitude: float, days: int = 7) -> List[Dict[str, Any]]:
    """Return up to ``days`` daily forecast entries, or [] on any failure."""
    try:
        municipality_code = self.location_service.get_municipality_code(latitude, longitude)
        if not municipality_code:
            logger.error("No municipality code found for coordinates", lat=latitude, lon=longitude)
            return []

        forecast_data = await self._fetch_forecast_data(municipality_code)
        # An empty fetch and an empty parse result are reported the same way.
        parsed_forecast = (
            self.parser.parse_forecast_data(forecast_data, days) if forecast_data else None
        )
        if parsed_forecast:
            return parsed_forecast

        logger.error("Invalid forecast data received from AEMET API")
        return []
    except Exception as e:
        logger.error("Failed to get weather forecast", error=str(e))
        return []
async def get_hourly_forecast(self, latitude: float, longitude: float, hours: int = 48) -> List[Dict[str, Any]]:
    """Return up to ``hours`` hourly forecast entries, or [] on any failure."""
    try:
        logger.info("🕒 Getting hourly forecast from AEMET",
                    lat=latitude, lon=longitude, hours=hours)

        # Without an API key no request can succeed.
        if not self.api_key:
            logger.error("❌ AEMET API key not configured")
            return []

        municipality_code = self.location_service.get_municipality_code(latitude, longitude)
        if not municipality_code:
            logger.error("❌ No municipality code found for coordinates",
                         lat=latitude, lon=longitude)
            return []

        logger.info("✅ Found municipality code", municipality_code=municipality_code)

        # Fetch the raw hourly payload, then parse it.
        hourly_data = await self._fetch_hourly_forecast_data(municipality_code)
        if hourly_data:
            logger.info("🎉 SUCCESS: Real AEMET hourly data retrieved!", municipality_code=municipality_code)
            parsed_data = self.parser.parse_hourly_forecast_data(hourly_data, hours)
            if parsed_data and len(parsed_data) > 0:
                return parsed_data

        # Reached when the fetch or the parse produced nothing.
        logger.error("⚠️ AEMET hourly API failed or returned no data",
                     municipality_code=municipality_code)
        return []
    except Exception as e:
        logger.error("❌ AEMET hourly forecast failed",
                     error=str(e), error_type=type(e).__name__)
        return []
2025-07-24 16:07:58 +02:00
async def get_historical_weather(self,
                                 latitude: float,
                                 longitude: float,
                                 start_date: datetime,
                                 end_date: datetime) -> List[Dict[str, Any]]:
    """Return historical weather records for the nearest station, or [] on failure."""
    try:
        logger.debug("Getting historical weather from AEMET API",
                     lat=latitude, lon=longitude,
                     start=start_date, end=end_date)

        station_id = self.location_service.find_nearest_station(latitude, longitude)
        if not station_id:
            logger.error("No weather station found for historical data",
                         lat=latitude, lon=longitude)
            return []

        # The date range is fetched through the chunked helper.
        historical_data = await self._fetch_historical_data_in_chunks(
            station_id, start_date, end_date
        )
        if not historical_data:
            logger.error("No real historical data available from AEMET")
            return []

        logger.debug("Successfully fetched historical weather data",
                     total_count=len(historical_data))
        return historical_data
    except Exception as e:
        logger.error("Failed to get historical weather from AEMET API", error=str(e))
        return []
2025-07-24 16:07:58 +02:00
async def _fetch_current_weather_data(self, station_id: str) -> Optional[Dict[str, Any]]:
    """Fetch the most recent observation record for ``station_id``.

    The first request returns an envelope whose ``datos`` URL holds the actual
    list of observations. Returns None when the envelope is invalid or the
    list is missing or empty.
    """
    endpoint = f"/observacion/convencional/datos/estacion/{station_id}"
    initial_response = await self._get(endpoint)
    if not self._is_valid_initial_response(initial_response):
        return None

    datos_url = initial_response.get("datos")
    observations = await self._fetch_from_url(datos_url)
    if not isinstance(observations, list) or not observations:
        return None

    dict_records = [record for record in observations if isinstance(record, dict)]
    if not dict_records:
        # No usable record: keep the previous behaviour and return the first item.
        return observations[0]

    # NOTE(review): the feed appears to list several hours of readings, so the
    # first element is not necessarily the current one. Choosing the greatest
    # ``fint`` (observation end time, ISO string) is order-agnostic; when no
    # record has ``fint`` this still returns the first record.
    return max(dict_records, key=lambda record: str(record.get("fint") or ""))
async def _fetch_forecast_data(self, municipality_code: str) -> Optional[List[Dict[str, Any]]]:
    """Fetch the raw daily forecast payload for a municipality (None on failure)."""
    endpoint = f"/prediccion/especifica/municipio/diaria/{municipality_code}"
    initial_response = await self._get(endpoint)

    # AEMET reports errors inside the envelope via ``estado`` (number or string).
    if initial_response and isinstance(initial_response, dict):
        aemet_estado = initial_response.get("estado")
        if aemet_estado in (404, "404"):
            logger.warning("AEMET API returned 404 error",
                           mensaje=initial_response.get("descripcion"),
                           municipality=municipality_code)
            return None

    if not self._is_valid_initial_response(initial_response):
        return None

    # The envelope points at the real data through its ``datos`` URL.
    datos_url = initial_response.get("datos")
    return await self._fetch_from_url(datos_url)
2025-08-18 20:50:41 +02:00
async def _fetch_hourly_forecast_data(self, municipality_code: str) -> Optional[List[Dict[str, Any]]]:
    """Retrieve the hourly forecast payload for a municipality from AEMET.

    The request, an invalid envelope, and the follow-up download are each
    logged so the two-step exchange can be traced.

    Args:
        municipality_code: Municipality code interpolated into the
            hourly-forecast endpoint path.

    Returns:
        Whatever ``_fetch_from_url`` yields for the envelope's ``datos``
        URL, or ``None`` when the envelope carries a 404 ``estado`` or
        fails ``_is_valid_initial_response``.
    """
    hourly_endpoint = f"/prediccion/especifica/municipio/horaria/{municipality_code}"
    logger.info("Requesting AEMET hourly forecast", endpoint=hourly_endpoint, municipality=municipality_code)

    envelope = await self._get(hourly_endpoint)

    # The "not found" status arrives inside the JSON body, as an int or a str.
    looks_like_envelope = bool(envelope) and isinstance(envelope, dict)
    if looks_like_envelope and envelope.get("estado") in (404, "404"):
        logger.warning("AEMET API returned 404 error for hourly forecast",
                       mensaje=envelope.get("descripcion"),
                       municipality=municipality_code)
        return None

    if not self._is_valid_initial_response(envelope):
        logger.warning("Invalid initial response from AEMET hourly API",
                       response=envelope, municipality=municipality_code)
        return None

    payload_url = envelope.get("datos")
    logger.info("Fetching hourly data from AEMET datos URL", url=payload_url)
    return await self._fetch_from_url(payload_url)
async def _fetch_historical_data_in_chunks(self,
                                           station_id: str,
                                           start_date: datetime,
                                           end_date: datetime) -> List[Dict[str, Any]]:
    """Fetch historical data in consecutive date windows and merge the results.

    The range ``start_date``..``end_date`` is split into back-to-back
    windows of at most ``AEMETConstants.MAX_DAYS_PER_REQUEST`` calendar
    days. Each window is fetched with ``_fetch_historical_chunk`` and the
    records are concatenated in window order.

    Args:
        station_id: Station identifier forwarded to each chunk request.
        start_date: First day to fetch (inclusive).
        end_date: Last day to fetch (inclusive).

    Returns:
        All records collected from the individual chunks; an empty list
        when ``start_date`` is after ``end_date`` or no chunk produced data.
    """
    import asyncio  # imported locally, as this block always has done

    # Both boundary days of a window are requested in full (the chunk
    # request runs from 00:00:00 on the first day to 23:59:59 on the last),
    # so a window of N days ends N - 1 days after it starts. Adding the
    # full N days would request N + 1 days and exceed the per-request cap.
    window_span = timedelta(days=max(AEMETConstants.MAX_DAYS_PER_REQUEST, 1) - 1)
    one_day = timedelta(days=1)
    pause_seconds = 2

    historical_data: List[Dict[str, Any]] = []
    current_date = start_date
    chunk_count = 0

    while current_date <= end_date:
        chunk_end_date = min(current_date + window_span, end_date)

        # Add delay to respect rate limits (AEMET allows ~60 requests/minute).
        # Waiting between requests keeps the client well under that limit;
        # the very first request goes out immediately.
        if chunk_count > 0:
            await asyncio.sleep(pause_seconds)

        chunk_data = await self._fetch_historical_chunk(
            station_id, current_date, chunk_end_date
        )
        if chunk_data:
            historical_data.extend(chunk_data)

        # The next window starts the day after this one ended, so no day is
        # requested twice and none is skipped.
        current_date = chunk_end_date + one_day
        chunk_count += 1

        # Log progress every 5 chunks
        if chunk_count % 5 == 0:
            logger.info("Historical data fetch progress",
                        chunks_fetched=chunk_count,
                        records_so_far=len(historical_data))

    return historical_data
async def _fetch_historical_chunk(self,
                                  station_id: str,
                                  start_date: datetime,
                                  end_date: datetime) -> List[Dict[str, Any]]:
    """Fetch and parse one date window of daily climatological values.

    The window runs from 00:00:00 on ``start_date`` to 23:59:59 on
    ``end_date``; only the date part of each argument is used.

    Args:
        station_id: Station identifier interpolated into the endpoint path.
        start_date: First day of the window.
        end_date: Last day of the window.

    Returns:
        The records produced by ``self.parser.parse_historical_data``, or
        an empty list when the envelope is invalid, carries no ``datos``
        URL, or the downloaded payload is empty or not a list.
    """
    window_from = start_date.strftime("%Y-%m-%dT00:00:00UTC")
    window_to = end_date.strftime("%Y-%m-%dT23:59:59UTC")
    # Every log line below carries the same window context.
    window = {"start": window_from, "end": window_to}

    envelope = await self._get(
        f"/valores/climatologicos/diarios/datos/fechaini/{window_from}/fechafin/{window_to}/estacion/{station_id}"
    )
    if not self._is_valid_initial_response(envelope):
        logger.warning("Invalid initial response from AEMET historical API", **window)
        return []

    payload_url = envelope.get("datos")
    if not payload_url:
        logger.warning("No datos URL in AEMET historical response", **window)
        return []

    raw_records = await self._fetch_from_url(payload_url)
    if not raw_records or not isinstance(raw_records, list):
        logger.warning("No valid historical data received for chunk", **window)
        return []

    parsed_records = self.parser.parse_historical_data(raw_records)
    logger.debug("Fetched historical data chunk",
                 count=len(parsed_records), **window)
    return parsed_records
async def _fetch_from_url(self, url: str) -> Optional[List[Dict[str, Any]]]:
    """Download the payload behind an AEMET ``datos`` URL.

    Args:
        url: The URL passed on to ``_fetch_url_directly``.

    Returns:
        The payload when it is a list. ``None`` in every other case:
        nothing received, a dict (logged as an AEMET error when it has a
        truthy ``estado`` or ``descripcion``, otherwise as an unexpected
        format), any other type, or an exception raised along the way.
        Each ``None`` outcome is logged.
    """
    try:
        payload = await self._fetch_url_directly(url)

        # The only successful outcome: a list is handed back untouched.
        if isinstance(payload, list):
            return payload

        if payload is None:
            logger.warning("No data received from datos URL", url=url)
        elif not isinstance(payload, dict):
            logger.warning("Unexpected data type from datos URL",
                           data_type=type(payload), url=url)
        else:
            estado = payload.get("estado")
            mensaje = payload.get("descripcion")
            if estado or mensaje:
                # A dict with estado/descripcion is treated as an AEMET error body.
                logger.warning("AEMET datos URL returned error response",
                               estado=estado,
                               mensaje=mensaje,
                               url=url)
            else:
                # A dict without those fields: log a few keys to aid diagnosis.
                logger.warning("Expected list from datos URL but got dict",
                               data_type=type(payload),
                               keys=list(payload.keys())[:5],
                               url=url)
        return None

    except Exception as e:
        logger.error("Failed to fetch from datos URL", url=url, error=str(e))
        return None
def _is_valid_initial_response(self, response: Any) -> bool:
    """Check whether an initial AEMET API response carries a usable ``datos`` value.

    Args:
        response: The decoded response to inspect; may be of any type.

    Returns:
        ``True`` only when ``response`` is a non-empty dict whose
        ``"datos"`` entry is a non-empty string, otherwise ``False``.
        The result is always a real ``bool``, never the operand that
        happened to decide the check (``None``, ``{}``, ``""`` or the
        URL string itself).
    """
    if not response or not isinstance(response, dict):
        return False
    datos = response.get("datos")
    return isinstance(datos, str) and bool(datos)