# ================================================================
# services/data/app/external/madrid_opendata.py - REFACTORED
# ================================================================
"""
Madrid Open Data API client with clean architecture and best practices
Features:
- Real-time traffic data from XML endpoints
- Historical traffic data from ZIP files
- Measurement points integration
- Robust error handling and fallbacks
- Comprehensive logging
"""
import math
import xml.etree.ElementTree as ET
from typing import List, Dict, Any, Optional, Tuple
from datetime import datetime, timedelta, timezone
import structlog
import re
from dataclasses import dataclass
from enum import Enum
from app.external.base_client import BaseAPIClient
from app.core.config import settings
import pyproj
logger = structlog.get_logger()
# ================================================================
# CONSTANTS AND ENUMS
# ================================================================
class TrafficServiceLevel(Enum):
"""Madrid traffic service levels"""
FLUID = 0
DENSE = 1
CONGESTED = 2
BLOCKED = 3
class CongestionLevel(Enum):
"""Standardized congestion levels"""
LOW = "low"
MEDIUM = "medium"
HIGH = "high"
BLOCKED = "blocked"
class DataSource(Enum):
"""Data source types"""
MADRID_REALTIME = "madrid_opendata"
MADRID_HISTORICAL = "madrid_opendata_zip"
SYNTHETIC = "synthetic"
SYNTHETIC_HISTORICAL = "synthetic_historical"
# Madrid geographic bounds
MADRID_BOUNDS = {
'lat_min': 40.31, 'lat_max': 40.56,
'lon_min': -3.89, 'lon_max': -3.51
}
# Constants
MAX_HISTORICAL_DAYS = 365
MAX_CSV_PROCESSING_ROWS = 5_000_000
MEASUREMENT_POINTS_LIMIT = 20
UTM_ZONE = 30 # Madrid is in UTM Zone 30N
@dataclass
class MeasurementPoint:
"""Measurement point data structure"""
id: str
latitude: float
longitude: float
distance: float
name: str
type: str
@dataclass
class TrafficRecord:
"""Traffic record data structure"""
date: datetime
traffic_volume: int
occupation_percentage: int
load_percentage: int
average_speed: int
congestion_level: str
pedestrian_count: int
measurement_point_id: str
measurement_point_name: str
road_type: str
source: str
error_status: Optional[str] = None
# Madrid-specific raw data
intensidad_raw: Optional[int] = None
ocupacion_raw: Optional[int] = None
carga_raw: Optional[int] = None
vmed_raw: Optional[int] = None
def to_dict(self) -> Dict[str, Any]:
"""Convert TrafficRecord to dictionary"""
result = {
"date": self.date,
"traffic_volume": self.traffic_volume,
"occupation_percentage": self.occupation_percentage,
"load_percentage": self.load_percentage,
"average_speed": self.average_speed,
"congestion_level": self.congestion_level,
"pedestrian_count": self.pedestrian_count,
"measurement_point_id": self.measurement_point_id,
"measurement_point_name": self.measurement_point_name,
"road_type": self.road_type,
"source": self.source
}
# Add optional fields if present
optional_fields = ['error_status', 'intensidad_raw', 'ocupacion_raw', 'carga_raw', 'vmed_raw']
for field in optional_fields:
value = getattr(self, field, None)
if value is not None:
result[field] = value
return result
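# Illustrative usage sketch (not part of the module API; all values invented):
#
#   record = TrafficRecord(
#       date=datetime(2025, 6, 1, 8, 0, tzinfo=timezone.utc),
#       traffic_volume=420, occupation_percentage=35, load_percentage=60,
#       average_speed=15, congestion_level=CongestionLevel.MEDIUM.value,
#       pedestrian_count=300, measurement_point_id="3840",
#       measurement_point_name="Madrid Point 3840", road_type="URB",
#       source=DataSource.MADRID_HISTORICAL.value,
#   )
#   payload = record.to_dict()  # raw Madrid fields are omitted while unset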
# ================================================================
# MADRID OPEN DATA CLIENT
# ================================================================
class MadridOpenDataClient(BaseAPIClient):
"""
Madrid Open Data API client with comprehensive traffic data support
Provides both real-time and historical traffic data from Madrid's open data portal.
Implements robust error handling, coordinate conversion, and synthetic data fallbacks.
"""
def __init__(self):
super().__init__(base_url="https://datos.madrid.es")
self.traffic_endpoints = [
"https://datos.madrid.es/egob/catalogo/202087-0-trafico-intensidad.xml"
]
self.measurement_points_url = "https://datos.madrid.es/egob/catalogo/202468-260-intensidad-trafico.csv"
        self._conversion_log_count = 0  # Caps coordinate-conversion debug logging
# Initialize coordinate converter
self.utm_proj = pyproj.Proj(proj='utm', zone=UTM_ZONE, ellps='WGS84', preserve_units=False)
# ================================================================
# PUBLIC API METHODS
# ================================================================
async def get_current_traffic(self, latitude: float, longitude: float) -> Optional[Dict[str, Any]]:
"""
        Get current traffic data for a location from Madrid's real-time endpoints
Args:
latitude: Query location latitude
longitude: Query location longitude
Returns:
Dict with current traffic data or None if not available
"""
try:
logger.debug("Fetching Madrid traffic data", lat=latitude, lon=longitude)
# Try real-time endpoints
for endpoint in self.traffic_endpoints:
try:
traffic_data = await self._fetch_traffic_xml_data(endpoint)
if traffic_data:
logger.info("Successfully fetched Madrid traffic data",
endpoint=endpoint, points=len(traffic_data))
# Find nearest measurement point
nearest_point = self._find_nearest_traffic_point(latitude, longitude, traffic_data)
if nearest_point:
parsed_data = self._parse_traffic_measurement(nearest_point)
logger.debug("Successfully parsed real Madrid traffic data",
point_name=nearest_point.get('descripcion'),
point_id=nearest_point.get('idelem'))
return parsed_data
else:
closest_distance = self._get_closest_distance(latitude, longitude, traffic_data)
logger.debug("No nearby traffic points found",
lat=latitude, lon=longitude,
closest_distance=closest_distance)
except Exception as e:
logger.debug("Failed to fetch from endpoint", endpoint=endpoint, error=str(e))
continue
            # Fall back to synthetic data (endpoints failed or no nearby points)
            logger.info("Madrid real-time traffic unavailable, using synthetic data")
return await self._generate_synthetic_traffic(latitude, longitude)
except Exception as e:
logger.error("Failed to get current traffic", error=str(e))
return await self._generate_synthetic_traffic(latitude, longitude)
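    # Usage sketch (assumes an active asyncio event loop and whatever session
    # setup BaseAPIClient requires; coordinates are Puerta del Sol):
    #
    #   client = MadridOpenDataClient()
    #   traffic = await client.get_current_traffic(40.4168, -3.7038)
    #   if traffic:
    #       print(traffic["congestion_level"], traffic["average_speed"])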
async def get_historical_traffic(self, latitude: float, longitude: float,
start_date: datetime, end_date: datetime) -> List[Dict[str, Any]]:
"""
Get historical traffic data from Madrid Open Data ZIP files
Args:
latitude: Query location latitude
longitude: Query location longitude
start_date: Start date for historical data
end_date: End date for historical data
Returns:
List of historical traffic data dictionaries
"""
try:
logger.debug("Fetching Madrid historical traffic data",
lat=latitude, lon=longitude, start=start_date, end=end_date)
# Validate date range
if not self._validate_date_range(start_date, end_date):
return []
            # Try real data first; generate synthetic data only if that fails
            try:
                real_data = await self._fetch_real_historical_traffic(latitude, longitude, start_date, end_date)
                if real_data:
                    logger.info("Fetched real historical traffic data from ZIP files", records=len(real_data))
                    return real_data
                logger.info("No real historical data available, using synthetic data")
            except Exception as e:
                logger.warning("Failed to fetch real historical data, using synthetic", error=str(e))
            synthetic_data = await self._generate_historical_traffic(latitude, longitude, start_date, end_date)
            logger.info("Generated synthetic historical traffic data", records=len(synthetic_data))
            return synthetic_data
except Exception as e:
logger.error("Error getting historical traffic data", error=str(e))
return []
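    # Usage sketch, continuing the example above (dates are hypothetical; real
    # coverage depends on which monthly ZIPs the portal has published):
    #
    #   start = datetime(2025, 6, 1, tzinfo=timezone.utc)
    #   end = datetime(2025, 6, 7, tzinfo=timezone.utc)
    #   history = await client.get_historical_traffic(40.4168, -3.7038, start, end)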
async def get_events(self, latitude: float, longitude: float, radius_km: float = 5.0) -> List[Dict[str, Any]]:
"""Get traffic incidents and events (placeholder for future implementation)"""
return []
# ================================================================
# REAL-TIME TRAFFIC METHODS
# ================================================================
async def _fetch_traffic_xml_data(self, endpoint: str) -> Optional[List[Dict[str, Any]]]:
"""Fetch and parse Madrid traffic XML data"""
try:
xml_content = await self._fetch_xml_content_robust(endpoint)
if not xml_content:
logger.debug("No XML content received", endpoint=endpoint)
return None
logger.debug("Madrid XML content preview",
length=len(xml_content),
first_500=xml_content[:500] if len(xml_content) > 500 else xml_content)
traffic_points = self._parse_madrid_traffic_xml(xml_content)
if traffic_points:
logger.debug("Successfully parsed Madrid traffic XML", points=len(traffic_points))
return traffic_points
else:
logger.warning("No traffic points found in XML", endpoint=endpoint)
return None
except Exception as e:
logger.error("Error fetching traffic XML data", endpoint=endpoint, error=str(e))
return None
def _parse_madrid_traffic_xml(self, xml_content: str) -> List[Dict[str, Any]]:
"""Parse Madrid traffic XML with correct structure"""
traffic_points = []
try:
cleaned_xml = self._clean_madrid_xml(xml_content)
root = ET.fromstring(cleaned_xml)
logger.debug("Madrid XML structure", root_tag=root.tag, children_count=len(list(root)))
if root.tag == 'pms':
pm_elements = root.findall('pm')
logger.debug("Found PM elements", count=len(pm_elements))
for pm in pm_elements:
try:
traffic_point = self._extract_madrid_pm_element(pm)
if self._is_valid_traffic_point(traffic_point):
traffic_points.append(traffic_point)
# Log first few points for debugging
if len(traffic_points) <= 3:
logger.debug("Sample traffic point",
id=traffic_point['idelem'],
lat=traffic_point['latitude'],
lon=traffic_point['longitude'],
intensity=traffic_point.get('intensidad'))
except Exception as e:
logger.debug("Error parsing PM element", error=str(e))
continue
else:
logger.warning("Unexpected XML root tag", root_tag=root.tag)
logger.debug("Madrid traffic XML parsing completed", valid_points=len(traffic_points))
return traffic_points
except ET.ParseError as e:
logger.warning("Failed to parse Madrid XML", error=str(e))
return self._extract_traffic_data_regex(xml_content)
except Exception as e:
logger.error("Error in Madrid traffic XML parsing", error=str(e))
return []
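    # For reference, the feed shape this parser consumes (abridged, sample
    # values invented; tag names match the extraction below):
    #
    #   <pms>
    #     <pm>
    #       <idelem>3840</idelem>
    #       <descripcion>Sample measurement point</descripcion>
    #       <intensidad>420</intensidad>
    #       <ocupacion>9</ocupacion>
    #       <carga>35</carga>
    #       <nivelServicio>1</nivelServicio>
    #       <st_x>440400,0</st_x>
    #       <st_y>4474300,0</st_y>
    #       <error>N</error>
    #     </pm>
    #   </pms>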
# ================================================================
# HISTORICAL TRAFFIC METHODS
# ================================================================
async def _fetch_real_historical_traffic(self, latitude: float, longitude: float,
start_date: datetime, end_date: datetime) -> List[Dict[str, Any]]:
"""Fetch real historical traffic data from Madrid ZIP files"""
try:
historical_data = []
current_date = start_date.replace(day=1)
while current_date <= end_date:
try:
month_code = self._calculate_madrid_month_code(current_date.year, current_date.month)
if month_code:
zip_url = f"https://datos.madrid.es/egob/catalogo/208627-{month_code}-transporte-ptomedida-historico.zip"
logger.debug("Trying ZIP URL", url=zip_url,
year=current_date.year, month=current_date.month, code=month_code)
zip_data = await self._fetch_historical_zip(zip_url)
if zip_data:
month_data = await self._parse_historical_zip(zip_data, latitude, longitude, start_date, end_date)
historical_data.extend(month_data)
logger.info("Fetched historical data for month",
year=current_date.year, month=current_date.month, records=len(month_data))
else:
logger.debug("No ZIP data found for month",
year=current_date.year, month=current_date.month)
else:
logger.debug("Could not calculate month code",
year=current_date.year, month=current_date.month)
current_date = self._get_next_month(current_date)
except Exception as e:
logger.warning("Error fetching data for month",
year=current_date.year, month=current_date.month, error=str(e))
current_date = self._get_next_month(current_date)
return historical_data
except Exception as e:
logger.error("Error fetching real historical traffic data", error=str(e))
return []
async def _parse_historical_zip(self, zip_content: bytes, latitude: float, longitude: float,
start_date: datetime, end_date: datetime) -> List[Dict[str, Any]]:
"""Parse Madrid historical traffic ZIP file"""
try:
import zipfile
from io import BytesIO
historical_records = []
with zipfile.ZipFile(BytesIO(zip_content), 'r') as zip_file:
logger.debug("ZIP file contents", files=zip_file.namelist())
csv_files = [f for f in zip_file.namelist()
if f.endswith('.csv') and not f.startswith('__MACOSX')]
if not csv_files:
logger.warning("No CSV files found in ZIP")
return []
for csv_filename in csv_files:
logger.debug("Processing CSV file", filename=csv_filename)
try:
csv_content = self._extract_csv_from_zip(zip_file, csv_filename)
if csv_content:
file_records = await self._parse_csv_content(
csv_content, latitude, longitude, start_date, end_date
)
historical_records.extend(file_records)
logger.debug("Processed CSV file",
filename=csv_filename, records=len(file_records))
except Exception as e:
logger.warning("Error processing CSV file",
filename=csv_filename, error=str(e))
continue
return historical_records
except Exception as e:
logger.error("Error parsing historical ZIP", error=str(e))
return []
# ================================================================
# DATA PARSING AND CONVERSION METHODS
# ================================================================
async def _parse_csv_content(self, csv_content: str, latitude: float, longitude: float,
start_date: datetime, end_date: datetime) -> List[Dict[str, Any]]:
"""Parse CSV content from Madrid historical traffic data"""
try:
import csv
from io import StringIO
csv_reader = csv.DictReader(StringIO(csv_content), delimiter=';')
if not self._validate_csv_structure(csv_reader.fieldnames):
return []
logger.debug("Madrid CSV structure detected", fields=csv_reader.fieldnames)
# Get nearest measurement points
measurement_points = await self._get_measurement_points_near_location(latitude, longitude)
target_point_ids = [str(point.id) for point in measurement_points[:10]]
logger.debug("Target measurement points", ids=target_point_ids[:3])
# Process CSV rows
historical_records = []
            processed_count = 0
            row_num = -1  # Guard for the summary log below when the CSV has no rows
            for row_num, row in enumerate(csv_reader):
if processed_count >= MAX_CSV_PROCESSING_ROWS:
logger.info("Reached processing limit", limit=MAX_CSV_PROCESSING_ROWS)
break
try:
traffic_record = self._parse_csv_row(row, target_point_ids, start_date, end_date)
if traffic_record:
historical_records.append(traffic_record.to_dict())
processed_count += 1
if processed_count % 1000 == 0:
logger.debug("Processing progress", processed=processed_count)
except Exception as e:
if row_num % 5000 == 0:
logger.debug("Error parsing CSV row", row_num=row_num, error=str(e))
continue
logger.info("Successfully parsed Madrid CSV",
total_rows=row_num + 1, processed=processed_count, records=len(historical_records))
# Enrich with location data
if historical_records and measurement_points:
historical_records = self._enrich_with_location_data(historical_records, measurement_points)
return historical_records
except Exception as e:
logger.error("Error parsing Madrid CSV content", error=str(e))
return []
def _parse_csv_row(self, row: Dict[str, str], target_point_ids: List[str],
start_date: datetime, end_date: datetime) -> Optional[TrafficRecord]:
"""Parse a single CSV row into a TrafficRecord"""
try:
# Extract and validate point ID
point_id = str(row.get('id', '')).strip()
if not point_id or (target_point_ids and point_id not in target_point_ids):
return None
# Parse date
record_date = self._parse_madrid_date(row.get('fecha', '').strip().strip('"'))
if not record_date:
return None
            # Ensure all datetimes are timezone-aware before comparison
if start_date.tzinfo is None:
start_date = start_date.replace(tzinfo=timezone.utc)
if end_date.tzinfo is None:
end_date = end_date.replace(tzinfo=timezone.utc)
if record_date.tzinfo is None:
record_date = record_date.replace(tzinfo=timezone.utc)
# Now we can safely compare timezone-aware datetimes
if not (start_date <= record_date <= end_date):
return None
# Parse traffic data
intensidad = self._safe_int(row.get('intensidad', '0'))
ocupacion = self._safe_int(row.get('ocupacion', '0'))
carga = self._safe_int(row.get('carga', '0'))
vmed = self._safe_int(row.get('vmed', '0'))
tipo_elem = row.get('tipo_elem', '').strip().strip('"')
error = row.get('error', 'N').strip().strip('"')
# Skip erroneous records
if error == 'S':
return None
# Calculate derived metrics
avg_speed = self._calculate_average_speed(vmed, carga, ocupacion)
congestion_level = self._determine_congestion_level(carga, avg_speed)
pedestrian_count = self._calculate_pedestrian_count(tipo_elem, record_date.hour)
return TrafficRecord(
date=record_date,
traffic_volume=intensidad,
occupation_percentage=ocupacion,
load_percentage=carga,
average_speed=avg_speed,
congestion_level=congestion_level,
pedestrian_count=pedestrian_count,
measurement_point_id=point_id,
measurement_point_name=f"Madrid Point {point_id}",
road_type=tipo_elem,
source=DataSource.MADRID_HISTORICAL.value,
error_status=error,
intensidad_raw=intensidad,
ocupacion_raw=ocupacion,
carga_raw=carga,
vmed_raw=vmed
)
except Exception as e:
logger.debug("Error parsing CSV row", error=str(e))
return None
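    # For reference, the semicolon-delimited row shape this parser expects
    # (values invented; only the columns read above are required):
    #
    #   id;fecha;tipo_elem;intensidad;ocupacion;carga;vmed;error
    #   3840;"2025-06-01 08:00:00";"URB";420;9;35;0;"N"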
# ================================================================
# MEASUREMENT POINTS METHODS
# ================================================================
async def _get_measurement_points_near_location(self, latitude: float, longitude: float) -> List[MeasurementPoint]:
"""Get measurement points near the specified location"""
try:
points_csv = await self._fetch_measurement_points_csv(self.measurement_points_url)
if points_csv:
return await self._parse_measurement_points_csv(points_csv, latitude, longitude)
else:
logger.info("Using fallback measurement points")
return self._get_fallback_measurement_points(latitude, longitude)
except Exception as e:
logger.warning("Error getting measurement points", error=str(e))
return self._get_fallback_measurement_points(latitude, longitude)
async def _parse_measurement_points_csv(self, csv_content: str, query_lat: float, query_lon: float) -> List[MeasurementPoint]:
"""Parse measurement points CSV and find nearest points"""
try:
import csv
from io import StringIO
points_with_distance = []
csv_reader = csv.DictReader(StringIO(csv_content), delimiter=';')
for row in csv_reader:
try:
point_id = row.get('id', '').strip()
latitud = row.get('latitud', '').strip()
longitud = row.get('longitud', '').strip()
nombre = row.get('nombre', '').strip().strip('"')
tipo_elem = row.get('tipo_elem', '').strip().strip('"')
if not (point_id and latitud and longitud):
continue
lat, lon = float(latitud), float(longitud)
distance = self._calculate_distance(query_lat, query_lon, lat, lon)
point = MeasurementPoint(
id=point_id,
latitude=lat,
longitude=lon,
distance=distance,
name=nombre or f'Point {point_id}',
type=tipo_elem
)
points_with_distance.append(point)
except Exception as e:
logger.debug("Error parsing measurement point row", error=str(e))
continue
# Sort by distance and return closest points
points_with_distance.sort(key=lambda x: x.distance)
closest_points = points_with_distance[:MEASUREMENT_POINTS_LIMIT]
logger.info("Found measurement points",
total=len(points_with_distance), closest=len(closest_points))
return closest_points
except Exception as e:
logger.error("Error parsing measurement points CSV", error=str(e))
return []
# ================================================================
# COORDINATE CONVERSION METHODS
# ================================================================
def _extract_madrid_pm_element(self, pm_element) -> Dict[str, Any]:
"""Extract traffic data from Madrid <pm> element with coordinate conversion"""
try:
point_data = {}
utm_x = utm_y = None
# Extract all child elements
for child in pm_element:
tag, text = child.tag, child.text.strip() if child.text else ''
if tag == 'idelem':
point_data['idelem'] = text
elif tag == 'descripcion':
point_data['descripcion'] = text
elif tag == 'intensidad':
point_data['intensidad'] = self._safe_int(text)
elif tag == 'ocupacion':
point_data['ocupacion'] = self._safe_float(text)
elif tag == 'carga':
point_data['carga'] = self._safe_int(text)
elif tag == 'nivelServicio':
point_data['nivelServicio'] = self._safe_int(text)
elif tag == 'st_x':
utm_x = text
point_data['utm_x'] = text
elif tag == 'st_y':
utm_y = text
point_data['utm_y'] = text
elif tag == 'error':
point_data['error'] = text
elif tag in ['subarea', 'accesoAsociado', 'intensidadSat']:
point_data[tag] = text
# Convert coordinates
if utm_x and utm_y:
latitude, longitude = self._convert_utm_to_latlon(utm_x, utm_y)
if latitude and longitude and self._validate_madrid_coordinates(latitude, longitude):
point_data.update({'latitude': latitude, 'longitude': longitude})
# Log successful conversions (limited)
self._log_coordinate_conversion(point_data, utm_x, utm_y, latitude, longitude)
return point_data
else:
logger.debug("Invalid coordinates after conversion",
idelem=point_data.get('idelem'), utm_x=utm_x, utm_y=utm_y)
return {}
else:
logger.debug("Missing UTM coordinates", idelem=point_data.get('idelem'))
return {}
except Exception as e:
logger.debug("Error extracting Madrid PM element", error=str(e))
return {}
def _convert_utm_to_latlon(self, utm_x_str: str, utm_y_str: str) -> Tuple[Optional[float], Optional[float]]:
"""Convert UTM coordinates to lat/lon using pyproj"""
try:
utm_x = float(utm_x_str.replace(',', '.'))
utm_y = float(utm_y_str.replace(',', '.'))
longitude, latitude = self.utm_proj(utm_x, utm_y, inverse=True)
return round(latitude, 6), round(longitude, 6)
        except Exception:  # covers ValueError/TypeError from parsing and pyproj errors
            return None, None
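    # Worked example (approximate; assumes the feed's UTM zone 30N frame):
    #
    #   self._convert_utm_to_latlon("440400,0", "4474300,0")
    #   # -> roughly (40.417, -3.703), i.e. central Madrid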
# ================================================================
# UTILITY AND HELPER METHODS
# ================================================================
def _validate_date_range(self, start_date: datetime, end_date: datetime) -> bool:
"""Validate date range for historical data requests"""
days_diff = (end_date - start_date).days
        # Same-day ranges (days_diff == 0) are allowed
if days_diff < 0:
logger.warning("End date before start date", start=start_date, end=end_date)
return False
if days_diff > MAX_HISTORICAL_DAYS:
logger.warning("Date range too large for historical traffic data", days=days_diff)
return False
return True
def _calculate_madrid_month_code(self, year: int, month: int) -> Optional[int]:
"""Calculate Madrid's month code for ZIP files (June 2025 = 145)"""
try:
reference_year, reference_month, reference_code = 2025, 6, 145
months_diff = (year - reference_year) * 12 + (month - reference_month)
estimated_code = reference_code + months_diff
if 100 <= estimated_code <= 300:
return estimated_code
else:
logger.warning("Month code out of range", year=year, month=month, code=estimated_code)
return None
except Exception as e:
logger.error("Error calculating month code", year=year, month=month, error=str(e))
return None
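    # Worked example of the offset arithmetic: anchored at June 2025 -> 145,
    # July 2025 gives +1 -> 146 and January 2024 gives -17 -> 128. The linear
    # numbering is an assumption and breaks if the portal skips codes.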
def _calculate_average_speed(self, vmed: int, carga: int, ocupacion: int) -> int:
"""Calculate average speed based on available data"""
if vmed > 0: # M30 points have speed data
return vmed
else: # Urban points - estimate from carga and ocupacion
if carga >= 80:
speed = 15
elif carga >= 50:
speed = 25
elif carga >= 20:
speed = 35
else:
speed = 45
# Adjust based on occupation
if ocupacion >= 30:
speed = max(10, speed - 10)
elif ocupacion <= 5:
speed = min(50, speed + 5)
return speed
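    # Worked example for an urban point (vmed == 0): carga=60 maps to 25 km/h,
    # then ocupacion=35 (>= 30) subtracts 10, giving max(10, 15) = 15 km/h.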
def _determine_congestion_level(self, carga: int, avg_speed: int) -> str:
"""Determine congestion level from carga and speed"""
if carga >= 90 and avg_speed <= 10:
return CongestionLevel.BLOCKED.value
elif carga >= 75:
return CongestionLevel.HIGH.value
elif carga >= 40:
return CongestionLevel.MEDIUM.value
else:
return CongestionLevel.LOW.value
def _calculate_pedestrian_count(self, tipo_elem: str, hour: int) -> int:
"""Calculate pedestrian estimate based on area type and time"""
if tipo_elem == 'URB':
base = 200
if 12 <= hour <= 14: # Lunch time
multiplier = 2.0
elif 8 <= hour <= 9 or 18 <= hour <= 20: # Rush hours
multiplier = 1.5
else:
multiplier = 1.0
else: # M30, C30
base = 50
multiplier = 0.5
return int(base * multiplier)
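    # Worked example: an 'URB' point at 13:00 uses base 200 with the lunch-time
    # multiplier 2.0 -> 400 pedestrians; an M30 point gives 50 * 0.5 = 25.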
def _parse_madrid_date(self, fecha_str: str) -> Optional[datetime]:
"""Parse Madrid date format with timezone awareness"""
if not fecha_str:
return None
try:
# Parse the date as timezone-naive first
dt = datetime.strptime(fecha_str, '%Y-%m-%d %H:%M:%S')
            # Madrid timestamps are local time; tagged as UTC here so they
            # compare consistently with the (UTC) query range
return dt.replace(tzinfo=timezone.utc)
except ValueError:
try:
# Try alternative format
dt = datetime.strptime(fecha_str, '%d/%m/%Y %H:%M:%S')
                # Same local-time-to-UTC tagging as above
return dt.replace(tzinfo=timezone.utc)
except ValueError:
return None
def _validate_csv_structure(self, fieldnames: Optional[List[str]]) -> bool:
"""Validate CSV has expected structure"""
if not fieldnames:
logger.warning("No CSV fieldnames found")
return False
expected_fields = ['id', 'fecha', 'tipo_elem', 'intensidad', 'ocupacion', 'carga']
missing_fields = [field for field in expected_fields if field not in fieldnames]
if missing_fields:
logger.warning("Missing expected fields in CSV", missing=missing_fields, available=fieldnames)
return True # Continue processing even with some missing fields
def _is_valid_traffic_point(self, traffic_point: Dict[str, Any]) -> bool:
"""Check if traffic point has valid essential data"""
return (traffic_point.get('latitude') and
traffic_point.get('longitude') and
traffic_point.get('idelem'))
def _validate_madrid_coordinates(self, latitude: float, longitude: float) -> bool:
"""Validate coordinates are in Madrid area"""
return (MADRID_BOUNDS['lat_min'] <= latitude <= MADRID_BOUNDS['lat_max'] and
MADRID_BOUNDS['lon_min'] <= longitude <= MADRID_BOUNDS['lon_max'])
def _get_next_month(self, current_date: datetime) -> datetime:
"""Get next month date"""
if current_date.month == 12:
return current_date.replace(year=current_date.year + 1, month=1)
else:
return current_date.replace(month=current_date.month + 1)
    def _log_coordinate_conversion(self, point_data: Dict, utm_x: str, utm_y: str,
                                   latitude: float, longitude: float) -> None:
        """Log coordinate conversion (limited to the first few for debugging)"""
        if self._conversion_log_count < 3:
            self._conversion_log_count += 1
            logger.debug("Successful UTM conversion",
                         idelem=point_data.get('idelem'),
                         utm_x=utm_x, utm_y=utm_y,
                         latitude=latitude, longitude=longitude,
                         descripcion=point_data.get('descripcion'))
def _enrich_with_location_data(self, records: List[Dict[str, Any]],
measurement_points: List[MeasurementPoint]) -> List[Dict[str, Any]]:
"""Enrich traffic records with location data from measurement points"""
try:
points_lookup = {point.id: point for point in measurement_points}
for record in records:
point_id = record.get('measurement_point_id')
if point_id in points_lookup:
point = points_lookup[point_id]
record.update({
'measurement_point_name': point.name,
'measurement_point_latitude': point.latitude,
'measurement_point_longitude': point.longitude,
'distance_to_query': point.distance
})
return records
except Exception as e:
logger.warning("Error enriching with location data", error=str(e))
return records
# ================================================================
# HTTP CLIENT METHODS
# ================================================================
async def _fetch_xml_content_robust(self, url: str) -> Optional[str]:
"""Fetch XML content with robust headers for Madrid endpoints"""
try:
import httpx
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
'Accept': 'application/xml,text/xml,*/*',
'Accept-Language': 'es-ES,es;q=0.9,en;q=0.8',
'Accept-Encoding': 'gzip, deflate, br',
'Cache-Control': 'no-cache',
'Referer': 'https://datos.madrid.es/'
}
async with httpx.AsyncClient(timeout=30.0, follow_redirects=True, headers=headers) as client:
logger.debug("Fetching XML from Madrid endpoint", url=url)
response = await client.get(url)
logger.debug("Madrid API response",
status=response.status_code,
content_type=response.headers.get('content-type'),
content_length=len(response.content))
if response.status_code == 200:
content = self._decode_response_content(response)
if content and len(content) > 100:
return content
return None
except Exception as e:
logger.warning("Failed to fetch Madrid XML content", url=url, error=str(e))
return None
async def _fetch_historical_zip(self, url: str) -> Optional[bytes]:
"""Fetch historical ZIP data from Madrid Open Data"""
try:
import httpx
headers = {
'User-Agent': 'Mozilla/5.0 (compatible; Madrid-Traffic-Client/1.0)',
'Accept': 'application/zip,application/octet-stream,*/*',
'Accept-Language': 'es-ES,es;q=0.9,en;q=0.8',
}
async with httpx.AsyncClient(timeout=120.0, headers=headers, follow_redirects=True) as client:
logger.debug("Fetching historical ZIP", url=url)
response = await client.get(url)
if response.status_code == 200:
content = response.content
if content and len(content) > 1000:
logger.debug("Successfully fetched ZIP", url=url, size=len(content))
return content
else:
logger.debug("ZIP file too small", url=url, size=len(content) if content else 0)
else:
logger.debug("ZIP not found", url=url, status=response.status_code)
except Exception as e:
logger.debug("Error fetching ZIP", url=url, error=str(e))
return None
async def _fetch_measurement_points_csv(self, url: str) -> Optional[str]:
"""Fetch the measurement points CSV file"""
try:
import httpx
headers = {
'User-Agent': 'Mozilla/5.0 (compatible; Madrid-Traffic-Client/1.0)',
'Accept': 'text/csv,application/csv,text/plain,*/*',
'Accept-Language': 'es-ES,es;q=0.9,en;q=0.8',
}
async with httpx.AsyncClient(timeout=30.0, headers=headers, follow_redirects=True) as client:
logger.debug("Fetching measurement points CSV", url=url)
response = await client.get(url)
if response.status_code == 200:
content = response.text
if content and len(content) > 1000:
logger.debug("Successfully fetched measurement points CSV",
url=url, size=len(content))
return content
else:
logger.debug("Measurement points CSV too small", size=len(content))
else:
logger.debug("Measurement points CSV not found",
url=url, status=response.status_code)
except Exception as e:
logger.debug("Error fetching measurement points CSV", url=url, error=str(e))
return None
def _decode_response_content(self, response) -> Optional[str]:
"""Decode response content with multiple encoding attempts"""
try:
return response.text
except UnicodeDecodeError:
# Try manual encoding for Spanish content
for encoding in ['utf-8', 'latin-1', 'windows-1252', 'iso-8859-1']:
try:
content = response.content.decode(encoding)
if content and len(content) > 100:
logger.debug("Successfully decoded with encoding", encoding=encoding)
return content
except UnicodeDecodeError:
continue
return None
def _extract_csv_from_zip(self, zip_file, csv_filename: str) -> Optional[str]:
"""Extract and decode CSV content from ZIP file"""
try:
csv_bytes = zip_file.read(csv_filename)
# Try different encodings for Spanish content
for encoding in ['utf-8', 'latin-1', 'windows-1252', 'iso-8859-1']:
try:
csv_content = csv_bytes.decode(encoding)
logger.debug("Successfully decoded CSV", filename=csv_filename, encoding=encoding)
return csv_content
except UnicodeDecodeError:
continue
logger.warning("Could not decode CSV file", filename=csv_filename)
return None
except Exception as e:
logger.warning("Error extracting CSV from ZIP", filename=csv_filename, error=str(e))
return None
# ================================================================
# XML PROCESSING METHODS
# ================================================================
def _clean_madrid_xml(self, xml_content: str) -> str:
"""Clean Madrid XML to handle undefined entities and encoding issues"""
try:
# Remove BOM if present
xml_content = xml_content.lstrip('\ufeff')
# Replace undefined entities
            entity_replacements = {
                '&nbsp;': ' ', '&copy;': '©', '&reg;': '®', '&trade;': '™'
            }
for entity, replacement in entity_replacements.items():
xml_content = xml_content.replace(entity, replacement)
# Fix unescaped ampersands
xml_content = re.sub(r'&(?![a-zA-Z0-9#]{1,10};)', '&amp;', xml_content)
# Remove invalid control characters
xml_content = re.sub(r'[\x00-\x08\x0B\x0C\x0E-\x1F\x7F]', '', xml_content)
# Handle Spanish characters
spanish_chars = {
'ñ': 'n', 'Ñ': 'N', 'á': 'a', 'é': 'e', 'í': 'i', 'ó': 'o', 'ú': 'u',
'Á': 'A', 'É': 'E', 'Í': 'I', 'Ó': 'O', 'Ú': 'U', 'ü': 'u', 'Ü': 'U'
}
for spanish_char, replacement in spanish_chars.items():
xml_content = xml_content.replace(spanish_char, replacement)
return xml_content
except Exception as e:
logger.warning("Error cleaning Madrid XML", error=str(e))
return xml_content
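    # Example of the ampersand fix on an invented fragment:
    #
    #   <descripcion>Alcala & Gran Via</descripcion>
    #   -> <descripcion>Alcala &amp; Gran Via</descripcion>
    #
    # Already-escaped entities such as &amp; or &#241; are left alone by the
    # negative lookahead.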
def _extract_traffic_data_regex(self, xml_content: str) -> List[Dict[str, Any]]:
"""Extract traffic data using regex when XML parsing fails"""
traffic_points = []
try:
pm_pattern = r'<pm>(.*?)</pm>'
pm_matches = re.findall(pm_pattern, xml_content, re.DOTALL)
for pm_content in pm_matches:
try:
extracted_data = self._extract_pm_data_regex(pm_content)
if extracted_data and self._is_valid_traffic_point(extracted_data):
traffic_points.append(extracted_data)
except Exception as e:
logger.debug("Error parsing regex PM match", error=str(e))
continue
logger.debug("Regex extraction results", count=len(traffic_points))
return traffic_points
except Exception as e:
logger.error("Error in regex extraction", error=str(e))
return []
def _extract_pm_data_regex(self, pm_content: str) -> Dict[str, Any]:
"""Extract individual PM data using regex"""
patterns = {
'idelem': r'<idelem>(.*?)</idelem>',
'intensidad': r'<intensidad>(.*?)</intensidad>',
'st_x': r'<st_x>(.*?)</st_x>',
'st_y': r'<st_y>(.*?)</st_y>',
'descripcion': r'<descripcion>(.*?)</descripcion>'
}
extracted = {}
for field, pattern in patterns.items():
match = re.search(pattern, pm_content)
extracted[field] = match.group(1) if match else ''
if extracted['idelem'] and extracted['st_x'] and extracted['st_y']:
# Convert coordinates
            latitude, longitude = self._convert_utm_to_latlon(extracted['st_x'], extracted['st_y'])
            if latitude and longitude and self._validate_madrid_coordinates(latitude, longitude):
return {
'idelem': extracted['idelem'],
'descripcion': extracted['descripcion'] or f"Point {extracted['idelem']}",
'intensidad': self._safe_int(extracted['intensidad']),
'latitude': latitude,
'longitude': longitude,
'ocupacion': 0,
'carga': 0,
'nivelServicio': 0,
'error': 'N'
}
return {}
# ================================================================
# TRAFFIC ANALYSIS METHODS
# ================================================================
def _find_nearest_traffic_point(self, latitude: float, longitude: float,
traffic_data: List[Dict]) -> Optional[Dict]:
"""Find the nearest traffic measurement point to given coordinates"""
if not traffic_data:
return None
min_distance = float('inf')
nearest_point = None
for point in traffic_data:
if point.get('latitude') and point.get('longitude'):
distance = self._calculate_distance(
latitude, longitude,
point['latitude'], point['longitude']
)
if distance < min_distance:
min_distance = distance
nearest_point = point
# Madrid area search radius (15km)
if nearest_point and min_distance <= 15.0:
logger.debug("Found nearest Madrid traffic point",
distance_km=min_distance,
point_name=nearest_point.get('descripcion'),
point_id=nearest_point.get('idelem'))
return nearest_point
logger.debug("No nearby Madrid traffic points found",
min_distance=min_distance, total_points=len(traffic_data))
return None
def _get_closest_distance(self, latitude: float, longitude: float, traffic_data: List[Dict]) -> float:
"""Get distance to closest traffic point for debugging"""
if not traffic_data:
return float('inf')
min_distance = float('inf')
for point in traffic_data:
if point.get('latitude') and point.get('longitude'):
distance = self._calculate_distance(
latitude, longitude,
point['latitude'], point['longitude']
)
min_distance = min(min_distance, distance)
return min_distance
def _parse_traffic_measurement(self, traffic_point: Dict) -> Dict[str, Any]:
"""Parse Madrid traffic measurement into standardized format"""
try:
service_level = traffic_point.get('nivelServicio', 0)
congestion_mapping = {
TrafficServiceLevel.FLUID.value: CongestionLevel.LOW.value,
TrafficServiceLevel.DENSE.value: CongestionLevel.MEDIUM.value,
TrafficServiceLevel.CONGESTED.value: CongestionLevel.HIGH.value,
TrafficServiceLevel.BLOCKED.value: CongestionLevel.BLOCKED.value
}
# Speed estimation based on service level
speed_mapping = {
TrafficServiceLevel.FLUID.value: 45,
TrafficServiceLevel.DENSE.value: 25,
TrafficServiceLevel.CONGESTED.value: 15,
TrafficServiceLevel.BLOCKED.value: 5
}
congestion_level = congestion_mapping.get(service_level, CongestionLevel.MEDIUM.value)
average_speed = speed_mapping.get(service_level, 25)
# Calculate pedestrian estimate
hour = datetime.now().hour
pedestrian_multiplier = self._get_pedestrian_multiplier(hour)
pedestrian_count = int(100 * pedestrian_multiplier)
return {
"date": datetime.now(),
"traffic_volume": traffic_point.get('intensidad', 0),
"pedestrian_count": pedestrian_count,
"congestion_level": congestion_level,
"average_speed": average_speed,
"occupation_percentage": traffic_point.get('ocupacion', 0),
"load_percentage": traffic_point.get('carga', 0),
"measurement_point_id": traffic_point.get('idelem'),
"measurement_point_name": traffic_point.get('descripcion'),
"road_type": "URB",
"source": DataSource.MADRID_REALTIME.value
}
except Exception as e:
logger.error("Error parsing traffic measurement", error=str(e))
return self._get_default_traffic_data()
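    # Mapping sketch: nivelServicio=2 (CONGESTED) yields congestion_level
    # "high" with an estimated 15 km/h; unknown levels fall back to
    # "medium" / 25 km/h.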
def _get_pedestrian_multiplier(self, hour: int) -> float:
"""Get pedestrian multiplier based on time of day"""
if 13 <= hour <= 15: # Lunch time
return 2.5
elif 8 <= hour <= 9 or 18 <= hour <= 20: # Rush hours
return 2.0
else:
return 1.0
# ================================================================
# SYNTHETIC DATA GENERATION METHODS
# ================================================================
async def _generate_synthetic_traffic(self, latitude: float, longitude: float) -> Dict[str, Any]:
"""Generate realistic Madrid traffic data as fallback"""
now = datetime.now()
hour = now.hour
is_weekend = now.weekday() >= 5
traffic_params = self._calculate_traffic_parameters(hour, is_weekend)
return {
"date": now,
"traffic_volume": traffic_params['volume'],
"pedestrian_count": traffic_params['pedestrians'],
"congestion_level": traffic_params['congestion'],
"average_speed": traffic_params['speed'],
"occupation_percentage": min(100, traffic_params['volume'] // 2),
"load_percentage": min(100, traffic_params['volume'] // 3),
"measurement_point_id": "madrid_synthetic",
"measurement_point_name": "Madrid Centro (Synthetic)",
"road_type": "URB",
"source": DataSource.SYNTHETIC.value
}
async def _generate_historical_traffic(self, latitude: float, longitude: float,
start_date: datetime, end_date: datetime) -> List[Dict[str, Any]]:
"""Generate synthetic historical traffic data with realistic patterns"""
try:
            import random
            import zlib  # stable checksum for a deterministic point ID
historical_data = []
current_date = start_date
            # Seed deterministically so repeated calls for the same location
            # return the same series (str seeds are stable across runs, unlike
            # hash(), which is salted per process)
            location_key = f"{latitude:.6f},{longitude:.6f}"
            random.seed(location_key)
while current_date < end_date:
# Calculate how many hours to generate for this day
if current_date.date() == end_date.date():
# Same day as end_date, only generate up to end_date hour
end_hour = end_date.hour
else:
# Full day
end_hour = 24
# Generate hourly records for this day
for hour in range(current_date.hour if current_date == start_date else 0, end_hour):
record_time = current_date.replace(hour=hour, minute=0, second=0, microsecond=0)
# Skip if record time is at or beyond end_date
if record_time >= end_date:
break
traffic_params = self._generate_synthetic_traffic_params(record_time, random)
traffic_record = {
"date": record_time,
"traffic_volume": traffic_params['volume'],
"pedestrian_count": traffic_params['pedestrians'],
"congestion_level": traffic_params['congestion'],
"average_speed": traffic_params['speed'],
"occupation_percentage": min(100, traffic_params['volume'] // 2),
"load_percentage": min(100, traffic_params['volume'] // 3),
"measurement_point_id": f"madrid_historical_{hash(f'{latitude}{longitude}') % 1000}",
"measurement_point_name": f"Madrid Historical Point ({latitude:.4f}, {longitude:.4f})",
"road_type": "URB",
"source": DataSource.SYNTHETIC_HISTORICAL.value
}
historical_data.append(traffic_record)
# Move to next day
if current_date.date() == end_date.date():
# We've processed the end date, stop
break
else:
# Move to start of next day
current_date = (current_date + timedelta(days=1)).replace(hour=0, minute=0, second=0, microsecond=0)
logger.info("Generated historical traffic data",
records=len(historical_data), start=start_date, end=end_date)
return historical_data
except Exception as e:
logger.error("Error generating historical traffic data", error=str(e))
return []
def _calculate_traffic_parameters(self, hour: int, is_weekend: bool) -> Dict[str, Any]:
"""Calculate traffic parameters based on time and day type"""
base_traffic = 100
if not is_weekend:
if 7 <= hour <= 9:
multiplier, congestion, speed = 2.2, "high", 15
elif 18 <= hour <= 20:
multiplier, congestion, speed = 2.5, "high", 12
elif 12 <= hour <= 14:
multiplier, congestion, speed = 1.6, "medium", 25
else:
multiplier, congestion, speed = 1.0, "low", 40
else:
if 11 <= hour <= 14:
multiplier, congestion, speed = 1.4, "medium", 30
else:
multiplier, congestion, speed = 0.8, "low", 45
volume = int(base_traffic * multiplier)
pedestrians = int(150 * self._get_pedestrian_multiplier(hour))
return {
'volume': volume,
'congestion': congestion,
'speed': max(10, speed),
'pedestrians': pedestrians
}
def _generate_synthetic_traffic_params(self, record_time: datetime, random_gen) -> Dict[str, Any]:
"""Generate synthetic traffic parameters with random variations"""
hour = record_time.hour
day_of_week = record_time.weekday()
month = record_time.month
base_params = self._calculate_traffic_parameters(hour, day_of_week >= 5)
# Add random variations
volume_variation = random_gen.uniform(-0.3, 0.3)
speed_variation = random_gen.randint(-5, 5)
# Apply seasonal adjustments
seasonal_multiplier = 0.8 if month in [7, 8] else (1.1 if month in [11, 12] else 1.0)
# Weekend specific adjustments
if day_of_week >= 5 and hour in [11, 12, 13, 14, 15]:
base_params['volume'] = int(base_params['volume'] * 1.4)
base_params['congestion'] = "medium"
return {
'volume': max(10, int(base_params['volume'] * (1 + volume_variation) * seasonal_multiplier)),
'congestion': base_params['congestion'],
'speed': max(10, min(60, base_params['speed'] + speed_variation)),
'pedestrians': int(base_params['pedestrians'] * random_gen.uniform(0.8, 1.2))
}
def _get_fallback_measurement_points(self, latitude: float, longitude: float) -> List[MeasurementPoint]:
"""Generate fallback measurement points when CSV is not available"""
madrid_points = [
(40.4168, -3.7038, "Madrid Centro"),
(40.4200, -3.7060, "Gran Vía"),
(40.4155, -3.7074, "Plaza Mayor"),
(40.4152, -3.6844, "Retiro"),
(40.4063, -3.6932, "Atocha"),
]
fallback_points = []
for i, (lat, lon, name) in enumerate(madrid_points):
distance = self._calculate_distance(latitude, longitude, lat, lon)
point = MeasurementPoint(
id=f'fallback_{i+1000}',
latitude=lat,
longitude=lon,
distance=distance,
name=name,
type='URB'
)
fallback_points.append(point)
fallback_points.sort(key=lambda x: x.distance)
return fallback_points[:5]
def _get_default_traffic_data(self) -> Dict[str, Any]:
"""Get default traffic data when parsing fails"""
return {
"date": datetime.now(),
"traffic_volume": 100,
"pedestrian_count": 150,
"congestion_level": CongestionLevel.MEDIUM.value,
"average_speed": 25,
"occupation_percentage": 30,
"load_percentage": 40,
"measurement_point_id": "unknown",
"measurement_point_name": "Unknown location",
"road_type": "URB",
"source": DataSource.SYNTHETIC.value
}
# ================================================================
# CORE UTILITY METHODS
# ================================================================
def _calculate_distance(self, lat1: float, lon1: float, lat2: float, lon2: float) -> float:
"""Calculate distance between two coordinates using Haversine formula"""
R = 6371 # Earth's radius in km
dlat = math.radians(lat2 - lat1)
dlon = math.radians(lon2 - lon1)
a = (math.sin(dlat/2) * math.sin(dlat/2) +
math.cos(math.radians(lat1)) * math.cos(math.radians(lat2)) *
math.sin(dlon/2) * math.sin(dlon/2))
c = 2 * math.atan2(math.sqrt(a), math.sqrt(1-a))
return R * c
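    # Worked example (approximate): the fallback points "Madrid Centro"
    # (40.4168, -3.7038) and "Retiro" (40.4152, -3.6844) come out roughly
    # 1.65 km apart.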
def _safe_int(self, value_str: str) -> int:
"""Safely convert string to int"""
try:
return int(float(value_str.replace(',', '.')))
        except (ValueError, TypeError, AttributeError):  # AttributeError: None input
return 0
def _safe_float(self, value_str: str) -> float:
"""Safely convert string to float"""
try:
return float(value_str.replace(',', '.'))
        except (ValueError, TypeError, AttributeError):  # AttributeError: None input
return 0.0