# ================================================================ # services/data/app/external/madrid_opendata.py - FIXED XML PARSER # ================================================================ """Madrid Open Data API client with fixed XML parser for actual structure""" import math import xml.etree.ElementTree as ET from typing import List, Dict, Any, Optional from datetime import datetime, timedelta import structlog import re from app.external.base_client import BaseAPIClient from app.core.config import settings import pyproj logger = structlog.get_logger() class MadridOpenDataClient(BaseAPIClient): def __init__(self): super().__init__( base_url="https://datos.madrid.es", api_key=None ) # WORKING Madrid traffic endpoints (verified) self.traffic_endpoints = [ # Primary working endpoint "https://datos.madrid.es/egob/catalogo/202087-0-trafico-intensidad.xml", ] async def get_current_traffic(self, latitude: float, longitude: float) -> Optional[Dict[str, Any]]: """Get current traffic data for location using working Madrid endpoints""" try: logger.debug("Fetching Madrid traffic data", lat=latitude, lon=longitude) # Try the working endpoint for endpoint in self.traffic_endpoints: try: logger.debug("Trying traffic endpoint", endpoint=endpoint) traffic_data = await self._fetch_traffic_xml_data(endpoint) if traffic_data: logger.info("Successfully fetched Madrid traffic data", endpoint=endpoint, points=len(traffic_data)) # Find nearest traffic measurement point nearest_point = self._find_nearest_traffic_point(latitude, longitude, traffic_data) if nearest_point: parsed_data = self._parse_traffic_measurement(nearest_point) logger.debug("Successfully parsed real Madrid traffic data", point_name=nearest_point.get('descripcion'), point_id=nearest_point.get('idelem')) return parsed_data else: logger.debug("No nearby traffic points found", lat=latitude, lon=longitude, closest_distance=self._get_closest_distance(latitude, longitude, traffic_data)) except Exception as e: logger.debug("Failed to fetch from endpoint", endpoint=endpoint, error=str(e)) continue # If no real data available, use synthetic data logger.info("No nearby Madrid traffic points found, using synthetic data") return await self._generate_synthetic_traffic(latitude, longitude) except Exception as e: logger.error("Failed to get current traffic", error=str(e)) return await self._generate_synthetic_traffic(latitude, longitude) async def _fetch_traffic_xml_data(self, endpoint: str) -> Optional[List[Dict[str, Any]]]: """Fetch and parse Madrid traffic XML data""" try: xml_content = await self._fetch_xml_content_robust(endpoint) if not xml_content: logger.debug("No XML content received", endpoint=endpoint) return None # Log XML structure for debugging logger.debug("Madrid XML content preview", length=len(xml_content), first_500=xml_content[:500] if len(xml_content) > 500 else xml_content) # Parse Madrid traffic XML with the correct structure traffic_points = self._parse_madrid_traffic_xml(xml_content) if traffic_points: logger.debug("Successfully parsed Madrid traffic XML", points=len(traffic_points)) return traffic_points else: logger.warning("No traffic points found in XML", endpoint=endpoint) return None except Exception as e: logger.error("Error fetching traffic XML data", endpoint=endpoint, error=str(e)) return None def _parse_madrid_traffic_xml(self, xml_content: str) -> List[Dict[str, Any]]: """Parse Madrid traffic XML with correct structure (...)""" traffic_points = [] try: # Clean the XML to handle undefined entities and encoding issues cleaned_xml = self._clean_madrid_xml(xml_content) # Parse XML root = ET.fromstring(cleaned_xml) # Log XML structure logger.debug("Madrid XML structure", root_tag=root.tag, children_count=len(list(root))) # Madrid uses root with children if root.tag == 'pms': pm_elements = root.findall('pm') logger.debug("Found PM elements", count=len(pm_elements)) for pm in pm_elements: try: traffic_point = self._extract_madrid_pm_element(pm) # Validate essential data (coordinates and ID) if (traffic_point.get('latitude') and traffic_point.get('longitude') and traffic_point.get('idelem')): traffic_points.append(traffic_point) # Log first few points for debugging if len(traffic_points) <= 3: logger.debug("Sample traffic point", id=traffic_point['idelem'], lat=traffic_point['latitude'], lon=traffic_point['longitude'], intensity=traffic_point.get('intensidad')) except Exception as e: logger.debug("Error parsing PM element", error=str(e)) continue else: logger.warning("Unexpected XML root tag", root_tag=root.tag) logger.debug("Madrid traffic XML parsing completed", valid_points=len(traffic_points)) return traffic_points except ET.ParseError as e: logger.warning("Failed to parse Madrid XML", error=str(e)) # Try regex extraction as fallback return self._extract_traffic_data_regex(xml_content) except Exception as e: logger.error("Error in Madrid traffic XML parsing", error=str(e)) return [] def _clean_madrid_xml(self, xml_content: str) -> str: """Clean Madrid XML to handle undefined entities and encoding issues""" try: # Remove BOM if present xml_content = xml_content.lstrip('\ufeff') # Remove or replace undefined entities that cause parsing errors # Common undefined entities in Madrid data xml_content = xml_content.replace(' ', ' ') xml_content = xml_content.replace('©', '©') xml_content = xml_content.replace('®', '®') xml_content = xml_content.replace('™', '™') # Fix unescaped ampersands (but not already escaped ones) xml_content = re.sub(r'&(?![a-zA-Z0-9#]{1,10};)', '&', xml_content) # Remove invalid control characters xml_content = re.sub(r'[\x00-\x08\x0B\x0C\x0E-\x1F\x7F]', '', xml_content) # Handle Spanish characters that might be causing issues spanish_chars = { 'ñ': 'n', 'Ñ': 'N', 'á': 'a', 'é': 'e', 'í': 'i', 'ó': 'o', 'ú': 'u', 'Á': 'A', 'É': 'E', 'Í': 'I', 'Ó': 'O', 'Ú': 'U', 'ü': 'u', 'Ü': 'U' } for spanish_char, replacement in spanish_chars.items(): xml_content = xml_content.replace(spanish_char, replacement) return xml_content except Exception as e: logger.warning("Error cleaning Madrid XML", error=str(e)) return xml_content def _extract_madrid_pm_element(self, pm_element) -> Dict[str, Any]: """Extract traffic data from Madrid element with proper coordinate conversion""" try: # Based on the actual Madrid XML structure shown in logs point_data = {} utm_x = None utm_y = None # Extract all child elements for child in pm_element: tag = child.tag text = child.text.strip() if child.text else '' if tag == 'idelem': point_data['idelem'] = text elif tag == 'descripcion': point_data['descripcion'] = text elif tag == 'intensidad': point_data['intensidad'] = self._safe_int(text) elif tag == 'ocupacion': point_data['ocupacion'] = self._safe_float(text) elif tag == 'carga': point_data['carga'] = self._safe_int(text) elif tag == 'nivelServicio': point_data['nivelServicio'] = self._safe_int(text) elif tag == 'st_x': # Store UTM X coordinate for later conversion utm_x = text point_data['utm_x'] = text # Keep original for debugging elif tag == 'st_y': # Store UTM Y coordinate for later conversion utm_y = text point_data['utm_y'] = text # Keep original for debugging elif tag == 'error': point_data['error'] = text elif tag == 'subarea': point_data['subarea'] = text elif tag == 'accesoAsociado': point_data['accesoAsociado'] = text elif tag == 'intensidadSat': point_data['intensidadSat'] = self._safe_int(text) # Convert UTM coordinates to lat/lon if both are available if utm_x and utm_y: latitude, longitude = self._convert_utm_coordinates_accurate(utm_x, utm_y) if latitude is not None and longitude is not None: # Validate that coordinates are actually in Madrid area if self._validate_madrid_coordinates(latitude, longitude): point_data['latitude'] = latitude point_data['longitude'] = longitude # Log first few successful conversions for verification if len(getattr(self, '_conversion_log_count', [])) < 3: if not hasattr(self, '_conversion_log_count'): self._conversion_log_count = [] self._conversion_log_count.append(1) logger.debug("Successful UTM conversion", idelem=point_data.get('idelem'), utm_x=utm_x, utm_y=utm_y, latitude=latitude, longitude=longitude, descripcion=point_data.get('descripcion')) else: # Log invalid coordinates for debugging logger.debug("Invalid Madrid coordinates after conversion", idelem=point_data.get('idelem'), utm_x=utm_x, utm_y=utm_y, converted_lat=latitude, converted_lon=longitude, descripcion=point_data.get('descripcion')) # Don't include this point - return empty dict return {} else: # Conversion failed logger.debug("UTM conversion failed", idelem=point_data.get('idelem'), utm_x=utm_x, utm_y=utm_y) return {} else: # Missing coordinates logger.debug("Missing UTM coordinates", idelem=point_data.get('idelem'), has_utm_x=utm_x is not None, has_utm_y=utm_y is not None) return {} return point_data except Exception as e: logger.debug("Error extracting Madrid PM element", error=str(e)) return {} def _convert_utm_coordinates_accurate(self, utm_x_str: str, utm_y_str: str) -> tuple[Optional[float], Optional[float]]: """Convert UTM coordinates to lat/lon using accurate pyproj library""" try: utm_x = float(utm_x_str.replace(',', '.')) utm_y = float(utm_y_str.replace(',', '.')) # Define UTM Zone 30N projection (EPSG:25830) utm_proj = pyproj.Proj(proj='utm', zone=30, ellps='WGS84', preserve_units=False) # Convert to latitude/longitude longitude, latitude = utm_proj(utm_x, utm_y, inverse=True) return round(latitude, 6), round(longitude, 6) except (ValueError, TypeError, Exception): return None, None def _validate_madrid_coordinates(self, latitude: float, longitude: float) -> bool: """Validate that converted coordinates are actually in Madrid area""" # Madrid bounds (expanded slightly to include metro area) madrid_lat_min, madrid_lat_max = 40.31, 40.56 madrid_lon_min, madrid_lon_max = -3.89, -3.51 return (madrid_lat_min <= latitude <= madrid_lat_max and madrid_lon_min <= longitude <= madrid_lon_max) def _safe_int(self, value_str: str) -> int: """Safely convert string to int""" try: return int(float(value_str.replace(',', '.'))) except (ValueError, TypeError): return 0 def _safe_float(self, value_str: str) -> float: """Safely convert string to float""" try: return float(value_str.replace(',', '.')) except (ValueError, TypeError): return 0.0 async def _fetch_xml_content_robust(self, url: str) -> Optional[str]: """Fetch XML content with robust headers for Madrid endpoints""" try: import httpx # Headers optimized for Madrid Open Data headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36', 'Accept': 'application/xml,text/xml,*/*', 'Accept-Language': 'es-ES,es;q=0.9,en;q=0.8', 'Accept-Encoding': 'gzip, deflate, br', 'Cache-Control': 'no-cache', 'Referer': 'https://datos.madrid.es/' } async with httpx.AsyncClient( timeout=30.0, follow_redirects=True, headers=headers ) as client: logger.debug("Fetching XML from Madrid endpoint", url=url) response = await client.get(url) logger.debug("Madrid API response", status=response.status_code, content_type=response.headers.get('content-type'), content_length=len(response.content)) if response.status_code == 200: try: content = response.text if content and len(content) > 100: return content except UnicodeDecodeError: # Try manual encoding for Spanish content for encoding in ['utf-8', 'latin-1', 'windows-1252', 'iso-8859-1']: try: content = response.content.decode(encoding) if content and len(content) > 100: logger.debug("Successfully decoded with encoding", encoding=encoding) return content except UnicodeDecodeError: continue return None except Exception as e: logger.warning("Failed to fetch Madrid XML content", url=url, error=str(e)) return None def _extract_traffic_data_regex(self, xml_content: str) -> List[Dict[str, Any]]: """Extract traffic data using regex when XML parsing fails""" traffic_points = [] try: # Pattern to match Madrid PM elements pm_pattern = r'(.*?)' pm_matches = re.findall(pm_pattern, xml_content, re.DOTALL) for pm_content in pm_matches: try: # Extract individual fields idelem_match = re.search(r'(.*?)', pm_content) intensidad_match = re.search(r'(.*?)', pm_content) st_x_match = re.search(r'(.*?)', pm_content) st_y_match = re.search(r'(.*?)', pm_content) descripcion_match = re.search(r'(.*?)', pm_content) if idelem_match and st_x_match and st_y_match: idelem = idelem_match.group(1) st_x = st_x_match.group(1) st_y = st_y_match.group(1) intensidad = intensidad_match.group(1) if intensidad_match else '0' descripcion = descripcion_match.group(1) if descripcion_match else f'Point {idelem}' # Convert coordinates longitude = self._convert_utm_to_lon(st_x) latitude = self._convert_utm_to_lat(st_y) if latitude and longitude: traffic_point = { 'idelem': idelem, 'descripcion': descripcion, 'intensidad': self._safe_int(intensidad), 'latitude': latitude, 'longitude': longitude, 'ocupacion': 0, 'carga': 0, 'nivelServicio': 0, 'error': 'N' } traffic_points.append(traffic_point) except Exception as e: logger.debug("Error parsing regex PM match", error=str(e)) continue logger.debug("Regex extraction results", count=len(traffic_points)) return traffic_points except Exception as e: logger.error("Error in regex extraction", error=str(e)) return [] def _get_closest_distance(self, latitude: float, longitude: float, traffic_data: List[Dict]) -> float: """Get distance to closest traffic point for debugging""" if not traffic_data: return float('inf') min_distance = float('inf') for point in traffic_data: if point.get('latitude') and point.get('longitude'): distance = self._calculate_distance( latitude, longitude, point['latitude'], point['longitude'] ) min_distance = min(min_distance, distance) return min_distance def _find_nearest_traffic_point(self, latitude: float, longitude: float, traffic_data: List[Dict]) -> Optional[Dict]: """Find the nearest traffic measurement point to given coordinates""" if not traffic_data: return None min_distance = float('inf') nearest_point = None for point in traffic_data: if point.get('latitude') and point.get('longitude'): distance = self._calculate_distance( latitude, longitude, point['latitude'], point['longitude'] ) if distance < min_distance: min_distance = distance nearest_point = point # Madrid area search radius (15km) if nearest_point and min_distance <= 15.0: logger.debug("Found nearest Madrid traffic point", distance_km=min_distance, point_name=nearest_point.get('descripcion'), point_id=nearest_point.get('idelem')) return nearest_point logger.debug("No nearby Madrid traffic points found", min_distance=min_distance, total_points=len(traffic_data)) return None def _calculate_distance(self, lat1: float, lon1: float, lat2: float, lon2: float) -> float: """Calculate distance between two coordinates in km using Haversine formula""" R = 6371 # Earth's radius in km dlat = math.radians(lat2 - lat1) dlon = math.radians(lon2 - lon1) a = (math.sin(dlat/2) * math.sin(dlat/2) + math.cos(math.radians(lat1)) * math.cos(math.radians(lat2)) * math.sin(dlon/2) * math.sin(dlon/2)) c = 2 * math.atan2(math.sqrt(a), math.sqrt(1-a)) distance = R * c return distance def _parse_traffic_measurement(self, traffic_point: Dict) -> Dict[str, Any]: """Parse Madrid traffic measurement into standardized format""" try: # Madrid traffic service levels: 0=fluid, 1=dense, 2=congested, 3=cut service_level_map = { 0: "low", 1: "medium", 2: "high", 3: "blocked" } service_level = traffic_point.get('nivelServicio', 0) # Estimate speed based on service level and road type if service_level == 0: # Fluid average_speed = 45 elif service_level == 1: # Dense average_speed = 25 elif service_level == 2: # Congested average_speed = 15 else: # Cut/Blocked average_speed = 5 congestion_level = service_level_map.get(service_level, "medium") # Calculate pedestrian estimate based on location hour = datetime.now().hour if 13 <= hour <= 15: # Lunch time pedestrian_multiplier = 2.5 elif 8 <= hour <= 9 or 18 <= hour <= 20: # Rush hours pedestrian_multiplier = 2.0 else: pedestrian_multiplier = 1.0 pedestrian_count = int(100 * pedestrian_multiplier) return { "date": datetime.now(), "traffic_volume": traffic_point.get('intensidad', 0), "pedestrian_count": pedestrian_count, "congestion_level": congestion_level, "average_speed": average_speed, "occupation_percentage": traffic_point.get('ocupacion', 0), "load_percentage": traffic_point.get('carga', 0), "measurement_point_id": traffic_point.get('idelem'), "measurement_point_name": traffic_point.get('descripcion'), "road_type": "URB", "source": "madrid_opendata" } except Exception as e: logger.error("Error parsing traffic measurement", error=str(e)) return self._get_default_traffic_data() def _get_default_traffic_data(self) -> Dict[str, Any]: """Get default traffic data when parsing fails""" return { "date": datetime.now(), "traffic_volume": 100, "pedestrian_count": 150, "congestion_level": "medium", "average_speed": 25, "occupation_percentage": 30, "load_percentage": 40, "measurement_point_id": "unknown", "measurement_point_name": "Unknown location", "road_type": "URB", "source": "synthetic" } async def _generate_synthetic_traffic(self, latitude: float, longitude: float) -> Dict[str, Any]: """Generate realistic Madrid traffic data as fallback""" now = datetime.now() hour = now.hour is_weekend = now.weekday() >= 5 base_traffic = 100 if not is_weekend: if 7 <= hour <= 9: traffic_multiplier = 2.2 congestion = "high" avg_speed = 15 elif 18 <= hour <= 20: traffic_multiplier = 2.5 congestion = "high" avg_speed = 12 elif 12 <= hour <= 14: traffic_multiplier = 1.6 congestion = "medium" avg_speed = 25 else: traffic_multiplier = 1.0 congestion = "low" avg_speed = 40 else: if 11 <= hour <= 14: traffic_multiplier = 1.4 congestion = "medium" avg_speed = 30 else: traffic_multiplier = 0.8 congestion = "low" avg_speed = 45 traffic_volume = int(base_traffic * traffic_multiplier) # Pedestrian calculation pedestrian_base = 150 if 13 <= hour <= 15: pedestrian_count = int(pedestrian_base * 2.5) elif 8 <= hour <= 9 or 18 <= hour <= 20: pedestrian_count = int(pedestrian_base * 2.0) else: pedestrian_count = int(pedestrian_base * 1.0) return { "date": now, "traffic_volume": traffic_volume, "pedestrian_count": pedestrian_count, "congestion_level": congestion, "average_speed": max(10, avg_speed), "occupation_percentage": min(100, traffic_volume // 2), "load_percentage": min(100, traffic_volume // 3), "measurement_point_id": "madrid_synthetic", "measurement_point_name": "Madrid Centro (Synthetic)", "road_type": "URB", "source": "synthetic" } async def get_historical_traffic(self, latitude: float, longitude: float, start_date: datetime, end_date: datetime) -> List[Dict[str, Any]]: """Get historical traffic data from Madrid Open Data Args: latitude: Location latitude longitude: Location longitude start_date: Start date for historical data end_date: End date for historical data Returns: List of historical traffic data dictionaries """ try: logger.debug("Fetching Madrid historical traffic data", lat=latitude, lon=longitude, start=start_date, end=end_date) historical_data = [] # Generate historical data using synthetic generation for periods before API availability # or when real data is not available if (end_date - start_date).days <= 90: # Reasonable range for synthetic data historical_data = await self._generate_historical_traffic(latitude, longitude, start_date, end_date) logger.info("Generated synthetic historical traffic data", records=len(historical_data)) else: logger.warning("Date range too large for historical traffic data", days=(end_date - start_date).days) return [] # Try to fetch real data if API key is available and for recent dates if hasattr(self, 'api_key') and self.api_key: try: real_data = await self._fetch_real_historical_traffic(latitude, longitude, start_date, end_date) if real_data: # Merge real data with synthetic data or replace synthetic data historical_data = real_data logger.info("Fetched real historical traffic data", records=len(real_data)) except Exception as e: logger.warning("Failed to fetch real historical data, using synthetic", error=str(e)) return historical_data except Exception as e: logger.error("Error getting historical traffic data", error=str(e)) return [] async def _fetch_real_historical_traffic(self, latitude: float, longitude: float, start_date: datetime, end_date: datetime) -> List[Dict[str, Any]]: """Fetch real historical traffic data from Madrid Open Data portal Madrid provides historical CSV files by month at: https://datos.madrid.es/egob/catalogo/[ID]-[YEAR]-[MONTH]-trafico-historico.csv """ try: historical_data = [] current_date = start_date.replace(day=1) # Start from beginning of month while current_date <= end_date: try: # Madrid historical traffic CSV URL pattern year = current_date.year month = current_date.month # Try different URL patterns based on Madrid Open Data structure historical_urls = [ f"https://datos.madrid.es/egob/catalogo/300217-{year}-{month:02d}-trafico-historico.csv", f"https://datos.madrid.es/egob/catalogo/trafico-historico-{year}-{month:02d}.csv", f"https://datos.madrid.es/egob/catalogo/{year}{month:02d}-trafico-historico.csv" ] for url in historical_urls: csv_data = await self._fetch_historical_csv(url) if csv_data: # Parse CSV and filter by location month_data = await self._parse_historical_csv(csv_data, latitude, longitude, start_date, end_date) historical_data.extend(month_data) logger.debug("Fetched historical data for month", year=year, month=month, records=len(month_data)) break # Move to next month if current_date.month == 12: current_date = current_date.replace(year=current_date.year + 1, month=1) else: current_date = current_date.replace(month=current_date.month + 1) except Exception as e: logger.warning("Error fetching data for month", year=current_date.year, month=current_date.month, error=str(e)) # Move to next month even on error if current_date.month == 12: current_date = current_date.replace(year=current_date.year + 1, month=1) else: current_date = current_date.replace(month=current_date.month + 1) return historical_data except Exception as e: logger.error("Error fetching real historical traffic data", error=str(e)) return [] async def _fetch_historical_csv(self, url: str) -> Optional[str]: """Fetch historical CSV data from Madrid Open Data""" try: import httpx headers = { 'User-Agent': 'Mozilla/5.0 (compatible; Madrid-Traffic-Client/1.0)', 'Accept': 'text/csv,application/csv,text/plain,*/*', 'Accept-Language': 'es-ES,es;q=0.9,en;q=0.8', } async with httpx.AsyncClient(timeout=60.0, headers=headers) as client: logger.debug("Fetching historical CSV", url=url) response = await client.get(url) if response.status_code == 200: content = response.text if content and len(content) > 100: # Ensure we got actual data logger.debug("Successfully fetched CSV", url=url, size=len(content)) return content else: logger.debug("CSV not found", url=url, status=response.status_code) except Exception as e: logger.debug("Error fetching CSV", url=url, error=str(e)) return None async def _parse_historical_csv(self, csv_content: str, latitude: float, longitude: float, start_date: datetime, end_date: datetime) -> List[Dict[str, Any]]: """Parse Madrid historical traffic CSV and filter by location and date range""" try: import csv from io import StringIO historical_records = [] csv_reader = csv.DictReader(StringIO(csv_content), delimiter=';') # Get the nearest measurement points to our coordinates measurement_points = await self._get_measurement_points_near_location(latitude, longitude) target_point_ids = [point['id'] for point in measurement_points[:3]] # Use 3 nearest points for row in csv_reader: try: # Parse Madrid CSV format # Expected columns: fecha, hora, idelem, intensidad, ocupacion, carga, nivelServicio, etc. # Extract date and time if 'fecha' in row and 'hora' in row: date_str = row.get('fecha', '').strip() time_str = row.get('hora', '').strip() # Parse Madrid date format (usually DD/MM/YYYY) if date_str and time_str: try: # Try different date formats for date_format in ['%d/%m/%Y', '%Y-%m-%d', '%d-%m-%Y']: try: record_date = datetime.strptime(f"{date_str} {time_str}", f"{date_format} %H:%M") break except ValueError: continue else: continue # Skip if no date format worked # Check if record is in our date range if not (start_date <= record_date <= end_date): continue except ValueError: continue else: continue # Check if this record is from a measurement point near our location point_id = row.get('idelem', '').strip() if point_id not in target_point_ids: continue # Parse traffic data traffic_record = { "date": record_date, "traffic_volume": self._safe_int(row.get('intensidad', '0')), "occupation_percentage": self._safe_int(row.get('ocupacion', '0')), "load_percentage": self._safe_int(row.get('carga', '0')), "service_level": self._safe_int(row.get('nivelServicio', '0')), "measurement_point_id": point_id, "measurement_point_name": row.get('descripcion', f'Point {point_id}'), "road_type": row.get('tipo_elem', 'URB'), "source": "madrid_opendata_historical" } # Calculate derived metrics service_level = traffic_record['service_level'] if service_level == 0: # Fluid congestion_level = "low" avg_speed = 45 pedestrian_multiplier = 1.0 elif service_level == 1: # Dense congestion_level = "medium" avg_speed = 25 pedestrian_multiplier = 1.5 elif service_level == 2: # Congested congestion_level = "high" avg_speed = 15 pedestrian_multiplier = 2.0 else: # Cut/Blocked congestion_level = "blocked" avg_speed = 5 pedestrian_multiplier = 0.5 traffic_record.update({ "congestion_level": congestion_level, "average_speed": avg_speed, "pedestrian_count": int(100 * pedestrian_multiplier) }) historical_records.append(traffic_record) except Exception as e: logger.debug("Error parsing CSV row", error=str(e)) continue return historical_records except Exception as e: logger.error("Error parsing historical CSV", error=str(e)) return [] async def _get_measurement_points_near_location(self, latitude: float, longitude: float) -> List[Dict[str, Any]]: """Get measurement points near the specified location""" try: # Try to fetch current traffic data to get measurement points current_traffic = await self._fetch_traffic_xml_data(self.traffic_endpoints[0]) if current_traffic: # Calculate distances and sort by proximity points_with_distance = [] for point in current_traffic: if point.get('latitude') and point.get('longitude'): distance = self._calculate_distance( latitude, longitude, point['latitude'], point['longitude'] ) points_with_distance.append({ 'id': point.get('idelem'), 'distance': distance, 'latitude': point['latitude'], 'longitude': point['longitude'], 'name': point.get('descripcion', '') }) # Sort by distance and return closest points points_with_distance.sort(key=lambda x: x['distance']) return points_with_distance[:5] # Return 5 closest points # Fallback: return synthetic point IDs based on Madrid geography return [ {'id': 'madrid_centro_01', 'distance': 1.0}, {'id': 'madrid_centro_02', 'distance': 2.0}, {'id': 'madrid_centro_03', 'distance': 3.0} ] except Exception as e: logger.warning("Error getting measurement points", error=str(e)) return [{'id': 'madrid_default', 'distance': 0.0}] async def _generate_historical_traffic(self, latitude: float, longitude: float, start_date: datetime, end_date: datetime) -> List[Dict[str, Any]]: """Generate synthetic historical traffic data for the specified period This method creates realistic historical traffic patterns based on: - Time of day patterns - Day of week patterns - Seasonal variations - Random variations for realism """ try: import random from datetime import timedelta historical_data = [] current_date = start_date # Seed random for consistent but varied data random.seed(hash(f"{latitude}{longitude}")) while current_date <= end_date: # Generate 24 hourly records for each day for hour in range(24): record_time = current_date.replace(hour=hour, minute=0, second=0, microsecond=0) # Base traffic calculation base_traffic = 100 hour_of_day = record_time.hour day_of_week = record_time.weekday() # 0=Monday, 6=Sunday month = record_time.month # Time of day patterns if 7 <= hour_of_day <= 9: # Morning rush traffic_multiplier = 2.2 + random.uniform(-0.3, 0.3) congestion = "high" avg_speed = 15 + random.randint(-5, 5) elif 18 <= hour_of_day <= 20: # Evening rush traffic_multiplier = 2.5 + random.uniform(-0.4, 0.4) congestion = "high" avg_speed = 12 + random.randint(-3, 8) elif 12 <= hour_of_day <= 14: # Lunch time traffic_multiplier = 1.6 + random.uniform(-0.2, 0.2) congestion = "medium" avg_speed = 25 + random.randint(-5, 10) elif 22 <= hour_of_day or hour_of_day <= 6: # Night traffic_multiplier = 0.3 + random.uniform(-0.1, 0.2) congestion = "low" avg_speed = 50 + random.randint(-10, 15) else: # Regular hours traffic_multiplier = 1.0 + random.uniform(-0.2, 0.2) congestion = "medium" avg_speed = 35 + random.randint(-10, 10) # Weekend adjustments if day_of_week >= 5: # Weekend if hour_of_day in [11, 12, 13, 14, 15]: # Weekend afternoon peak traffic_multiplier *= 1.4 congestion = "medium" else: traffic_multiplier *= 0.7 if congestion == "high": congestion = "medium" # Seasonal adjustments if month in [7, 8]: # Summer - less traffic due to vacations traffic_multiplier *= 0.8 elif month in [11, 12]: # Holiday season - more traffic traffic_multiplier *= 1.1 # Calculate final values traffic_volume = max(10, int(base_traffic * traffic_multiplier)) avg_speed = max(10, min(60, avg_speed)) # Pedestrian calculation pedestrian_base = 150 if 13 <= hour_of_day <= 15: # Lunch time pedestrian_count = int(pedestrian_base * 2.5 * random.uniform(0.8, 1.2)) elif 8 <= hour_of_day <= 9 or 18 <= hour_of_day <= 20: # Rush hours pedestrian_count = int(pedestrian_base * 2.0 * random.uniform(0.8, 1.2)) else: pedestrian_count = int(pedestrian_base * 1.0 * random.uniform(0.5, 1.5)) # Create traffic record traffic_record = { "date": record_time, "traffic_volume": traffic_volume, "pedestrian_count": pedestrian_count, "congestion_level": congestion, "average_speed": avg_speed, "occupation_percentage": min(100, traffic_volume // 2), "load_percentage": min(100, traffic_volume // 3), "measurement_point_id": f"madrid_historical_{hash(f'{latitude}{longitude}') % 1000}", "measurement_point_name": f"Madrid Historical Point ({latitude:.4f}, {longitude:.4f})", "road_type": "URB", "source": "synthetic_historical" } historical_data.append(traffic_record) # Move to next day current_date += timedelta(days=1) logger.info("Generated historical traffic data", records=len(historical_data), start=start_date, end=end_date) return historical_data except Exception as e: logger.error("Error generating historical traffic data", error=str(e)) return [] async def get_events(self, latitude: float, longitude: float, radius_km: float = 5.0) -> List[Dict[str, Any]]: """Get traffic incidents and events""" return []