# ================================================================
# services/data/app/external/madrid_opendata.py - FIXED XML PARSER
# ================================================================
"""Madrid Open Data API client with fixed XML parser for actual structure"""
import math
import xml.etree.ElementTree as ET
from typing import List, Dict, Any, Optional
from datetime import datetime, timedelta
import structlog
import re
from app.external.base_client import BaseAPIClient
from app.core.config import settings
import pyproj
logger = structlog.get_logger()
class MadridOpenDataClient(BaseAPIClient):
def __init__(self):
    """Set up the client for the Madrid open-data portal.

    The base client is initialised without an API key, and
    ``traffic_endpoints`` holds the XML feed URLs that
    ``get_current_traffic`` tries in order.
    """
    super().__init__(base_url="https://datos.madrid.es", api_key=None)

    # Real-time traffic intensity feed (the only endpoint currently configured).
    primary_feed = "https://datos.madrid.es/egob/catalogo/202087-0-trafico-intensidad.xml"
    self.traffic_endpoints = [primary_feed]
async def get_current_traffic(self, latitude: float, longitude: float) -> Optional[Dict[str, Any]]:
    """Return a traffic reading for the given location.

    Each URL in ``self.traffic_endpoints`` is tried in order. The first feed
    that yields a point accepted by ``_find_nearest_traffic_point`` is
    converted with ``_parse_traffic_measurement`` and returned. When no
    endpoint produces a usable point, or anything raises, the result of
    ``_generate_synthetic_traffic`` is returned instead.

    Args:
        latitude: Latitude of the location of interest.
        longitude: Longitude of the location of interest.

    Returns:
        A standardized traffic dict (real or synthetic).
    """
    try:
        logger.debug("Fetching Madrid traffic data", lat=latitude, lon=longitude)

        for endpoint in self.traffic_endpoints:
            try:
                logger.debug("Trying traffic endpoint", endpoint=endpoint)
                feed_points = await self._fetch_traffic_xml_data(endpoint)
                if not feed_points:
                    # Nothing usable from this feed; move on to the next one.
                    continue

                logger.info("Successfully fetched Madrid traffic data",
                            endpoint=endpoint,
                            points=len(feed_points))

                closest = self._find_nearest_traffic_point(latitude, longitude, feed_points)
                if not closest:
                    logger.debug("No nearby traffic points found",
                                 lat=latitude, lon=longitude,
                                 closest_distance=self._get_closest_distance(latitude, longitude, feed_points))
                    continue

                measurement = self._parse_traffic_measurement(closest)
                logger.debug("Successfully parsed real Madrid traffic data",
                             point_name=closest.get('descripcion'),
                             point_id=closest.get('idelem'))
                return measurement
            except Exception as e:
                # A failing endpoint must not prevent trying the remaining ones.
                logger.debug("Failed to fetch from endpoint", endpoint=endpoint, error=str(e))

        # Every endpoint was exhausted without a usable measurement.
        logger.info("No nearby Madrid traffic points found, using synthetic data")
        return await self._generate_synthetic_traffic(latitude, longitude)
    except Exception as e:
        logger.error("Failed to get current traffic", error=str(e))
        return await self._generate_synthetic_traffic(latitude, longitude)
async def _fetch_traffic_xml_data(self, endpoint: str) -> Optional[List[Dict[str, Any]]]:
    """Download one traffic feed and parse it into measurement points.

    Args:
        endpoint: URL of the XML feed to download.

    Returns:
        The list produced by ``_parse_madrid_traffic_xml``, or ``None`` when
        the download returned nothing, parsing found no points, or an
        exception was raised along the way.
    """
    try:
        raw_xml = await self._fetch_xml_content_robust(endpoint)
        if not raw_xml:
            logger.debug("No XML content received", endpoint=endpoint)
            return None

        # Log only the head of the document to keep debug output manageable.
        logger.debug("Madrid XML content preview",
                     length=len(raw_xml),
                     first_500=raw_xml[:500])

        parsed_points = self._parse_madrid_traffic_xml(raw_xml)
        if not parsed_points:
            logger.warning("No traffic points found in XML", endpoint=endpoint)
            return None

        logger.debug("Successfully parsed Madrid traffic XML", points=len(parsed_points))
        return parsed_points
    except Exception as e:
        logger.error("Error fetching traffic XML data", endpoint=endpoint, error=str(e))
        return None
def _parse_madrid_traffic_xml(self, xml_content: str) -> List[Dict[str, Any]]:
    """Parse the traffic feed: a ``<pms>`` root holding ``<pm>`` children.

    The text is first passed through ``_clean_madrid_xml``. Each ``<pm>``
    element is converted by ``_extract_madrid_pm_element`` and kept only if
    it has a truthy ``latitude``, ``longitude`` and ``idelem``.

    Args:
        xml_content: Raw XML text of the feed.

    Returns:
        The accepted points. On ``ET.ParseError`` the result of
        ``_extract_traffic_data_regex`` on the raw text is returned; on any
        other exception an empty list.
    """
    collected = []
    try:
        document = ET.fromstring(self._clean_madrid_xml(xml_content))

        logger.debug("Madrid XML structure",
                     root_tag=document.tag,
                     children_count=len(list(document)))

        if document.tag != 'pms':
            logger.warning("Unexpected XML root tag", root_tag=document.tag)
        else:
            pm_nodes = document.findall('pm')
            logger.debug("Found PM elements", count=len(pm_nodes))

            for pm_node in pm_nodes:
                try:
                    candidate = self._extract_madrid_pm_element(pm_node)
                    has_essentials = (candidate.get('latitude') and
                                      candidate.get('longitude') and
                                      candidate.get('idelem'))
                    if not has_essentials:
                        continue

                    collected.append(candidate)
                    # Only the first few accepted points are logged as samples.
                    if len(collected) <= 3:
                        logger.debug("Sample traffic point",
                                     id=candidate['idelem'],
                                     lat=candidate['latitude'],
                                     lon=candidate['longitude'],
                                     intensity=candidate.get('intensidad'))
                except Exception as e:
                    # One malformed element must not abort the whole feed.
                    logger.debug("Error parsing PM element", error=str(e))

        logger.debug("Madrid traffic XML parsing completed", valid_points=len(collected))
        return collected
    except ET.ParseError as e:
        logger.warning("Failed to parse Madrid XML", error=str(e))
        # Malformed document: fall back to regex extraction on the raw text.
        return self._extract_traffic_data_regex(xml_content)
    except Exception as e:
        logger.error("Error in Madrid traffic XML parsing", error=str(e))
        return []
def _clean_madrid_xml(self, xml_content: str) -> str:
"""Clean Madrid XML to handle undefined entities and encoding issues"""
try:
# Remove BOM if present
xml_content = xml_content.lstrip('\ufeff')
# Remove or replace undefined entities that cause parsing errors
# Common undefined entities in Madrid data
xml_content = xml_content.replace(' ', ' ')
xml_content = xml_content.replace('©', '©')
xml_content = xml_content.replace('®', '®')
xml_content = xml_content.replace('™', '™')
# Fix unescaped ampersands (but not already escaped ones)
xml_content = re.sub(r'&(?![a-zA-Z0-9#]{1,10};)', '&', xml_content)
# Remove invalid control characters
xml_content = re.sub(r'[\x00-\x08\x0B\x0C\x0E-\x1F\x7F]', '', xml_content)
# Handle Spanish characters that might be causing issues
spanish_chars = {
'ñ': 'n', 'Ñ': 'N',
'á': 'a', 'é': 'e', 'í': 'i', 'ó': 'o', 'ú': 'u',
'Á': 'A', 'É': 'E', 'Í': 'I', 'Ó': 'O', 'Ú': 'U',
'ü': 'u', 'Ü': 'U'
}
for spanish_char, replacement in spanish_chars.items():
xml_content = xml_content.replace(spanish_char, replacement)
return xml_content
except Exception as e:
logger.warning("Error cleaning Madrid XML", error=str(e))
return xml_content
def _extract_madrid_pm_element(self, pm_element) -> Dict[str, Any]:
    """Turn one ``<pm>`` element into a measurement-point dict.

    Recognised child tags are copied into the result (text fields as-is,
    numeric fields via ``_safe_int`` / ``_safe_float``). ``st_x`` / ``st_y``
    are kept as ``utm_x`` / ``utm_y`` and converted to ``latitude`` /
    ``longitude`` with ``_convert_utm_coordinates_accurate``.

    Args:
        pm_element: Iterable XML element whose children expose ``tag`` and
            ``text``.

    Returns:
        The populated dict, or ``{}`` when coordinates are missing, cannot be
        converted, fail ``_validate_madrid_coordinates``, or an exception
        occurs.
    """
    try:
        text_fields = ('idelem', 'descripcion', 'error', 'subarea', 'accesoAsociado')
        int_fields = ('intensidad', 'carga', 'nivelServicio', 'intensidadSat')

        record = {}
        easting = None
        northing = None

        for node in pm_element:
            name = node.tag
            value = node.text.strip() if node.text else ''

            if name in text_fields:
                record[name] = value
            elif name in int_fields:
                record[name] = self._safe_int(value)
            elif name == 'ocupacion':
                record['ocupacion'] = self._safe_float(value)
            elif name == 'st_x':
                easting = value
                record['utm_x'] = value  # raw value kept for debugging
            elif name == 'st_y':
                northing = value
                record['utm_y'] = value  # raw value kept for debugging

        if not (easting and northing):
            logger.debug("Missing UTM coordinates",
                         idelem=record.get('idelem'),
                         has_utm_x=easting is not None,
                         has_utm_y=northing is not None)
            return {}

        latitude, longitude = self._convert_utm_coordinates_accurate(easting, northing)
        if latitude is None or longitude is None:
            logger.debug("UTM conversion failed",
                         idelem=record.get('idelem'),
                         utm_x=easting,
                         utm_y=northing)
            return {}

        if not self._validate_madrid_coordinates(latitude, longitude):
            logger.debug("Invalid Madrid coordinates after conversion",
                         idelem=record.get('idelem'),
                         utm_x=easting,
                         utm_y=northing,
                         converted_lat=latitude,
                         converted_lon=longitude,
                         descripcion=record.get('descripcion'))
            return {}

        record['latitude'] = latitude
        record['longitude'] = longitude

        # Log only the first three successful conversions of this instance;
        # the list on ``self`` serves as the counter.
        logged = getattr(self, '_conversion_log_count', None)
        if logged is None or len(logged) < 3:
            if logged is None:
                logged = []
                self._conversion_log_count = logged
            logged.append(1)
            logger.debug("Successful UTM conversion",
                         idelem=record.get('idelem'),
                         utm_x=easting,
                         utm_y=northing,
                         latitude=latitude,
                         longitude=longitude,
                         descripcion=record.get('descripcion'))

        return record
    except Exception as e:
        logger.debug("Error extracting Madrid PM element", error=str(e))
        return {}
def _convert_utm_coordinates_accurate(self, utm_x_str: str, utm_y_str: str) -> tuple[Optional[float], Optional[float]]:
"""Convert UTM coordinates to lat/lon using accurate pyproj library"""
try:
utm_x = float(utm_x_str.replace(',', '.'))
utm_y = float(utm_y_str.replace(',', '.'))
# Define UTM Zone 30N projection (EPSG:25830)
utm_proj = pyproj.Proj(proj='utm', zone=30, ellps='WGS84', preserve_units=False)
# Convert to latitude/longitude
longitude, latitude = utm_proj(utm_x, utm_y, inverse=True)
return round(latitude, 6), round(longitude, 6)
except (ValueError, TypeError, Exception):
return None, None
def _validate_madrid_coordinates(self, latitude: float, longitude: float) -> bool:
"""Validate that converted coordinates are actually in Madrid area"""
# Madrid bounds (expanded slightly to include metro area)
madrid_lat_min, madrid_lat_max = 40.31, 40.56
madrid_lon_min, madrid_lon_max = -3.89, -3.51
return (madrid_lat_min <= latitude <= madrid_lat_max and
madrid_lon_min <= longitude <= madrid_lon_max)
def _safe_int(self, value_str: str) -> int:
"""Safely convert string to int"""
try:
return int(float(value_str.replace(',', '.')))
except (ValueError, TypeError):
return 0
def _safe_float(self, value_str: str) -> float:
"""Safely convert string to float"""
try:
return float(value_str.replace(',', '.'))
except (ValueError, TypeError):
return 0.0
async def _fetch_xml_content_robust(self, url: str) -> Optional[str]:
    """Download an XML document and return it as text.

    The body is decoded from the raw bytes with strict decoding, trying the
    charset declared in the Content-Type header first, then UTF-8,
    Windows-1252 and finally Latin-1. Strict attempts avoid silently
    replacing bytes when the declared charset is wrong or absent.

    Args:
        url: Address of the XML document.

    Returns:
        The decoded text when the response status is 200 and the text is
        longer than 100 characters; otherwise ``None``. Any exception is
        logged and also results in ``None``.
    """
    try:
        import httpx

        # Accept-Encoding is deliberately not set: httpx advertises only the
        # compression schemes it can actually decode in this environment.
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
            'Accept': 'application/xml,text/xml,*/*',
            'Accept-Language': 'es-ES,es;q=0.9,en;q=0.8',
            'Cache-Control': 'no-cache',
            'Referer': 'https://datos.madrid.es/'
        }

        async with httpx.AsyncClient(
            timeout=30.0,
            follow_redirects=True,
            headers=headers
        ) as client:
            logger.debug("Fetching XML from Madrid endpoint", url=url)
            response = await client.get(url)

            logger.debug("Madrid API response",
                         status=response.status_code,
                         content_type=response.headers.get('content-type'),
                         content_length=len(response.content))

            if response.status_code != 200:
                logger.warning("Unexpected status from Madrid endpoint",
                               url=url,
                               status=response.status_code)
                return None

            raw_body = response.content
            # Latin-1 accepts every byte sequence, so it must come last.
            candidate_encodings = [response.charset_encoding, 'utf-8', 'windows-1252', 'latin-1']
            for encoding in candidate_encodings:
                if not encoding:
                    continue
                try:
                    content = raw_body.decode(encoding)
                except (UnicodeDecodeError, LookupError):
                    # Wrong or unknown charset: try the next candidate.
                    continue

                if len(content) > 100:
                    logger.debug("Successfully decoded with encoding", encoding=encoding)
                    return content

                # Decoded fine but too short to be a real feed document.
                logger.debug("Madrid XML content too short", url=url, length=len(content))
                return None

            return None
    except Exception as e:
        logger.warning("Failed to fetch Madrid XML content", url=url, error=str(e))
        return None
def _extract_traffic_data_regex(self, xml_content: str) -> List[Dict[str, Any]]:
    """Extract measurement points with regular expressions.

    Fallback used when the document is not well-formed XML. Each
    ``<pm>...</pm>`` fragment must provide ``idelem``, ``st_x`` and ``st_y``.
    Coordinates go through ``_convert_utm_coordinates_accurate`` and
    ``_validate_madrid_coordinates``, the same checks that
    ``_extract_madrid_pm_element`` applies.

    Args:
        xml_content: Raw (possibly malformed) XML text.

    Returns:
        A list of point dicts with the keys ``idelem``, ``descripcion``,
        ``intensidad``, ``latitude``, ``longitude``, ``ocupacion``, ``carga``,
        ``nivelServicio`` and ``error``; an empty list on failure.
    """
    def field(fragment: str, tag: str) -> Optional[str]:
        # Text of the first <tag>...</tag> pair in the fragment, or None.
        match = re.search(rf'<{tag}>(.*?)</{tag}>', fragment)
        return match.group(1).strip() if match else None

    traffic_points = []
    try:
        for pm_content in re.findall(r'<pm>(.*?)</pm>', xml_content, re.DOTALL):
            try:
                idelem = field(pm_content, 'idelem')
                st_x = field(pm_content, 'st_x')
                st_y = field(pm_content, 'st_y')
                if not (idelem and st_x and st_y):
                    continue

                # Easting and northing must be converted together.
                latitude, longitude = self._convert_utm_coordinates_accurate(st_x, st_y)
                if latitude is None or longitude is None:
                    continue
                if not self._validate_madrid_coordinates(latitude, longitude):
                    continue

                descripcion = field(pm_content, 'descripcion')
                traffic_points.append({
                    'idelem': idelem,
                    'descripcion': descripcion if descripcion is not None else f'Point {idelem}',
                    'intensidad': self._safe_int(field(pm_content, 'intensidad') or '0'),
                    'latitude': latitude,
                    'longitude': longitude,
                    # Missing fields fall back to 0 / 'N'.
                    'ocupacion': self._safe_float(field(pm_content, 'ocupacion') or '0'),
                    'carga': self._safe_int(field(pm_content, 'carga') or '0'),
                    'nivelServicio': self._safe_int(field(pm_content, 'nivelServicio') or '0'),
                    'error': field(pm_content, 'error') or 'N'
                })
            except Exception as e:
                # One bad fragment must not discard the rest of the document.
                logger.debug("Error parsing regex PM match", error=str(e))

        logger.debug("Regex extraction results", count=len(traffic_points))
        return traffic_points
    except Exception as e:
        logger.error("Error in regex extraction", error=str(e))
        return []
def _get_closest_distance(self, latitude: float, longitude: float, traffic_data: List[Dict]) -> float:
"""Get distance to closest traffic point for debugging"""
if not traffic_data:
return float('inf')
min_distance = float('inf')
for point in traffic_data:
if point.get('latitude') and point.get('longitude'):
distance = self._calculate_distance(
latitude, longitude,
point['latitude'], point['longitude']
)
min_distance = min(min_distance, distance)
return min_distance
def _find_nearest_traffic_point(self, latitude: float, longitude: float, traffic_data: List[Dict]) -> Optional[Dict]:
"""Find the nearest traffic measurement point to given coordinates"""
if not traffic_data:
return None
min_distance = float('inf')
nearest_point = None
for point in traffic_data:
if point.get('latitude') and point.get('longitude'):
distance = self._calculate_distance(
latitude, longitude,
point['latitude'], point['longitude']
)
if distance < min_distance:
min_distance = distance
nearest_point = point
# Madrid area search radius (15km)
if nearest_point and min_distance <= 15.0:
logger.debug("Found nearest Madrid traffic point",
distance_km=min_distance,
point_name=nearest_point.get('descripcion'),
point_id=nearest_point.get('idelem'))
return nearest_point
logger.debug("No nearby Madrid traffic points found",
min_distance=min_distance,
total_points=len(traffic_data))
return None
def _calculate_distance(self, lat1: float, lon1: float, lat2: float, lon2: float) -> float:
"""Calculate distance between two coordinates in km using Haversine formula"""
R = 6371 # Earth's radius in km
dlat = math.radians(lat2 - lat1)
dlon = math.radians(lon2 - lon1)
a = (math.sin(dlat/2) * math.sin(dlat/2) +
math.cos(math.radians(lat1)) * math.cos(math.radians(lat2)) *
math.sin(dlon/2) * math.sin(dlon/2))
c = 2 * math.atan2(math.sqrt(a), math.sqrt(1-a))
distance = R * c
return distance
def _parse_traffic_measurement(self, traffic_point: Dict) -> Dict[str, Any]:
"""Parse Madrid traffic measurement into standardized format"""
try:
# Madrid traffic service levels: 0=fluid, 1=dense, 2=congested, 3=cut
service_level_map = {
0: "low",
1: "medium",
2: "high",
3: "blocked"
}
service_level = traffic_point.get('nivelServicio', 0)
# Estimate speed based on service level and road type
if service_level == 0: # Fluid
average_speed = 45
elif service_level == 1: # Dense
average_speed = 25
elif service_level == 2: # Congested
average_speed = 15
else: # Cut/Blocked
average_speed = 5
congestion_level = service_level_map.get(service_level, "medium")
# Calculate pedestrian estimate based on location
hour = datetime.now().hour
if 13 <= hour <= 15: # Lunch time
pedestrian_multiplier = 2.5
elif 8 <= hour <= 9 or 18 <= hour <= 20: # Rush hours
pedestrian_multiplier = 2.0
else:
pedestrian_multiplier = 1.0
pedestrian_count = int(100 * pedestrian_multiplier)
return {
"date": datetime.now(),
"traffic_volume": traffic_point.get('intensidad', 0),
"pedestrian_count": pedestrian_count,
"congestion_level": congestion_level,
"average_speed": average_speed,
"occupation_percentage": traffic_point.get('ocupacion', 0),
"load_percentage": traffic_point.get('carga', 0),
"measurement_point_id": traffic_point.get('idelem'),
"measurement_point_name": traffic_point.get('descripcion'),
"road_type": "URB",
"source": "madrid_opendata"
}
except Exception as e:
logger.error("Error parsing traffic measurement", error=str(e))
return self._get_default_traffic_data()
def _get_default_traffic_data(self) -> Dict[str, Any]:
"""Get default traffic data when parsing fails"""
return {
"date": datetime.now(),
"traffic_volume": 100,
"pedestrian_count": 150,
"congestion_level": "medium",
"average_speed": 25,
"occupation_percentage": 30,
"load_percentage": 40,
"measurement_point_id": "unknown",
"measurement_point_name": "Unknown location",
"road_type": "URB",
"source": "synthetic"
}
async def _generate_synthetic_traffic(self, latitude: float, longitude: float) -> Dict[str, Any]:
"""Generate realistic Madrid traffic data as fallback"""
now = datetime.now()
hour = now.hour
is_weekend = now.weekday() >= 5
base_traffic = 100
if not is_weekend:
if 7 <= hour <= 9:
traffic_multiplier = 2.2
congestion = "high"
avg_speed = 15
elif 18 <= hour <= 20:
traffic_multiplier = 2.5
congestion = "high"
avg_speed = 12
elif 12 <= hour <= 14:
traffic_multiplier = 1.6
congestion = "medium"
avg_speed = 25
else:
traffic_multiplier = 1.0
congestion = "low"
avg_speed = 40
else:
if 11 <= hour <= 14:
traffic_multiplier = 1.4
congestion = "medium"
avg_speed = 30
else:
traffic_multiplier = 0.8
congestion = "low"
avg_speed = 45
traffic_volume = int(base_traffic * traffic_multiplier)
# Pedestrian calculation
pedestrian_base = 150
if 13 <= hour <= 15:
pedestrian_count = int(pedestrian_base * 2.5)
elif 8 <= hour <= 9 or 18 <= hour <= 20:
pedestrian_count = int(pedestrian_base * 2.0)
else:
pedestrian_count = int(pedestrian_base * 1.0)
return {
"date": now,
"traffic_volume": traffic_volume,
"pedestrian_count": pedestrian_count,
"congestion_level": congestion,
"average_speed": max(10, avg_speed),
"occupation_percentage": min(100, traffic_volume // 2),
"load_percentage": min(100, traffic_volume // 3),
"measurement_point_id": "madrid_synthetic",
"measurement_point_name": "Madrid Centro (Synthetic)",
"road_type": "URB",
"source": "synthetic"
}
# Placeholder methods for completeness
async def get_historical_traffic(self, latitude: float, longitude: float, start_date: datetime, end_date: datetime) -> List[Dict[str, Any]]:
    """Return historical traffic data.

    Placeholder: no history is retrieved; an empty list is always returned.
    """
    no_history: List[Dict[str, Any]] = []
    return no_history
async def get_events(self, latitude: float, longitude: float, radius_km: float = 5.0) -> List[Dict[str, Any]]:
    """Return traffic incidents and events.

    Placeholder: no events are retrieved; an empty list is always returned.
    """
    no_events: List[Dict[str, Any]] = []
    return no_events