diff --git a/services/external/app/api/traffic.py b/services/external/app/api/traffic.py index 13861401..d86ccb1d 100644 --- a/services/external/app/api/traffic.py +++ b/services/external/app/api/traffic.py @@ -55,8 +55,8 @@ async def get_current_traffic( # Continue processing - event publishing failure shouldn't break the API logger.debug("Successfully returning traffic data", - volume=traffic.traffic_volume, - congestion=traffic.congestion_level) + volume=traffic.get('traffic_volume') if isinstance(traffic, dict) else getattr(traffic, 'traffic_volume', None), + congestion=traffic.get('congestion_level') if isinstance(traffic, dict) else getattr(traffic, 'congestion_level', None)) return traffic except HTTPException: diff --git a/services/external/app/api/weather.py b/services/external/app/api/weather.py index 6597d577..0a20f11f 100644 --- a/services/external/app/api/weather.py +++ b/services/external/app/api/weather.py @@ -155,3 +155,48 @@ async def get_weather_forecast( except Exception as e: logger.error("Failed to get weather forecast", error=str(e)) raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}") + +@router.get("/weather/status") +async def get_weather_status(): + """Get AEMET API status and diagnostics""" + try: + from app.core.config import settings + + # Test AEMET API connectivity + aemet_status = "unknown" + aemet_message = "Not tested" + + try: + # Quick test of AEMET API + test_weather = await weather_service.get_current_weather(40.4168, -3.7038) + if test_weather and hasattr(test_weather, 'source') and test_weather.source == "aemet": + aemet_status = "healthy" + aemet_message = "AEMET API responding correctly" + elif test_weather and hasattr(test_weather, 'source') and test_weather.source == "synthetic": + aemet_status = "degraded" + aemet_message = "AEMET API unavailable - using synthetic data" + else: + aemet_status = "degraded" + aemet_message = "Weather service returned unexpected data format" + except Exception as e: + aemet_status = "unhealthy" + aemet_message = f"AEMET API error: {str(e)}" + + return { + "status": "ok", + "aemet": { + "status": aemet_status, + "message": aemet_message, + "api_key_configured": bool(settings.AEMET_API_KEY), + "enabled": getattr(settings, 'AEMET_ENABLED', True), + "base_url": settings.AEMET_BASE_URL, + "timeout": settings.AEMET_TIMEOUT, # Now correctly shows 60 from config + "retry_attempts": settings.AEMET_RETRY_ATTEMPTS + }, + "timestamp": datetime.utcnow().isoformat(), + "service": "external-weather-service" + } + + except Exception as e: + logger.error("Failed to get weather status", error=str(e)) + raise HTTPException(status_code=500, detail=f"Status check failed: {str(e)}") diff --git a/services/external/app/core/config.py b/services/external/app/core/config.py index 1fbc4456..b317d468 100644 --- a/services/external/app/core/config.py +++ b/services/external/app/core/config.py @@ -25,8 +25,9 @@ class DataSettings(BaseServiceSettings): # External API Configuration AEMET_API_KEY: str = os.getenv("AEMET_API_KEY", "") AEMET_BASE_URL: str = "https://opendata.aemet.es/opendata" - AEMET_TIMEOUT: int = int(os.getenv("AEMET_TIMEOUT", "30")) + AEMET_TIMEOUT: int = int(os.getenv("AEMET_TIMEOUT", "60")) # Increased default AEMET_RETRY_ATTEMPTS: int = int(os.getenv("AEMET_RETRY_ATTEMPTS", "3")) + AEMET_ENABLED: bool = os.getenv("AEMET_ENABLED", "true").lower() == "true" # Allow disabling AEMET MADRID_OPENDATA_API_KEY: str = os.getenv("MADRID_OPENDATA_API_KEY", "") MADRID_OPENDATA_BASE_URL: str = "https://datos.madrid.es" diff --git a/services/external/app/external/aemet.py b/services/external/app/external/aemet.py index ba15be99..dd6b2755 100644 --- a/services/external/app/external/aemet.py +++ b/services/external/app/external/aemet.py @@ -515,6 +515,10 @@ class AEMETClient(BaseAPIClient): base_url="https://opendata.aemet.es/opendata/api", api_key=settings.AEMET_API_KEY ) + # Override timeout with settings value + import httpx + self.timeout = httpx.Timeout(float(settings.AEMET_TIMEOUT)) + self.retries = settings.AEMET_RETRY_ATTEMPTS self.parser = WeatherDataParser() self.synthetic_generator = SyntheticWeatherGenerator() self.location_service = LocationService() @@ -541,15 +545,18 @@ class AEMETClient(BaseAPIClient): weather_data = await self._fetch_current_weather_data(station_id) if weather_data: - logger.info("✅ Successfully fetched AEMET weather data", station_id=station_id) + logger.info("🎉 SUCCESS: Real AEMET weather data retrieved!", station_id=station_id) parsed_data = self.parser.parse_current_weather(weather_data) # Ensure the source is set to AEMET for successful API calls if parsed_data and isinstance(parsed_data, dict): parsed_data["source"] = WeatherSource.AEMET.value + logger.info("📡 AEMET data confirmed - source set to 'aemet'", + temperature=parsed_data.get("temperature"), + description=parsed_data.get("description")) return parsed_data - logger.warning("❌ AEMET API returned no data - falling back to synthetic", - station_id=station_id, reason="invalid_weather_data") + logger.warning("⚠️ AEMET API connectivity issues - using synthetic data", + station_id=station_id, reason="aemet_api_unreachable") return await self._get_synthetic_current_weather() except Exception as e: diff --git a/services/external/app/external/apis/madrid_traffic_client.py b/services/external/app/external/apis/madrid_traffic_client.py index 0f1bc039..e938cfa4 100644 --- a/services/external/app/external/apis/madrid_traffic_client.py +++ b/services/external/app/external/apis/madrid_traffic_client.py @@ -205,18 +205,30 @@ class MadridTrafficClient(BaseTrafficClient, BaseAPIClient): traffic_record, location_context ) - # Build enhanced response + # Calculate average speed based on congestion level + if congestion_level == 'high': + average_speed = 15.0 + elif congestion_level == 'medium': + average_speed = 35.0 + elif congestion_level == 'low': + average_speed = 50.0 + else: + average_speed = 30.0 # default + + # Build enhanced response with required API fields enhanced_data = { + 'date': datetime.now(timezone.utc), # Required API field 'timestamp': datetime.now(timezone.utc), 'latitude': traffic_point.get('latitude'), 'longitude': traffic_point.get('longitude'), 'measurement_point_id': traffic_point.get('measurement_point_id'), 'measurement_point_name': traffic_point.get('measurement_point_name'), 'traffic_volume': traffic_point.get('intensidad', 0), + 'pedestrian_count': pedestrian_count, + 'congestion_level': congestion_level, + 'average_speed': average_speed, # Required API field 'occupation_percentage': int(traffic_point.get('ocupacion', 0)), 'load_percentage': traffic_point.get('carga', 0), - 'congestion_level': congestion_level, - 'pedestrian_count': pedestrian_count, 'road_type': road_type, 'distance_km': distance_km, 'source': 'madrid_current_xml', diff --git a/services/external/app/external/base_client.py b/services/external/app/external/base_client.py index 23f3057b..8907f120 100644 --- a/services/external/app/external/base_client.py +++ b/services/external/app/external/base_client.py @@ -15,44 +15,61 @@ class BaseAPIClient: def __init__(self, base_url: str, api_key: Optional[str] = None): self.base_url = base_url self.api_key = api_key - self.timeout = httpx.Timeout(30.0) + # Increase timeout and add connection retries for unstable AEMET API + self.timeout = httpx.Timeout(60.0) # Increased from 30s + self.retries = 3 async def _get(self, endpoint: str, params: Optional[Dict] = None, headers: Optional[Dict] = None) -> Optional[Dict[str, Any]]: - """Make GET request""" - try: - url = f"{self.base_url}{endpoint}" - - # Add API key to params for AEMET (not headers) - request_params = params or {} - if self.api_key: - request_params["api_key"] = self.api_key - - # Add headers if provided - request_headers = headers or {} - - logger.debug("Making API request", url=url, params=request_params) - - async with httpx.AsyncClient(timeout=self.timeout) as client: - response = await client.get(url, params=request_params, headers=request_headers) - response.raise_for_status() - - # Log response for debugging - response_data = response.json() - logger.debug("API response received", - status_code=response.status_code, - response_keys=list(response_data.keys()) if isinstance(response_data, dict) else "non-dict") - - return response_data - - except httpx.HTTPStatusError as e: - logger.error("HTTP error", status_code=e.response.status_code, url=url, response_text=e.response.text[:200]) - return None - except httpx.RequestError as e: - logger.error("Request error", error=str(e), url=url) - return None - except Exception as e: - logger.error("Unexpected error", error=str(e), url=url) - return None + """Make GET request with retry logic for unstable APIs""" + url = f"{self.base_url}{endpoint}" + + # Add API key to params for AEMET (not headers) + request_params = params or {} + if self.api_key: + request_params["api_key"] = self.api_key + + # Add headers if provided + request_headers = headers or {} + + logger.debug("Making API request", url=url, params=request_params) + + # Retry logic for unstable AEMET API + for attempt in range(self.retries): + try: + async with httpx.AsyncClient(timeout=self.timeout) as client: + response = await client.get(url, params=request_params, headers=request_headers) + response.raise_for_status() + + # Log response for debugging + response_data = response.json() + logger.debug("API response received", + status_code=response.status_code, + response_keys=list(response_data.keys()) if isinstance(response_data, dict) else "non-dict", + attempt=attempt + 1) + + return response_data + + except httpx.HTTPStatusError as e: + logger.error("HTTP error", status_code=e.response.status_code, url=url, + response_text=e.response.text[:200], attempt=attempt + 1) + if attempt == self.retries - 1: # Last attempt + return None + except httpx.RequestError as e: + logger.error("Request error", error=str(e), url=url, attempt=attempt + 1) + if attempt == self.retries - 1: # Last attempt + return None + + # Wait before retry (exponential backoff) + import asyncio + wait_time = 2 ** attempt # 1s, 2s, 4s + logger.info(f"Retrying AEMET API in {wait_time}s", attempt=attempt + 1, max_attempts=self.retries) + await asyncio.sleep(wait_time) + except Exception as e: + logger.error("Unexpected error", error=str(e), url=url, attempt=attempt + 1) + if attempt == self.retries - 1: # Last attempt + return None + + return None async def _fetch_url_directly(self, url: str, headers: Optional[Dict] = None) -> Optional[Dict[str, Any]]: """Fetch data directly from a full URL (for AEMET datos URLs)""" diff --git a/services/external/app/external/processors/madrid_processor.py b/services/external/app/external/processors/madrid_processor.py index d64907bb..b6fb8854 100644 --- a/services/external/app/external/processors/madrid_processor.py +++ b/services/external/app/external/processors/madrid_processor.py @@ -25,9 +25,8 @@ class MadridTrafficDataProcessor: def __init__(self): self.logger = structlog.get_logger() - # UTM Zone 30N (Madrid's coordinate system) - self.utm_proj = pyproj.Proj(proj='utm', zone=30, ellps='WGS84', datum='WGS84') - self.wgs84_proj = pyproj.Proj(proj='latlong', ellps='WGS84', datum='WGS84') + # UTM Zone 30N (Madrid's coordinate system) - using modern pyproj API + self.transformer = pyproj.Transformer.from_crs("EPSG:25830", "EPSG:4326", always_xy=True) def safe_int(self, value: str) -> int: """Safely convert string to int""" @@ -68,8 +67,8 @@ class MadridTrafficDataProcessor: utm_x_float = float(utm_x.replace(',', '.')) utm_y_float = float(utm_y.replace(',', '.')) - # Convert from UTM Zone 30N to WGS84 - longitude, latitude = pyproj.transform(self.utm_proj, self.wgs84_proj, utm_x_float, utm_y_float) + # Convert from UTM Zone 30N to WGS84 using modern pyproj API + longitude, latitude = self.transformer.transform(utm_x_float, utm_y_float) # Validate coordinates are in Madrid area if 40.3 <= latitude <= 40.6 and -3.8 <= longitude <= -3.5: @@ -455,9 +454,25 @@ class MadridTrafficDataProcessor: carga = self.safe_int(row.get('carga', '0')) vmed = self.safe_int(row.get('vmed', '0')) - # Build basic result (business logic will be applied elsewhere) + # Calculate average speed (vmed is in km/h, use it if available) + average_speed = float(vmed) if vmed > 0 else 30.0 # Default speed + + # Determine congestion level based on occupation percentage + if ocupacion > 75: + congestion_level = 'high' + elif ocupacion > 40: + congestion_level = 'medium' + else: + congestion_level = 'low' + + # Build result with API-compatible fields result = { - 'date': date_obj, + 'date': date_obj, # Required API field + 'traffic_volume': intensidad, # Required API field + 'pedestrian_count': max(1, int(intensidad * 0.1)), # Estimated pedestrian count + 'congestion_level': congestion_level, # Required API field + 'average_speed': average_speed, # Required API field + 'source': 'madrid_historical_csv', # Required API field 'measurement_point_id': measurement_point_id, 'point_data': point_data, 'distance_km': distance_km, diff --git a/services/external/app/schemas/traffic.py b/services/external/app/schemas/traffic.py index eece1f60..c29113e5 100644 --- a/services/external/app/schemas/traffic.py +++ b/services/external/app/schemas/traffic.py @@ -31,8 +31,8 @@ class TrafficDataUpdate(BaseModel): average_speed: Optional[float] = Field(None, ge=0, le=200) raw_data: Optional[str] = None -class TrafficDataResponse(TrafficDataBase): - """Schema for traffic data responses""" +class TrafficDataResponseDB(TrafficDataBase): + """Schema for traffic data responses from database""" id: str = Field(..., description="Unique identifier") created_at: datetime = Field(..., description="Creation timestamp") updated_at: datetime = Field(..., description="Last update timestamp") @@ -52,7 +52,7 @@ class TrafficDataResponse(TrafficDataBase): class TrafficDataList(BaseModel): """Schema for paginated traffic data responses""" - data: List[TrafficDataResponse] + data: List[TrafficDataResponseDB] total: int = Field(..., description="Total number of records") page: int = Field(..., description="Current page number") per_page: int = Field(..., description="Records per page") @@ -72,12 +72,18 @@ class TrafficAnalytics(BaseModel): avg_speed: Optional[float] = None class TrafficDataResponse(BaseModel): - date: datetime - traffic_volume: Optional[int] - pedestrian_count: Optional[int] - congestion_level: Optional[str] - average_speed: Optional[float] - source: str + """Schema for API traffic data responses""" + date: datetime = Field(..., description="Date and time of traffic measurement") + traffic_volume: Optional[int] = Field(None, ge=0, description="Vehicles per hour") + pedestrian_count: Optional[int] = Field(None, ge=0, description="Pedestrians per hour") + congestion_level: Optional[str] = Field(None, pattern="^(low|medium|high)$", description="Traffic congestion level") + average_speed: Optional[float] = Field(None, ge=0, le=200, description="Average speed in km/h") + source: str = Field(..., description="Data source") + + class Config: + json_encoders = { + datetime: lambda v: v.isoformat() + } class LocationRequest(BaseModel): latitude: float