# ================================================================
# services/data/app/external/madrid_opendata.py - REFACTORED
# ================================================================
"""
Madrid Open Data API client with clean architecture and best practices

Features:
- Real-time traffic data from XML endpoints
- Historical traffic data from ZIP files
- Measurement points integration
- Robust error handling and fallbacks
- Comprehensive logging
"""

import csv
import math
import re
import xml.etree.ElementTree as ET
import zipfile
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from enum import Enum
from io import BytesIO, StringIO
from typing import List, Dict, Any, Optional, Tuple

import httpx
import pyproj
import structlog

from app.external.base_client import BaseAPIClient

logger = structlog.get_logger()

# ================================================================
# CONSTANTS AND ENUMS
# ================================================================

class TrafficServiceLevel(Enum):
    """Madrid traffic service levels"""
    FLUID = 0
    DENSE = 1
    CONGESTED = 2
    BLOCKED = 3


class CongestionLevel(Enum):
    """Standardized congestion levels"""
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    BLOCKED = "blocked"


class DataSource(Enum):
    """Data source types"""
    MADRID_REALTIME = "madrid_opendata"
    MADRID_HISTORICAL = "madrid_opendata_zip"
    SYNTHETIC = "synthetic"
    SYNTHETIC_HISTORICAL = "synthetic_historical"


# Madrid geographic bounds
MADRID_BOUNDS = {
    'lat_min': 40.31,
    'lat_max': 40.56,
    'lon_min': -3.89,
    'lon_max': -3.51
}

# Constants
MAX_HISTORICAL_DAYS = 1095  # 3 years - allow longer training periods
MAX_CSV_PROCESSING_ROWS = 5_000_000
MEASUREMENT_POINTS_LIMIT = 20
UTM_ZONE = 30  # Madrid is in UTM Zone 30N


@dataclass
class MeasurementPoint:
    """Measurement point data structure"""
    id: str
    latitude: float
    longitude: float
    distance: float
    name: str
    type: str


@dataclass
class TrafficRecord:
    """Traffic record data structure"""
    date: datetime
    traffic_volume: int
    occupation_percentage: int
    load_percentage: int
    average_speed: int
    congestion_level: str
    pedestrian_count: int
    measurement_point_id: str
    measurement_point_name: str
    road_type: str
    source: str
    error_status: Optional[str] = None
    # Madrid-specific raw data
    intensidad_raw: Optional[int] = None
    ocupacion_raw: Optional[int] = None
    carga_raw: Optional[int] = None
    vmed_raw: Optional[int] = None

    def to_dict(self) -> Dict[str, Any]:
        """Convert TrafficRecord to dictionary"""
        result = {
            "date": self.date,
            "traffic_volume": self.traffic_volume,
            "occupation_percentage": self.occupation_percentage,
            "load_percentage": self.load_percentage,
            "average_speed": self.average_speed,
            "congestion_level": self.congestion_level,
            "pedestrian_count": self.pedestrian_count,
            "measurement_point_id": self.measurement_point_id,
            "measurement_point_name": self.measurement_point_name,
            "road_type": self.road_type,
            "source": self.source
        }
        # Add optional fields if present
        optional_fields = ['error_status', 'intensidad_raw', 'ocupacion_raw',
                           'carga_raw', 'vmed_raw']
        for field in optional_fields:
            value = getattr(self, field, None)
            if value is not None:
                result[field] = value
        return result
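
# ----------------------------------------------------------------
# Illustrative sketch (not used by the client): constructing a
# TrafficRecord and serialising it. All field values below are
# hypothetical sample data.
#
#   record = TrafficRecord(
#       date=datetime(2024, 3, 1, 8, 0, tzinfo=timezone.utc),
#       traffic_volume=450, occupation_percentage=35, load_percentage=60,
#       average_speed=25, congestion_level=CongestionLevel.MEDIUM.value,
#       pedestrian_count=300, measurement_point_id="1001",
#       measurement_point_name="Madrid Point 1001", road_type="URB",
#       source=DataSource.MADRID_HISTORICAL.value, vmed_raw=0,
#   )
#   record.to_dict()["congestion_level"]  # -> "medium"
#   # error_status and any unset *_raw fields are omitted from the dict.
# ----------------------------------------------------------------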

# ================================================================
# MADRID OPEN DATA CLIENT
# ================================================================

class MadridOpenDataClient(BaseAPIClient):
    """
    Madrid Open Data API client with comprehensive traffic data support

    Provides both real-time and historical traffic data from Madrid's
    open data portal. Implements robust error handling, coordinate
    conversion, and synthetic data fallbacks.
    """

    def __init__(self):
        super().__init__(base_url="https://datos.madrid.es")
        self.traffic_endpoints = [
            "https://datos.madrid.es/egob/catalogo/202087-0-trafico-intensidad.xml"
        ]
        self.measurement_points_url = "https://datos.madrid.es/egob/catalogo/202468-260-intensidad-trafico.csv"
        self._conversion_log_count = 0  # Limits coordinate-conversion debug logging
        # Initialize coordinate converter (UTM zone 30N, WGS84 ellipsoid)
        self.utm_proj = pyproj.Proj(proj='utm', zone=UTM_ZONE, ellps='WGS84',
                                    preserve_units=False)

    # ================================================================
    # PUBLIC API METHODS
    # ================================================================

    async def get_current_traffic(self, latitude: float,
                                  longitude: float) -> Optional[Dict[str, Any]]:
        """
        Get current traffic data for a location using working Madrid endpoints

        Args:
            latitude: Query location latitude
            longitude: Query location longitude

        Returns:
            Dict with current traffic data, or None if not available
        """
        try:
            logger.debug("Fetching Madrid traffic data", lat=latitude, lon=longitude)

            # Try real-time endpoints
            for endpoint in self.traffic_endpoints:
                try:
                    traffic_data = await self._fetch_traffic_xml_data(endpoint)
                    if traffic_data:
                        logger.info("Successfully fetched Madrid traffic data",
                                    endpoint=endpoint, points=len(traffic_data))

                        # Find nearest measurement point
                        nearest_point = self._find_nearest_traffic_point(
                            latitude, longitude, traffic_data)
                        if nearest_point:
                            parsed_data = self._parse_traffic_measurement(nearest_point)
                            logger.debug("Successfully parsed real Madrid traffic data",
                                         point_name=nearest_point.get('descripcion'),
                                         point_id=nearest_point.get('idelem'))
                            return parsed_data
                        else:
                            closest_distance = self._get_closest_distance(
                                latitude, longitude, traffic_data)
                            logger.debug("No nearby traffic points found",
                                         lat=latitude, lon=longitude,
                                         closest_distance=closest_distance)
                except Exception as e:
                    logger.debug("Failed to fetch from endpoint",
                                 endpoint=endpoint, error=str(e))
                    continue

            # Fallback to synthetic data
            logger.info("No nearby Madrid traffic points found, using synthetic data")
            return await self._generate_synthetic_traffic(latitude, longitude)

        except Exception as e:
            logger.error("Failed to get current traffic", error=str(e))
            return await self._generate_synthetic_traffic(latitude, longitude)
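
    # Illustrative usage (sketch): the client is async, so callers drive it
    # with an event loop. Coordinates below are Madrid city centre.
    #
    #   import asyncio
    #   client = MadridOpenDataClient()
    #   data = asyncio.run(client.get_current_traffic(40.4168, -3.7038))
    #   data["source"]  # "madrid_opendata", or "synthetic" on fallback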

    async def get_historical_traffic(self, latitude: float, longitude: float,
                                     start_date: datetime,
                                     end_date: datetime) -> List[Dict[str, Any]]:
        """
        Get historical traffic data from Madrid Open Data ZIP files

        Args:
            latitude: Query location latitude
            longitude: Query location longitude
            start_date: Start date for historical data
            end_date: End date for historical data

        Returns:
            List of historical traffic data dictionaries
        """
        try:
            logger.debug("Fetching Madrid historical traffic data",
                         lat=latitude, lon=longitude, start=start_date, end=end_date)

            # Validate date range
            if not self._validate_date_range(start_date, end_date):
                return []

            # Generate synthetic data as fallback
            synthetic_data = await self._generate_historical_traffic(
                latitude, longitude, start_date, end_date)
            logger.info("Generated synthetic historical traffic data",
                        records=len(synthetic_data))

            # Try to fetch real data
            try:
                real_data = await self._fetch_real_historical_traffic(
                    latitude, longitude, start_date, end_date)
                if real_data:
                    logger.info("Fetched real historical traffic data from ZIP files",
                                records=len(real_data))
                    return real_data
                else:
                    logger.info("No real historical data available, using synthetic data")
                    return synthetic_data
            except Exception as e:
                logger.warning("Failed to fetch real historical data, using synthetic",
                               error=str(e))
                return synthetic_data

        except Exception as e:
            logger.error("Error getting historical traffic data", error=str(e))
            return []

    async def get_events(self, latitude: float, longitude: float,
                         radius_km: float = 5.0) -> List[Dict[str, Any]]:
        """Get traffic incidents and events (placeholder for future implementation)"""
        return []

    # ================================================================
    # REAL-TIME TRAFFIC METHODS
    # ================================================================

    async def _fetch_traffic_xml_data(self, endpoint: str) -> Optional[List[Dict[str, Any]]]:
        """Fetch and parse Madrid traffic XML data"""
        try:
            xml_content = await self._fetch_xml_content_robust(endpoint)
            if not xml_content:
                logger.debug("No XML content received", endpoint=endpoint)
                return None

            logger.debug("Madrid XML content preview", length=len(xml_content),
                         first_500=xml_content[:500])

            traffic_points = self._parse_madrid_traffic_xml(xml_content)
            if traffic_points:
                logger.debug("Successfully parsed Madrid traffic XML",
                             points=len(traffic_points))
                return traffic_points
            else:
                logger.warning("No traffic points found in XML", endpoint=endpoint)
                return None

        except Exception as e:
            logger.error("Error fetching traffic XML data", endpoint=endpoint, error=str(e))
            return None

    def _parse_madrid_traffic_xml(self, xml_content: str) -> List[Dict[str, Any]]:
        """Parse Madrid traffic XML with the expected <pms><pm>...</pm></pms> structure"""
        traffic_points = []
        try:
            cleaned_xml = self._clean_madrid_xml(xml_content)
            root = ET.fromstring(cleaned_xml)
            logger.debug("Madrid XML structure", root_tag=root.tag,
                         children_count=len(list(root)))

            if root.tag == 'pms':
                pm_elements = root.findall('pm')
                logger.debug("Found PM elements", count=len(pm_elements))

                for pm in pm_elements:
                    try:
                        traffic_point = self._extract_madrid_pm_element(pm)
                        if self._is_valid_traffic_point(traffic_point):
                            traffic_points.append(traffic_point)
                            # Log first few points for debugging
                            if len(traffic_points) <= 3:
                                logger.debug("Sample traffic point",
                                             id=traffic_point['idelem'],
                                             lat=traffic_point['latitude'],
                                             lon=traffic_point['longitude'],
                                             intensity=traffic_point.get('intensidad'))
                    except Exception as e:
                        logger.debug("Error parsing PM element", error=str(e))
                        continue
            else:
                logger.warning("Unexpected XML root tag", root_tag=root.tag)

            logger.debug("Madrid traffic XML parsing completed",
                         valid_points=len(traffic_points))
            return traffic_points

        except ET.ParseError as e:
            logger.warning("Failed to parse Madrid XML", error=str(e))
            return self._extract_traffic_data_regex(xml_content)
        except Exception as e:
            logger.error("Error in Madrid traffic XML parsing", error=str(e))
            return []

    # ================================================================
    # HISTORICAL TRAFFIC METHODS
    # ================================================================

    async def _fetch_real_historical_traffic(self, latitude: float, longitude: float,
                                             start_date: datetime,
                                             end_date: datetime) -> List[Dict[str, Any]]:
        """Fetch real historical traffic data from Madrid ZIP files"""
        try:
            historical_data = []
            current_date = start_date.replace(day=1)

            while current_date <= end_date:
                try:
                    month_code = self._calculate_madrid_month_code(
                        current_date.year, current_date.month)
                    if month_code:
                        zip_url = (f"https://datos.madrid.es/egob/catalogo/"
                                   f"208627-{month_code}-transporte-ptomedida-historico.zip")
                        logger.debug("Trying ZIP URL", url=zip_url,
                                     year=current_date.year, month=current_date.month,
                                     code=month_code)

                        zip_data = await self._fetch_historical_zip(zip_url)
                        if zip_data:
                            month_data = await self._parse_historical_zip(
                                zip_data, latitude, longitude, start_date, end_date)
                            historical_data.extend(month_data)
                            logger.info("Fetched historical data for month",
                                        year=current_date.year, month=current_date.month,
                                        records=len(month_data))
                        else:
                            logger.debug("No ZIP data found for month",
                                         year=current_date.year, month=current_date.month)
                    else:
                        logger.debug("Could not calculate month code",
                                     year=current_date.year, month=current_date.month)

                    current_date = self._get_next_month(current_date)

                except Exception as e:
                    logger.warning("Error fetching data for month",
                                   year=current_date.year, month=current_date.month,
                                   error=str(e))
                    current_date = self._get_next_month(current_date)

            return historical_data

        except Exception as e:
            logger.error("Error fetching real historical traffic data", error=str(e))
            return []

    async def _parse_historical_zip(self, zip_content: bytes, latitude: float,
                                    longitude: float, start_date: datetime,
                                    end_date: datetime) -> List[Dict[str, Any]]:
        """Parse a Madrid historical traffic ZIP file"""
        try:
            historical_records = []

            with zipfile.ZipFile(BytesIO(zip_content), 'r') as zip_file:
                logger.debug("ZIP file contents", files=zip_file.namelist())

                csv_files = [f for f in zip_file.namelist()
                             if f.endswith('.csv') and not f.startswith('__MACOSX')]
                if not csv_files:
                    logger.warning("No CSV files found in ZIP")
                    return []

                for csv_filename in csv_files:
                    logger.debug("Processing CSV file", filename=csv_filename)
                    try:
                        csv_content = self._extract_csv_from_zip(zip_file, csv_filename)
                        if csv_content:
                            file_records = await self._parse_csv_content(
                                csv_content, latitude, longitude, start_date, end_date
                            )
                            historical_records.extend(file_records)
                            logger.debug("Processed CSV file", filename=csv_filename,
                                         records=len(file_records))
                    except Exception as e:
                        logger.warning("Error processing CSV file",
                                       filename=csv_filename, error=str(e))
                        continue

            return historical_records

        except Exception as e:
            logger.error("Error parsing historical ZIP", error=str(e))
            return []

    # ================================================================
    # DATA PARSING AND CONVERSION METHODS
    # ================================================================
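
    # The historical CSVs are semicolon-delimited. Based on the fields read
    # by _parse_csv_row below, a data row looks roughly like this (values
    # are hypothetical, and monthly files may carry additional columns):
    #
    #   id;fecha;tipo_elem;intensidad;ocupacion;carga;vmed;error
    #   1001;"2024-03-01 08:00:00";"URB";450;35;60;0;"N"
    #
    # csv.DictReader keys rows by header name, so column order is not
    # relied upon.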

    async def _parse_csv_content(self, csv_content: str, latitude: float,
                                 longitude: float, start_date: datetime,
                                 end_date: datetime) -> List[Dict[str, Any]]:
        """Parse CSV content from Madrid historical traffic data"""
        try:
            csv_reader = csv.DictReader(StringIO(csv_content), delimiter=';')
            if not self._validate_csv_structure(csv_reader.fieldnames):
                return []

            logger.debug("Madrid CSV structure detected", fields=csv_reader.fieldnames)

            # Get nearest measurement points
            measurement_points = await self._get_measurement_points_near_location(
                latitude, longitude)
            target_point_ids = [str(point.id) for point in measurement_points[:10]]
            logger.debug("Target measurement points", ids=target_point_ids[:3])

            # Process CSV rows
            historical_records = []
            processed_count = 0
            row_num = -1  # Stays -1 if the CSV has no data rows

            for row_num, row in enumerate(csv_reader):
                if processed_count >= MAX_CSV_PROCESSING_ROWS:
                    logger.info("Reached processing limit", limit=MAX_CSV_PROCESSING_ROWS)
                    break

                try:
                    traffic_record = self._parse_csv_row(row, target_point_ids,
                                                         start_date, end_date)
                    if traffic_record:
                        historical_records.append(traffic_record.to_dict())
                        processed_count += 1
                        if processed_count % 1000 == 0:
                            logger.debug("Processing progress", processed=processed_count)
                except Exception as e:
                    # Log errors only occasionally to avoid flooding the logs
                    if row_num % 5000 == 0:
                        logger.debug("Error parsing CSV row", row_num=row_num, error=str(e))
                    continue

            logger.info("Successfully parsed Madrid CSV", total_rows=row_num + 1,
                        processed=processed_count, records=len(historical_records))

            # Enrich with location data
            if historical_records and measurement_points:
                historical_records = self._enrich_with_location_data(
                    historical_records, measurement_points)

            return historical_records

        except Exception as e:
            logger.error("Error parsing Madrid CSV content", error=str(e))
            return []

    def _parse_csv_row(self, row: Dict[str, str], target_point_ids: List[str],
                       start_date: datetime,
                       end_date: datetime) -> Optional[TrafficRecord]:
        """Parse a single CSV row into a TrafficRecord"""
        try:
            # Extract and validate point ID
            point_id = str(row.get('id', '')).strip()
            if not point_id or (target_point_ids and point_id not in target_point_ids):
                return None

            # Parse date
            record_date = self._parse_madrid_date(row.get('fecha', '').strip().strip('"'))
            if not record_date:
                return None

            # Ensure all dates are timezone-aware before comparing; mixing
            # naive and aware datetimes raises TypeError
            if start_date.tzinfo is None:
                start_date = start_date.replace(tzinfo=timezone.utc)
            if end_date.tzinfo is None:
                end_date = end_date.replace(tzinfo=timezone.utc)
            if record_date.tzinfo is None:
                record_date = record_date.replace(tzinfo=timezone.utc)

            if not (start_date <= record_date <= end_date):
                return None

            # Parse traffic data
            intensidad = self._safe_int(row.get('intensidad', '0'))
            ocupacion = self._safe_int(row.get('ocupacion', '0'))
            carga = self._safe_int(row.get('carga', '0'))
            vmed = self._safe_int(row.get('vmed', '0'))
            tipo_elem = row.get('tipo_elem', '').strip().strip('"')
            error = row.get('error', 'N').strip().strip('"')

            # Skip erroneous records
            if error == 'S':
                return None

            # Calculate derived metrics
            avg_speed = self._calculate_average_speed(vmed, carga, ocupacion)
            congestion_level = self._determine_congestion_level(carga, avg_speed)
            pedestrian_count = self._calculate_pedestrian_count(tipo_elem, record_date.hour)

            return TrafficRecord(
                date=record_date,
                traffic_volume=intensidad,
                occupation_percentage=ocupacion,
                load_percentage=carga,
                average_speed=avg_speed,
                congestion_level=congestion_level,
                pedestrian_count=pedestrian_count,
                measurement_point_id=point_id,
                measurement_point_name=f"Madrid Point {point_id}",
                road_type=tipo_elem,
                source=DataSource.MADRID_HISTORICAL.value,
                error_status=error,
                intensidad_raw=intensidad,
                ocupacion_raw=ocupacion,
                carga_raw=carga,
                vmed_raw=vmed
            )

        except Exception as e:
            logger.debug("Error parsing CSV row", error=str(e))
            return None

    # ================================================================
    # MEASUREMENT POINTS METHODS
    # ================================================================

    async def _get_measurement_points_near_location(self, latitude: float,
                                                    longitude: float) -> List[MeasurementPoint]:
        """Get measurement points near the specified location"""
        try:
            points_csv = await self._fetch_measurement_points_csv(self.measurement_points_url)
            if points_csv:
                return await self._parse_measurement_points_csv(points_csv, latitude, longitude)
            else:
                logger.info("Using fallback measurement points")
                return self._get_fallback_measurement_points(latitude, longitude)
        except Exception as e:
            logger.warning("Error getting measurement points", error=str(e))
            return self._get_fallback_measurement_points(latitude, longitude)

    async def _parse_measurement_points_csv(self, csv_content: str, query_lat: float,
                                            query_lon: float) -> List[MeasurementPoint]:
        """Parse the measurement points CSV and find the nearest points"""
        try:
            points_with_distance = []
            csv_reader = csv.DictReader(StringIO(csv_content), delimiter=';')

            for row in csv_reader:
                try:
                    point_id = row.get('id', '').strip()
                    latitud = row.get('latitud', '').strip()
                    longitud = row.get('longitud', '').strip()
                    nombre = row.get('nombre', '').strip().strip('"')
                    tipo_elem = row.get('tipo_elem', '').strip().strip('"')

                    if not (point_id and latitud and longitud):
                        continue

                    lat, lon = float(latitud), float(longitud)
                    distance = self._calculate_distance(query_lat, query_lon, lat, lon)

                    point = MeasurementPoint(
                        id=point_id,
                        latitude=lat,
                        longitude=lon,
                        distance=distance,
                        name=nombre or f'Point {point_id}',
                        type=tipo_elem
                    )
                    points_with_distance.append(point)

                except Exception as e:
                    logger.debug("Error parsing measurement point row", error=str(e))
                    continue

            # Sort by distance and return the closest points
            points_with_distance.sort(key=lambda x: x.distance)
            closest_points = points_with_distance[:MEASUREMENT_POINTS_LIMIT]

            logger.info("Found measurement points", total=len(points_with_distance),
                        closest=len(closest_points))
            return closest_points

        except Exception as e:
            logger.error("Error parsing measurement points CSV", error=str(e))
            return []

    # ================================================================
    # COORDINATE CONVERSION METHODS
    # ================================================================

    def _extract_madrid_pm_element(self, pm_element) -> Dict[str, Any]:
        """Extract traffic data from a Madrid <pm> element with coordinate conversion"""
        try:
            point_data = {}
            utm_x = utm_y = None

            # Extract all child elements
            for child in pm_element:
                tag, text = child.tag, child.text.strip() if child.text else ''
                if tag == 'idelem':
                    point_data['idelem'] = text
                elif tag == 'descripcion':
                    point_data['descripcion'] = text
                elif tag == 'intensidad':
                    point_data['intensidad'] = self._safe_int(text)
                elif tag == 'ocupacion':
                    point_data['ocupacion'] = self._safe_float(text)
                elif tag == 'carga':
                    point_data['carga'] = self._safe_int(text)
                elif tag == 'nivelServicio':
                    point_data['nivelServicio'] = self._safe_int(text)
                elif tag == 'st_x':
                    utm_x = text
                    point_data['utm_x'] = text
                elif tag == 'st_y':
                    utm_y = text
                    point_data['utm_y'] = text
                elif tag == 'error':
                    point_data['error'] = text
                elif tag in ['subarea', 'accesoAsociado', 'intensidadSat']:
                    point_data[tag] = text

            # Convert coordinates
            if utm_x and utm_y:
                latitude, longitude = self._convert_utm_to_latlon(utm_x, utm_y)
                if latitude and longitude and self._validate_madrid_coordinates(latitude, longitude):
                    point_data.update({'latitude': latitude, 'longitude': longitude})
                    # Log successful conversions (limited)
                    self._log_coordinate_conversion(point_data, utm_x, utm_y,
                                                    latitude, longitude)
                    return point_data
                else:
                    logger.debug("Invalid coordinates after conversion",
                                 idelem=point_data.get('idelem'),
                                 utm_x=utm_x, utm_y=utm_y)
                    return {}
            else:
                logger.debug("Missing UTM coordinates", idelem=point_data.get('idelem'))
                return {}

        except Exception as e:
            logger.debug("Error extracting Madrid PM element", error=str(e))
            return {}

    def _convert_utm_to_latlon(self, utm_x_str: str,
                               utm_y_str: str) -> Tuple[Optional[float], Optional[float]]:
        """Convert UTM coordinates to lat/lon using pyproj"""
        try:
            # Madrid feeds use comma as the decimal separator
            utm_x = float(utm_x_str.replace(',', '.'))
            utm_y = float(utm_y_str.replace(',', '.'))
            longitude, latitude = self.utm_proj(utm_x, utm_y, inverse=True)
            return round(latitude, 6), round(longitude, 6)
        except Exception:
            return None, None
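
    # Illustrative sketch: a UTM zone 30N easting/northing of roughly
    # (440000, 4474000) corresponds to central Madrid, so the inverse
    # projection should land near (40.42, -3.70). Values are approximate.
    #
    #   lat, lon = client._convert_utm_to_latlon("440000", "4474000")
    #   # lat ~ 40.42, lon ~ -3.70; _validate_madrid_coordinates(lat, lon) -> True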

    # ================================================================
    # UTILITY AND HELPER METHODS
    # ================================================================

    def _validate_date_range(self, start_date: datetime, end_date: datetime) -> bool:
        """Validate the date range for historical data requests"""
        days_diff = (end_date - start_date).days

        # Allow same-day ranges (days_diff == 0)
        if days_diff < 0:
            logger.warning("End date before start date", start=start_date, end=end_date)
            return False
        if days_diff > MAX_HISTORICAL_DAYS:
            logger.warning("Date range too large for historical traffic data",
                           days=days_diff)
            return False
        return True

    def _calculate_madrid_month_code(self, year: int, month: int) -> Optional[int]:
        """Calculate Madrid's month code for ZIP files (June 2025 = 145)"""
        try:
            reference_year, reference_month, reference_code = 2025, 6, 145
            months_diff = (year - reference_year) * 12 + (month - reference_month)
            estimated_code = reference_code + months_diff

            if 100 <= estimated_code <= 300:
                return estimated_code
            else:
                logger.warning("Month code out of range", year=year, month=month,
                               code=estimated_code)
                return None
        except Exception as e:
            logger.error("Error calculating month code", year=year, month=month,
                         error=str(e))
            return None
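
    # The month code grows by one per calendar month, anchored at
    # June 2025 = 145. A quick sanity check of the arithmetic:
    #
    #   client._calculate_madrid_month_code(2025, 6)  # -> 145 (reference month)
    #   client._calculate_madrid_month_code(2025, 5)  # -> 144
    #   client._calculate_madrid_month_code(2024, 6)  # -> 133 (12 months earlier)
    #
    # Codes outside 100..300 are rejected, which bounds usable dates to
    # roughly late 2021 through mid 2038 under this scheme.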

    def _calculate_average_speed(self, vmed: int, carga: int, ocupacion: int) -> int:
        """Calculate average speed based on available data"""
        if vmed > 0:
            # M30 points report measured speed directly
            return vmed

        # Urban points - estimate from carga and ocupacion
        if carga >= 80:
            speed = 15
        elif carga >= 50:
            speed = 25
        elif carga >= 20:
            speed = 35
        else:
            speed = 45

        # Adjust based on occupation
        if ocupacion >= 30:
            speed = max(10, speed - 10)
        elif ocupacion <= 5:
            speed = min(50, speed + 5)

        return speed

    def _determine_congestion_level(self, carga: int, avg_speed: int) -> str:
        """Determine congestion level from carga and speed"""
        if carga >= 90 and avg_speed <= 10:
            return CongestionLevel.BLOCKED.value
        elif carga >= 75:
            return CongestionLevel.HIGH.value
        elif carga >= 40:
            return CongestionLevel.MEDIUM.value
        else:
            return CongestionLevel.LOW.value
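
    # Worked example of the two heuristics above (hypothetical inputs):
    # an urban point with no measured speed (vmed=0), carga=60, ocupacion=10
    # falls in the 50 <= carga < 80 band, so the estimated speed is 25 km/h
    # with no occupation adjustment, and carga=60 then maps to "medium":
    #
    #   speed = client._calculate_average_speed(vmed=0, carga=60, ocupacion=10)  # 25
    #   client._determine_congestion_level(carga=60, avg_speed=speed)  # "medium"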

    def _calculate_pedestrian_count(self, tipo_elem: str, hour: int) -> int:
        """Estimate pedestrian count based on area type and time of day"""
        if tipo_elem == 'URB':
            base = 200
            if 12 <= hour <= 14:  # Lunch time
                multiplier = 2.0
            elif 8 <= hour <= 9 or 18 <= hour <= 20:  # Rush hours
                multiplier = 1.5
            else:
                multiplier = 1.0
        else:  # M30, C30
            base = 50
            multiplier = 0.5
        return int(base * multiplier)

    def _parse_madrid_date(self, fecha_str: str) -> Optional[datetime]:
        """Parse Madrid date formats into timezone-aware datetimes"""
        if not fecha_str:
            return None
        try:
            # Parse the date as timezone-naive first
            dt = datetime.strptime(fecha_str, '%Y-%m-%d %H:%M:%S')
            # Convert to timezone-aware (assume UTC)
            return dt.replace(tzinfo=timezone.utc)
        except ValueError:
            try:
                # Try alternative format
                dt = datetime.strptime(fecha_str, '%d/%m/%Y %H:%M:%S')
                return dt.replace(tzinfo=timezone.utc)
            except ValueError:
                return None

    def _validate_csv_structure(self, fieldnames: Optional[List[str]]) -> bool:
        """Validate that the CSV has the expected structure"""
        if not fieldnames:
            logger.warning("No CSV fieldnames found")
            return False

        expected_fields = ['id', 'fecha', 'tipo_elem', 'intensidad', 'ocupacion', 'carga']
        missing_fields = [field for field in expected_fields if field not in fieldnames]
        if missing_fields:
            logger.warning("Missing expected fields in CSV",
                           missing=missing_fields, available=fieldnames)
        return True  # Continue processing even with some missing fields

    def _is_valid_traffic_point(self, traffic_point: Dict[str, Any]) -> bool:
        """Check if a traffic point has valid essential data"""
        return bool(traffic_point.get('latitude') and
                    traffic_point.get('longitude') and
                    traffic_point.get('idelem'))

    def _validate_madrid_coordinates(self, latitude: float, longitude: float) -> bool:
        """Validate that coordinates are in the Madrid area"""
        return (MADRID_BOUNDS['lat_min'] <= latitude <= MADRID_BOUNDS['lat_max'] and
                MADRID_BOUNDS['lon_min'] <= longitude <= MADRID_BOUNDS['lon_max'])

    def _get_next_month(self, current_date: datetime) -> datetime:
        """Get the date advanced to the following month"""
        if current_date.month == 12:
            return current_date.replace(year=current_date.year + 1, month=1)
        else:
            return current_date.replace(month=current_date.month + 1)

    def _log_coordinate_conversion(self, point_data: Dict, utm_x: str, utm_y: str,
                                   latitude: float, longitude: float) -> None:
        """Log coordinate conversion (limited to the first few for debugging)"""
        if self._conversion_log_count < 3:
            self._conversion_log_count += 1
            logger.debug("Successful UTM conversion",
                         idelem=point_data.get('idelem'),
                         utm_x=utm_x, utm_y=utm_y,
                         latitude=latitude, longitude=longitude,
                         descripcion=point_data.get('descripcion'))

    def _enrich_with_location_data(self, records: List[Dict[str, Any]],
                                   measurement_points: List[MeasurementPoint]) -> List[Dict[str, Any]]:
        """Enrich traffic records with location data from measurement points"""
        try:
            points_lookup = {point.id: point for point in measurement_points}
            for record in records:
                point_id = record.get('measurement_point_id')
                if point_id in points_lookup:
                    point = points_lookup[point_id]
                    record.update({
                        'measurement_point_name': point.name,
                        'measurement_point_latitude': point.latitude,
                        'measurement_point_longitude': point.longitude,
                        'distance_to_query': point.distance
                    })
            return records
        except Exception as e:
            logger.warning("Error enriching with location data", error=str(e))
            return records

    # ================================================================
    # HTTP CLIENT METHODS
    # ================================================================

    async def _fetch_xml_content_robust(self, url: str) -> Optional[str]:
        """Fetch XML content with robust headers for Madrid endpoints"""
        try:
            headers = {
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
                'Accept': 'application/xml,text/xml,*/*',
                'Accept-Language': 'es-ES,es;q=0.9,en;q=0.8',
                'Accept-Encoding': 'gzip, deflate, br',
                'Cache-Control': 'no-cache',
                'Referer': 'https://datos.madrid.es/'
            }
            async with httpx.AsyncClient(timeout=30.0, follow_redirects=True,
                                         headers=headers) as client:
                logger.debug("Fetching XML from Madrid endpoint", url=url)
                response = await client.get(url)
                logger.debug("Madrid API response", status=response.status_code,
                             content_type=response.headers.get('content-type'),
                             content_length=len(response.content))

                if response.status_code == 200:
                    content = self._decode_response_content(response)
                    if content and len(content) > 100:
                        return content
            return None
        except Exception as e:
            logger.warning("Failed to fetch Madrid XML content", url=url, error=str(e))
            return None

    async def _fetch_historical_zip(self, url: str) -> Optional[bytes]:
        """Fetch historical ZIP data from Madrid Open Data"""
        try:
            headers = {
                'User-Agent': 'Mozilla/5.0 (compatible; Madrid-Traffic-Client/1.0)',
                'Accept': 'application/zip,application/octet-stream,*/*',
                'Accept-Language': 'es-ES,es;q=0.9,en;q=0.8',
            }
            async with httpx.AsyncClient(timeout=120.0, headers=headers,
                                         follow_redirects=True) as client:
                logger.debug("Fetching historical ZIP", url=url)
                response = await client.get(url)

                if response.status_code == 200:
                    content = response.content
                    if content and len(content) > 1000:
                        logger.debug("Successfully fetched ZIP", url=url, size=len(content))
                        return content
                    else:
                        logger.debug("ZIP file too small", url=url,
                                     size=len(content) if content else 0)
                else:
                    logger.debug("ZIP not found", url=url, status=response.status_code)
        except Exception as e:
            logger.debug("Error fetching ZIP", url=url, error=str(e))
        return None

    async def _fetch_measurement_points_csv(self, url: str) -> Optional[str]:
        """Fetch the measurement points CSV file"""
        try:
            headers = {
                'User-Agent': 'Mozilla/5.0 (compatible; Madrid-Traffic-Client/1.0)',
                'Accept': 'text/csv,application/csv,text/plain,*/*',
                'Accept-Language': 'es-ES,es;q=0.9,en;q=0.8',
            }
            async with httpx.AsyncClient(timeout=30.0, headers=headers,
                                         follow_redirects=True) as client:
                logger.debug("Fetching measurement points CSV", url=url)
                response = await client.get(url)

                if response.status_code == 200:
                    content = response.text
                    if content and len(content) > 1000:
                        logger.debug("Successfully fetched measurement points CSV",
                                     url=url, size=len(content))
                        return content
                    else:
                        logger.debug("Measurement points CSV too small", size=len(content))
                else:
                    logger.debug("Measurement points CSV not found",
                                 url=url, status=response.status_code)
        except Exception as e:
            logger.debug("Error fetching measurement points CSV", url=url, error=str(e))
        return None

    def _decode_response_content(self, response) -> Optional[str]:
        """Decode response content with multiple encoding attempts"""
        try:
            return response.text
        except UnicodeDecodeError:
            # Try manual decoding for Spanish content
            for encoding in ['utf-8', 'latin-1', 'windows-1252', 'iso-8859-1']:
                try:
                    content = response.content.decode(encoding)
                    if content and len(content) > 100:
                        logger.debug("Successfully decoded with encoding", encoding=encoding)
                        return content
                except UnicodeDecodeError:
                    continue
            return None

    def _extract_csv_from_zip(self, zip_file, csv_filename: str) -> Optional[str]:
        """Extract and decode CSV content from a ZIP file"""
        try:
            csv_bytes = zip_file.read(csv_filename)
            # Try different encodings for Spanish content
            for encoding in ['utf-8', 'latin-1', 'windows-1252', 'iso-8859-1']:
                try:
                    csv_content = csv_bytes.decode(encoding)
                    logger.debug("Successfully decoded CSV",
                                 filename=csv_filename, encoding=encoding)
                    return csv_content
                except UnicodeDecodeError:
                    continue
            logger.warning("Could not decode CSV file", filename=csv_filename)
            return None
        except Exception as e:
            logger.warning("Error extracting CSV from ZIP",
                           filename=csv_filename, error=str(e))
            return None

    # ================================================================
    # XML PROCESSING METHODS
    # ================================================================

    def _clean_madrid_xml(self, xml_content: str) -> str:
        """Clean Madrid XML to handle undefined entities and encoding issues"""
        try:
            # Remove BOM if present
            xml_content = xml_content.lstrip('\ufeff')

            # Replace HTML entities that are undefined in plain XML
            entity_replacements = {
                '&nbsp;': ' ',
                '&copy;': '©',
                '&reg;': '®',
                '&trade;': '™'
            }
            for entity, replacement in entity_replacements.items():
                xml_content = xml_content.replace(entity, replacement)

            # Escape bare ampersands that are not already part of an entity
            xml_content = re.sub(r'&(?![a-zA-Z0-9#]{1,10};)', '&amp;', xml_content)

            # Remove invalid control characters
            xml_content = re.sub(r'[\x00-\x08\x0B\x0C\x0E-\x1F\x7F]', '', xml_content)

            # Transliterate Spanish characters to ASCII
            spanish_chars = {
                'ñ': 'n', 'Ñ': 'N',
                'á': 'a', 'é': 'e', 'í': 'i', 'ó': 'o', 'ú': 'u',
                'Á': 'A', 'É': 'E', 'Í': 'I', 'Ó': 'O', 'Ú': 'U',
                'ü': 'u', 'Ü': 'U'
            }
            for spanish_char, replacement in spanish_chars.items():
                xml_content = xml_content.replace(spanish_char, replacement)

            return xml_content
        except Exception as e:
            logger.warning("Error cleaning Madrid XML", error=str(e))
            return xml_content
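
    # Example of the ampersand rule above: a bare '&' is escaped, while an
    # existing entity is left untouched by the negative lookahead.
    #
    #   re.sub(r'&(?![a-zA-Z0-9#]{1,10};)', '&amp;', 'Pinar & Rey &amp; M30')
    #   # -> 'Pinar &amp; Rey &amp; M30'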

    def _extract_traffic_data_regex(self, xml_content: str) -> List[Dict[str, Any]]:
        """Extract traffic data using regex when XML parsing fails"""
        traffic_points = []
        try:
            pm_pattern = r'<pm>(.*?)</pm>'
            pm_matches = re.findall(pm_pattern, xml_content, re.DOTALL)

            for pm_content in pm_matches:
                try:
                    extracted_data = self._extract_pm_data_regex(pm_content)
                    if extracted_data and self._is_valid_traffic_point(extracted_data):
                        traffic_points.append(extracted_data)
                except Exception as e:
                    logger.debug("Error parsing regex PM match", error=str(e))
                    continue

            logger.debug("Regex extraction results", count=len(traffic_points))
            return traffic_points
        except Exception as e:
            logger.error("Error in regex extraction", error=str(e))
            return []

    def _extract_pm_data_regex(self, pm_content: str) -> Dict[str, Any]:
        """Extract individual PM data using regex"""
        patterns = {
            'idelem': r'<idelem>(.*?)</idelem>',
            'intensidad': r'<intensidad>(.*?)</intensidad>',
            'st_x': r'<st_x>(.*?)</st_x>',
            'st_y': r'<st_y>(.*?)</st_y>',
            'descripcion': r'<descripcion>(.*?)</descripcion>'
        }

        extracted = {}
        for field, pattern in patterns.items():
            match = re.search(pattern, pm_content)
            extracted[field] = match.group(1) if match else ''

        if extracted['idelem'] and extracted['st_x'] and extracted['st_y']:
            # Convert coordinates
            latitude, longitude = self._convert_utm_to_latlon(extracted['st_x'],
                                                              extracted['st_y'])
            if latitude and longitude:
                return {
                    'idelem': extracted['idelem'],
                    'descripcion': extracted['descripcion'] or f"Point {extracted['idelem']}",
                    'intensidad': self._safe_int(extracted['intensidad']),
                    'latitude': latitude,
                    'longitude': longitude,
                    'ocupacion': 0,
                    'carga': 0,
                    'nivelServicio': 0,
                    'error': 'N'
                }
        return {}

    # ================================================================
    # TRAFFIC ANALYSIS METHODS
    # ================================================================

    def _find_nearest_traffic_point(self, latitude: float, longitude: float,
                                    traffic_data: List[Dict]) -> Optional[Dict]:
        """Find the nearest traffic measurement point to the given coordinates"""
        if not traffic_data:
            return None

        min_distance = float('inf')
        nearest_point = None

        for point in traffic_data:
            if point.get('latitude') and point.get('longitude'):
                distance = self._calculate_distance(
                    latitude, longitude, point['latitude'], point['longitude']
                )
                if distance < min_distance:
                    min_distance = distance
                    nearest_point = point

        # Madrid area search radius (15 km)
        if nearest_point and min_distance <= 15.0:
            logger.debug("Found nearest Madrid traffic point",
                         distance_km=min_distance,
                         point_name=nearest_point.get('descripcion'),
                         point_id=nearest_point.get('idelem'))
            return nearest_point

        logger.debug("No nearby Madrid traffic points found",
                     min_distance=min_distance, total_points=len(traffic_data))
        return None

    def _get_closest_distance(self, latitude: float, longitude: float,
                              traffic_data: List[Dict]) -> float:
        """Get the distance to the closest traffic point (for debugging)"""
        if not traffic_data:
            return float('inf')

        min_distance = float('inf')
        for point in traffic_data:
            if point.get('latitude') and point.get('longitude'):
                distance = self._calculate_distance(
                    latitude, longitude, point['latitude'], point['longitude']
                )
                min_distance = min(min_distance, distance)
        return min_distance

    def _parse_traffic_measurement(self, traffic_point: Dict) -> Dict[str, Any]:
        """Parse a Madrid traffic measurement into the standardized format"""
        try:
            service_level = traffic_point.get('nivelServicio', 0)
            congestion_mapping = {
                TrafficServiceLevel.FLUID.value: CongestionLevel.LOW.value,
                TrafficServiceLevel.DENSE.value: CongestionLevel.MEDIUM.value,
                TrafficServiceLevel.CONGESTED.value: CongestionLevel.HIGH.value,
                TrafficServiceLevel.BLOCKED.value: CongestionLevel.BLOCKED.value
            }
            # Speed estimation based on service level
            speed_mapping = {
                TrafficServiceLevel.FLUID.value: 45,
                TrafficServiceLevel.DENSE.value: 25,
                TrafficServiceLevel.CONGESTED.value: 15,
                TrafficServiceLevel.BLOCKED.value: 5
            }

            congestion_level = congestion_mapping.get(service_level,
                                                      CongestionLevel.MEDIUM.value)
            average_speed = speed_mapping.get(service_level, 25)

            # Calculate pedestrian estimate
            hour = datetime.now().hour
            pedestrian_multiplier = self._get_pedestrian_multiplier(hour)
            pedestrian_count = int(100 * pedestrian_multiplier)

            return {
                "date": datetime.now(),
                "traffic_volume": traffic_point.get('intensidad', 0),
                "pedestrian_count": pedestrian_count,
                "congestion_level": congestion_level,
                "average_speed": average_speed,
                "occupation_percentage": traffic_point.get('ocupacion', 0),
                "load_percentage": traffic_point.get('carga', 0),
                "measurement_point_id": traffic_point.get('idelem'),
                "measurement_point_name": traffic_point.get('descripcion'),
                "road_type": "URB",
                "source": DataSource.MADRID_REALTIME.value
            }
        except Exception as e:
            logger.error("Error parsing traffic measurement", error=str(e))
            return self._get_default_traffic_data()

    def _get_pedestrian_multiplier(self, hour: int) -> float:
        """Get the pedestrian multiplier for a given hour of the day"""
        if 13 <= hour <= 15:  # Lunch time
            return 2.5
        elif 8 <= hour <= 9 or 18 <= hour <= 20:  # Rush hours
            return 2.0
        else:
            return 1.0

    # ================================================================
    # SYNTHETIC DATA GENERATION METHODS
    # ================================================================

    async def _generate_synthetic_traffic(self, latitude: float,
                                          longitude: float) -> Dict[str, Any]:
        """Generate realistic Madrid traffic data as a fallback"""
        now = datetime.now()
        hour = now.hour
        is_weekend = now.weekday() >= 5

        traffic_params = self._calculate_traffic_parameters(hour, is_weekend)

        return {
            "date": now,
            "traffic_volume": traffic_params['volume'],
            "pedestrian_count": traffic_params['pedestrians'],
            "congestion_level": traffic_params['congestion'],
            "average_speed": traffic_params['speed'],
            "occupation_percentage": min(100, traffic_params['volume'] // 2),
            "load_percentage": min(100, traffic_params['volume'] // 3),
            "measurement_point_id": "madrid_synthetic",
            "measurement_point_name": "Madrid Centro (Synthetic)",
            "road_type": "URB",
            "source": DataSource.SYNTHETIC.value
        }

    async def _generate_historical_traffic(self, latitude: float, longitude: float,
                                           start_date: datetime,
                                           end_date: datetime) -> List[Dict[str, Any]]:
        """Generate synthetic historical traffic data with realistic patterns"""
        try:
            import random

            historical_data = []
            current_date = start_date

            # Seed random for reproducible data per location
            random.seed(hash(f"{latitude}{longitude}"))

            while current_date < end_date:
                # Determine how many hours to generate for this day
                if current_date.date() == end_date.date():
                    # Same day as end_date: only generate up to end_date's hour
                    end_hour = end_date.hour
                else:
                    # Full day
                    end_hour = 24

                # Generate hourly records for this day
                start_hour = current_date.hour if current_date == start_date else 0
                for hour in range(start_hour, end_hour):
                    record_time = current_date.replace(hour=hour, minute=0,
                                                       second=0, microsecond=0)
                    # Skip if the record time is at or beyond end_date
                    if record_time >= end_date:
                        break

                    traffic_params = self._generate_synthetic_traffic_params(record_time, random)
                    traffic_record = {
                        "date": record_time,
                        "traffic_volume": traffic_params['volume'],
                        "pedestrian_count": traffic_params['pedestrians'],
                        "congestion_level": traffic_params['congestion'],
                        "average_speed": traffic_params['speed'],
                        "occupation_percentage": min(100, traffic_params['volume'] // 2),
                        "load_percentage": min(100, traffic_params['volume'] // 3),
                        "measurement_point_id": f"madrid_historical_{hash(f'{latitude}{longitude}') % 1000}",
                        "measurement_point_name": f"Madrid Historical Point ({latitude:.4f}, {longitude:.4f})",
                        "road_type": "URB",
                        "source": DataSource.SYNTHETIC_HISTORICAL.value
                    }
                    historical_data.append(traffic_record)

                # Move to the next day
                if current_date.date() == end_date.date():
                    # The end date has been processed; stop
                    break
                else:
                    # Move to the start of the next day
                    current_date = (current_date + timedelta(days=1)).replace(
                        hour=0, minute=0, second=0, microsecond=0)

            logger.info("Generated historical traffic data",
                        records=len(historical_data), start=start_date, end=end_date)
            return historical_data

        except Exception as e:
            logger.error("Error generating historical traffic data", error=str(e))
            return []

    def _calculate_traffic_parameters(self, hour: int, is_weekend: bool) -> Dict[str, Any]:
        """Calculate traffic parameters based on time and day type"""
        base_traffic = 100

        if not is_weekend:
            if 7 <= hour <= 9:
                multiplier, congestion, speed = 2.2, "high", 15
            elif 18 <= hour <= 20:
                multiplier, congestion, speed = 2.5, "high", 12
            elif 12 <= hour <= 14:
                multiplier, congestion, speed = 1.6, "medium", 25
            else:
                multiplier, congestion, speed = 1.0, "low", 40
        else:
            if 11 <= hour <= 14:
                multiplier, congestion, speed = 1.4, "medium", 30
            else:
                multiplier, congestion, speed = 0.8, "low", 45

        volume = int(base_traffic * multiplier)
        pedestrians = int(150 * self._get_pedestrian_multiplier(hour))

        return {
            'volume': volume,
            'congestion': congestion,
            'speed': max(10, speed),
            'pedestrians': pedestrians
        }

    def _generate_synthetic_traffic_params(self, record_time: datetime,
                                           random_gen) -> Dict[str, Any]:
        """Generate synthetic traffic parameters with random variations"""
        hour = record_time.hour
        day_of_week = record_time.weekday()
        month = record_time.month

        base_params = self._calculate_traffic_parameters(hour, day_of_week >= 5)

        # Add random variations
        volume_variation = random_gen.uniform(-0.3, 0.3)
        speed_variation = random_gen.randint(-5, 5)

        # Apply seasonal adjustments (quieter summers, busier year-end)
        seasonal_multiplier = 0.8 if month in [7, 8] else (1.1 if month in [11, 12] else 1.0)

        # Weekend-specific adjustments
        if day_of_week >= 5 and hour in [11, 12, 13, 14, 15]:
            base_params['volume'] = int(base_params['volume'] * 1.4)
            base_params['congestion'] = "medium"

        return {
            'volume': max(10, int(base_params['volume'] * (1 + volume_variation) * seasonal_multiplier)),
            'congestion': base_params['congestion'],
            'speed': max(10, min(60, base_params['speed'] + speed_variation)),
            'pedestrians': int(base_params['pedestrians'] * random_gen.uniform(0.8, 1.2))
        }
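
    # Worked example of the weekday baseline (no random variation):
    # 08:00 on a weekday hits the 7-9 rush band, so
    #
    #   client._calculate_traffic_parameters(hour=8, is_weekend=False)
    #   # -> {'volume': 220, 'congestion': 'high', 'speed': 15, 'pedestrians': 300}
    #
    # volume = 100 * 2.2, and pedestrians = 150 * 2.0 (rush-hour multiplier).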

    def _get_fallback_measurement_points(self, latitude: float,
                                         longitude: float) -> List[MeasurementPoint]:
        """Generate fallback measurement points when the CSV is not available"""
        madrid_points = [
            (40.4168, -3.7038, "Madrid Centro"),
            (40.4200, -3.7060, "Gran Vía"),
            (40.4155, -3.7074, "Plaza Mayor"),
            (40.4152, -3.6844, "Retiro"),
            (40.4063, -3.6932, "Atocha"),
        ]

        fallback_points = []
        for i, (lat, lon, name) in enumerate(madrid_points):
            distance = self._calculate_distance(latitude, longitude, lat, lon)
            point = MeasurementPoint(
                id=f'fallback_{i + 1000}',
                latitude=lat,
                longitude=lon,
                distance=distance,
                name=name,
                type='URB'
            )
            fallback_points.append(point)

        fallback_points.sort(key=lambda x: x.distance)
        return fallback_points[:5]

    def _get_default_traffic_data(self) -> Dict[str, Any]:
        """Get default traffic data when parsing fails"""
        return {
            "date": datetime.now(),
            "traffic_volume": 100,
            "pedestrian_count": 150,
            "congestion_level": CongestionLevel.MEDIUM.value,
            "average_speed": 25,
            "occupation_percentage": 30,
            "load_percentage": 40,
            "measurement_point_id": "unknown",
            "measurement_point_name": "Unknown location",
            "road_type": "URB",
            "source": DataSource.SYNTHETIC.value
        }

    # ================================================================
    # CORE UTILITY METHODS
    # ================================================================

    def _calculate_distance(self, lat1: float, lon1: float,
                            lat2: float, lon2: float) -> float:
        """Calculate the distance between two coordinates using the Haversine formula"""
        R = 6371  # Earth's radius in km
        dlat = math.radians(lat2 - lat1)
        dlon = math.radians(lon2 - lon1)
        a = (math.sin(dlat / 2) * math.sin(dlat / 2) +
             math.cos(math.radians(lat1)) * math.cos(math.radians(lat2)) *
             math.sin(dlon / 2) * math.sin(dlon / 2))
        c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
        return R * c

    def _safe_int(self, value_str: str) -> int:
        """Safely convert a string to int (handles comma decimal separators)"""
        try:
            return int(float(value_str.replace(',', '.')))
        except (ValueError, TypeError, AttributeError):
            return 0

    def _safe_float(self, value_str: str) -> float:
        """Safely convert a string to float (handles comma decimal separators)"""
        try:
            return float(value_str.replace(',', '.'))
        except (ValueError, TypeError, AttributeError):
            return 0.0
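

# ----------------------------------------------------------------
# Manual smoke test (illustrative sketch). Exercises only the pure
# helpers; assumes BaseAPIClient.__init__ needs no network access.
# ----------------------------------------------------------------
if __name__ == "__main__":  # pragma: no cover
    client = MadridOpenDataClient()
    # Haversine: Madrid Centro -> Gran Vía, expected ~0.4 km
    print(client._calculate_distance(40.4168, -3.7038, 40.4200, -3.7060))
    # UTM 30N -> lat/lon for a point near the city centre (approximate)
    print(client._convert_utm_to_latlon("440000", "4474000"))
    # ZIP month code for the June 2025 reference month, expected 145
    print(client._calculate_madrid_month_code(2025, 6))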