# ================================================================
# services/data/app/external/aemet.py - FIXED VERSION
# ================================================================
"""AEMET (Spanish Weather Service) API client - FIXED FORECAST PARSING"""

import math
from typing import List, Dict, Any, Optional
from datetime import datetime, timedelta
import structlog

from app.external.base_client import BaseAPIClient
from app.core.config import settings

logger = structlog.get_logger()


class AEMETClient(BaseAPIClient):
    """Client for the AEMET OpenData API with synthetic-data fallbacks.

    Every public getter degrades to locally generated ("synthetic") data
    instead of raising when the remote API is unavailable or returns an
    unexpected payload.
    """

    def __init__(self):
        super().__init__(
            base_url="https://opendata.aemet.es/opendata/api",
            api_key=settings.AEMET_API_KEY
        )

    async def get_current_weather(self, latitude: float, longitude: float) -> Optional[Dict[str, Any]]:
        """Get current weather for coordinates.

        Resolves the nearest known station, asks AEMET for the observation
        endpoint, follows the ``datos`` URL contained in that first response
        and parses the first record of the returned list.

        Args:
            latitude: Latitude in decimal degrees.
            longitude: Longitude in decimal degrees.

        Returns:
            A weather dict produced by ``_parse_weather_data`` on success,
            otherwise the result of ``_generate_synthetic_weather``.
        """
        try:
            # Find nearest station
            station_id = await self._get_nearest_station(latitude, longitude)
            if not station_id:
                logger.warning("No weather station found", lat=latitude, lon=longitude)
                return await self._generate_synthetic_weather()

            # AEMET API STEP 1: Get the datos URL
            endpoint = f"/observacion/convencional/datos/estacion/{station_id}"
            initial_response = await self._get(endpoint)

            # The first response is only an envelope; it must be a dict
            if not initial_response or not isinstance(initial_response, dict):
                logger.info("Invalid initial response from AEMET API", response_type=type(initial_response))
                return await self._generate_synthetic_weather()

            # Check if we got a successful response with datos URL
            datos_url = initial_response.get("datos")
            if not datos_url or not isinstance(datos_url, str):
                logger.info("No datos URL in AEMET response", response=initial_response)
                return await self._generate_synthetic_weather()

            # AEMET API STEP 2: Fetch actual data from the datos URL
            actual_weather_data = await self._fetch_from_url(datos_url)

            if actual_weather_data and isinstance(actual_weather_data, list):
                # Parse the first record of the station's data
                weather_data = actual_weather_data[0]
                if isinstance(weather_data, dict):
                    return self._parse_weather_data(weather_data)

            # Fallback to synthetic data
            logger.info("Falling back to synthetic weather data", reason="invalid_weather_data")
            return await self._generate_synthetic_weather()

        except Exception as e:
            # Deliberate best-effort boundary: never propagate to the caller
            logger.error("Failed to get current weather", error=str(e))
            return await self._generate_synthetic_weather()

    async def get_forecast(self, latitude: float, longitude: float, days: int = 7) -> List[Dict[str, Any]]:
        """Get weather forecast for coordinates.

        Args:
            latitude: Latitude in decimal degrees.
            longitude: Longitude in decimal degrees.
            days: Number of forecast days requested.

        Returns:
            A list of per-day forecast dicts. Parsed AEMET data is used when
            available; any failure falls back to a synthetic forecast.
        """
        try:
            # Get municipality code for location
            municipality_code = await self._get_municipality_code(latitude, longitude)
            if not municipality_code:
                logger.info("No municipality code found, using synthetic data")
                return await self._generate_synthetic_forecast(days)

            # AEMET API STEP 1: Get the datos URL
            endpoint = f"/prediccion/especifica/municipio/diaria/{municipality_code}"
            initial_response = await self._get(endpoint)

            # The first response is only an envelope; it must be a dict
            if not initial_response or not isinstance(initial_response, dict):
                logger.info("Invalid initial response from AEMET forecast API", response_type=type(initial_response))
                return await self._generate_synthetic_forecast(days)

            # Check if we got a successful response with datos URL
            datos_url = initial_response.get("datos")
            if not datos_url or not isinstance(datos_url, str):
                logger.info("No datos URL in AEMET forecast response", response=initial_response)
                return await self._generate_synthetic_forecast(days)

            # AEMET API STEP 2: Fetch actual data from the datos URL
            actual_forecast_data = await self._fetch_from_url(datos_url)

            if actual_forecast_data and isinstance(actual_forecast_data, list):
                return self._parse_forecast_data(actual_forecast_data, days)

            # Fallback to synthetic data
            logger.info("Falling back to synthetic forecast data", reason="invalid_forecast_data")
            return await self._generate_synthetic_forecast(days)

        except Exception as e:
            # Deliberate best-effort boundary: never propagate to the caller
            logger.error("Failed to get weather forecast", error=str(e))
            return await self._generate_synthetic_forecast(days)

    async def _fetch_from_url(self, url: str) -> Optional[List[Dict[str, Any]]]:
        """Fetch data from AEMET datos URL.

        Returns the payload only when it is a non-empty list; anything else
        (including an exception from the underlying fetch) yields ``None``.
        """
        try:
            # Use base client to fetch from the provided URL directly
            # (``_fetch_url_directly`` is expected to come from BaseAPIClient)
            data = await self._fetch_url_directly(url)

            if data and isinstance(data, list):
                return data

            logger.warning("Expected list from datos URL", data_type=type(data))
            return None

        except Exception as e:
            logger.error("Failed to fetch from datos URL", url=url, error=str(e))
            return None

    async def get_historical_weather(self,
                                     latitude: float,
                                     longitude: float,
                                     start_date: datetime,
                                     end_date: datetime) -> List[Dict[str, Any]]:
        """Get historical weather data.

        Currently ignores the coordinates and returns synthetic daily records
        for ``start_date``..``end_date`` inclusive. Returns an empty list if
        generation fails.
        """
        try:
            # For now, generate synthetic historical data
            # In production, this would use AEMET historical data API with proper two-step flow
            return await self._generate_synthetic_historical(start_date, end_date)

        except Exception as e:
            logger.error("Failed to get historical weather", error=str(e))
            return []

    async def _get_nearest_station(self, latitude: float, longitude: float) -> Optional[str]:
        """Find nearest weather station.

        Picks the closest entry from a small hard-coded table of Madrid
        stations. Returns the station id, or ``None`` if the lookup raises.
        """
        try:
            # Madrid area stations (simplified)
            madrid_stations = {
                "3195": {"lat": 40.4168, "lon": -3.7038, "name": "Madrid Centro"},
                "3196": {"lat": 40.4518, "lon": -3.7246, "name": "Madrid Norte"},
                "3197": {"lat": 40.3833, "lon": -3.7167, "name": "Madrid Sur"}
            }

            closest_station = None
            min_distance = float('inf')

            for station_id, station_data in madrid_stations.items():
                distance = self._calculate_distance(
                    latitude, longitude,
                    station_data["lat"], station_data["lon"]
                )

                if distance < min_distance:
                    min_distance = distance
                    closest_station = station_id

            return closest_station

        except Exception as e:
            logger.error("Failed to find nearest station", error=str(e))
            return None

    async def _get_municipality_code(self, latitude: float, longitude: float) -> Optional[str]:
        """Get municipality code for coordinates.

        Only the Madrid bounding box is supported; any other location
        returns ``None``.
        """
        if self._is_in_madrid_area(latitude, longitude):
            return "28079"  # Madrid municipality code
        return None

    def _is_in_madrid_area(self, latitude: float, longitude: float) -> bool:
        """Check if coordinates are in Madrid area (approximate bounding box)."""
        return (40.3 <= latitude <= 40.6) and (-3.9 <= longitude <= -3.5)

    def _calculate_distance(self, lat1: float, lon1: float, lat2: float, lon2: float) -> float:
        """Calculate distance between two coordinates in km (haversine formula)."""
        R = 6371  # Earth's radius in km

        dlat = math.radians(lat2 - lat1)
        dlon = math.radians(lon2 - lon1)

        a = (math.sin(dlat / 2) * math.sin(dlat / 2) +
             math.cos(math.radians(lat1)) * math.cos(math.radians(lat2)) *
             math.sin(dlon / 2) * math.sin(dlon / 2))

        c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
        return R * c

    def _parse_weather_data(self, data: Dict) -> Dict[str, Any]:
        """Parse AEMET weather data format.

        Reads the ``ta``, ``prec``, ``hr``, ``vv``, ``pres`` and
        ``descripcion`` keys, substituting a fixed default for each missing
        or non-numeric value. Returns the default weather structure when
        ``data`` is not a dict or parsing raises.
        """
        if not isinstance(data, dict):
            logger.warning("Weather data is not a dictionary", data_type=type(data))
            return self._get_default_weather_data()

        try:
            return {
                "date": datetime.now(),
                "temperature": self._safe_float(data.get("ta"), 15.0),  # Temperature
                "precipitation": self._safe_float(data.get("prec"), 0.0),  # Precipitation
                "humidity": self._safe_float(data.get("hr"), 50.0),  # Humidity
                "wind_speed": self._safe_float(data.get("vv"), 10.0),  # Wind speed
                "pressure": self._safe_float(data.get("pres"), 1013.0),  # Pressure
                "description": str(data.get("descripcion", "Partly cloudy")),
                "source": "aemet"
            }
        except Exception as e:
            logger.error("Error parsing weather data", error=str(e), data=data)
            return self._get_default_weather_data()

    def _parse_forecast_data(self, data: List, days: int) -> List[Dict[str, Any]]:
        """Parse AEMET forecast data - FIXED VERSION.

        Reads ``data[0]['prediccion']['dia']`` and converts up to ``days``
        entries into forecast dicts; any shortfall is padded with synthetic
        days. If nothing can be parsed, or parsing raises, the whole result
        is synthetic.

        Args:
            data: Payload fetched from the AEMET ``datos`` URL.
            days: Number of forecast days to return.

        Returns:
            At most ``days`` forecast dicts. An empty list is returned when
            ``data`` is not a list (historical behaviour, kept for callers).
        """
        forecast = []
        base_date = datetime.now().date()

        if not isinstance(data, list):
            logger.warning("Forecast data is not a list", data_type=type(data))
            return []

        try:
            # AEMET forecast structure is complex - parse what we can and fill gaps with synthetic data
            logger.debug("Processing AEMET forecast data", data_length=len(data))

            # If we have actual AEMET data, try to parse it
            if len(data) > 0 and isinstance(data[0], dict):
                aemet_data = data[0]
                logger.debug("AEMET forecast keys", keys=list(aemet_data.keys()))

                # Guard against a non-dict 'prediccion' (e.g. null) instead of
                # relying on the AttributeError being caught below
                prediccion = aemet_data.get('prediccion', {})
                dias = prediccion.get('dia', []) if isinstance(prediccion, dict) else []

                if isinstance(dias, list) and len(dias) > 0:
                    # Parse AEMET daily forecast format
                    for i, dia in enumerate(dias[:days]):
                        if not isinstance(dia, dict):
                            # NOTE(review): a skipped entry leaves a date gap that the
                            # synthetic fill below does not account for - confirm intent
                            continue
                        forecast.append(self._parse_forecast_day(dia, i, base_date))

                    # If we successfully parsed some days, fill remaining with synthetic
                    remaining_days = days - len(forecast)
                    if remaining_days > 0:
                        synthetic_forecast = self._generate_synthetic_forecast_sync(remaining_days, len(forecast))
                        forecast.extend(synthetic_forecast)

            # If no valid AEMET data was parsed, use synthetic
            if len(forecast) == 0:
                logger.info("No valid AEMET forecast data found, using synthetic")
                forecast = self._generate_synthetic_forecast_sync(days, 0)

        except Exception as e:
            logger.error("Error parsing AEMET forecast data", error=str(e))
            # Fallback to synthetic forecast
            forecast = self._generate_synthetic_forecast_sync(days, 0)

        # Ensure we always return the requested number of days
        if len(forecast) < days:
            remaining = days - len(forecast)
            synthetic_remaining = self._generate_synthetic_forecast_sync(remaining, len(forecast))
            forecast.extend(synthetic_remaining)

        return forecast[:days]  # Ensure we don't exceed requested days

    def _parse_forecast_day(self, dia: Dict[str, Any], index: int, base_date) -> Dict[str, Any]:
        """Convert one AEMET ``dia`` entry into a forecast dict.

        Args:
            dia: A single element of the ``prediccion.dia`` list.
            index: Position of the entry; used as the day offset from
                ``base_date`` and for the humidity estimate.
            base_date: ``datetime.date`` the forecast starts from.

        Returns:
            A forecast dict with ``source`` set to ``"aemet"``.
        """
        forecast_date = base_date + timedelta(days=index)

        # Extract temperature data (AEMET has complex temp structure)
        avg_temp = 15.0
        temp_data = dia.get('temperatura', {})
        if isinstance(temp_data, dict):
            temp_max = self._extract_temp_value(temp_data.get('maxima'))
            temp_min = self._extract_temp_value(temp_data.get('minima'))
            # Compare against None explicitly: 0.0 degrees is a valid reading
            # and must not trigger the default
            if temp_max is not None and temp_min is not None:
                avg_temp = (temp_max + temp_min) / 2

        # Extract precipitation probability (highest value across periods)
        precip_prob = 0.0
        precip_data = dia.get('probPrecipitacion', [])
        if isinstance(precip_data, list):
            for precip_item in precip_data:
                if isinstance(precip_item, dict) and 'value' in precip_item:
                    precip_prob = max(precip_prob, self._safe_float(precip_item.get('value'), 0.0))

        # Extract wind data: first entry carrying a usable 'velocidad'
        wind_speed = 10.0
        viento_data = dia.get('viento', [])
        if isinstance(viento_data, list):
            for viento_item in viento_data:
                if not (isinstance(viento_item, dict) and 'velocidad' in viento_item):
                    continue
                speed_values = viento_item['velocidad']
                if isinstance(speed_values, list):
                    if not speed_values:
                        continue
                    wind_speed = self._safe_float(speed_values[0], 10.0)
                    break
                if speed_values is not None:
                    # Scalar form (number or numeric string) - presumably what the
                    # daily forecast endpoint sends; verify against the AEMET schema
                    wind_speed = self._safe_float(speed_values, 10.0)
                    break

        # Generate description based on precipitation probability
        if precip_prob > 70:
            description = "Lluvioso"
        elif precip_prob > 30:
            description = "Parcialmente nublado"
        else:
            description = "Soleado"

        logger.debug("Parsed forecast day", day=index, temp=avg_temp, precip=precip_prob)

        return {
            "forecast_date": datetime.combine(forecast_date, datetime.min.time()),
            "generated_at": datetime.now(),
            "temperature": round(avg_temp, 1),
            "precipitation": precip_prob / 10,  # Convert percentage to mm estimate
            "humidity": 50.0 + (index % 20),  # Estimate
            "wind_speed": round(wind_speed, 1),
            "description": description,
            "source": "aemet"
        }

    def _extract_temp_value(self, temp_data) -> Optional[float]:
        """Extract temperature value from AEMET complex temperature structure.

        Accepts a number, a numeric string, a dict with a ``valor`` key, or a
        list whose first element is such a dict. Returns ``None`` when no
        numeric value can be obtained.
        """
        if temp_data is None:
            return None

        if isinstance(temp_data, (int, float)):
            return float(temp_data)

        if isinstance(temp_data, str):
            try:
                return float(temp_data)
            except ValueError:
                return None

        if isinstance(temp_data, dict) and 'valor' in temp_data:
            return self._safe_float(temp_data['valor'], None)

        if isinstance(temp_data, list) and len(temp_data) > 0:
            first_item = temp_data[0]
            if isinstance(first_item, dict) and 'valor' in first_item:
                return self._safe_float(first_item['valor'], None)

        return None

    def _safe_float(self, value: Any, default: Optional[float]) -> Optional[float]:
        """Safely convert value to float with fallback.

        Returns ``default`` when ``value`` is ``None`` or cannot be converted.
        """
        try:
            if value is None:
                return default
            return float(value)
        except (ValueError, TypeError):
            return default

    def _get_default_weather_data(self) -> Dict[str, Any]:
        """Get default weather data structure (fixed placeholder values)."""
        return {
            "date": datetime.now(),
            "temperature": 15.0,
            "precipitation": 0.0,
            "humidity": 50.0,
            "wind_speed": 10.0,
            "pressure": 1013.0,
            "description": "Data not available",
            "source": "default"
        }

    async def _generate_synthetic_weather(self) -> Dict[str, Any]:
        """Generate synthetic current weather for Madrid.

        Values are simple functions of the current month, day and hour; no
        external data is consulted.
        """
        now = datetime.now()
        month = now.month
        hour = now.hour

        # Madrid climate simulation
        base_temp = 5 + (month - 1) * 2.5  # Seasonal variation
        temp_variation = math.sin((hour - 6) * math.pi / 12) * 8  # Daily variation
        temperature = base_temp + temp_variation

        # Rain probability (higher in winter)
        rain_prob = 0.3 if month in [11, 12, 1, 2, 3] else 0.1
        # NOTE(review): hash() of a date may differ between interpreter
        # processes (hash randomization), so the rain flag is probably not
        # stable across restarts on the same day - confirm whether that matters
        precipitation = 2.5 if hash(now.date()) % 100 < rain_prob * 100 else 0.0

        return {
            "date": now,
            "temperature": round(temperature, 1),
            "precipitation": precipitation,
            "humidity": 45 + (month % 6) * 5,
            "wind_speed": 8 + (hour % 12),
            "pressure": 1013 + math.sin(now.day * 0.2) * 15,
            "description": "Lluvioso" if precipitation > 0 else "Soleado",
            "source": "synthetic"
        }

    def _generate_synthetic_forecast_sync(self, days: int, start_offset: int = 0) -> List[Dict[str, Any]]:
        """Generate synthetic forecast data synchronously.

        Args:
            days: Number of entries to generate.
            start_offset: Day offset from today of the first entry.

        Returns:
            ``days`` forecast dicts with ``source`` set to ``"synthetic"``.
        """
        forecast = []
        base_date = datetime.now().date()

        for i in range(days):
            day_offset = start_offset + i
            forecast_date = base_date + timedelta(days=day_offset)

            # Seasonal temperature
            month = forecast_date.month
            base_temp = 5 + (month - 1) * 2.5
            temp_variation = (day_offset % 7 - 3) * 2  # Weekly variation

            forecast.append({
                "forecast_date": datetime.combine(forecast_date, datetime.min.time()),
                "generated_at": datetime.now(),
                "temperature": round(base_temp + temp_variation, 1),
                "precipitation": 2.0 if day_offset % 5 == 0 else 0.0,
                "humidity": 50 + (day_offset % 30),
                "wind_speed": 10 + (day_offset % 15),
                "description": "Lluvioso" if day_offset % 5 == 0 else "Soleado",
                "source": "synthetic"
            })

        return forecast

    async def _generate_synthetic_forecast(self, days: int) -> List[Dict[str, Any]]:
        """Generate synthetic forecast data (async version for compatibility)."""
        return self._generate_synthetic_forecast_sync(days, 0)

    async def _generate_synthetic_historical(self, start_date: datetime, end_date: datetime) -> List[Dict[str, Any]]:
        """Generate synthetic historical weather data.

        Produces one record per day from ``start_date`` through ``end_date``
        inclusive; returns an empty list when ``start_date`` is after
        ``end_date``.
        """
        historical_data = []
        current_date = start_date

        while current_date <= end_date:
            month = current_date.month
            base_temp = 5 + (month - 1) * 2.5

            # Deterministic day-of-month based variation
            temp_variation = math.sin(current_date.day * 0.3) * 5

            historical_data.append({
                "date": current_date,
                "temperature": round(base_temp + temp_variation, 1),
                "precipitation": 1.5 if current_date.day % 7 == 0 else 0.0,
                "humidity": 45 + (current_date.day % 40),
                "wind_speed": 8 + (current_date.day % 20),
                "pressure": 1013 + math.sin(current_date.day * 0.2) * 20,
                "description": "Variable",
                "source": "synthetic"
            })

            current_date += timedelta(days=1)

        return historical_data