# ================================================================
# services/data/app/external/madrid_opendata.py
# ================================================================
"""Madrid Open Data API client for traffic and events - WITH REAL ENDPOINTS"""

import math
import xml.etree.ElementTree as ET
from typing import List, Dict, Any, Optional
from datetime import datetime, timedelta

import structlog

from app.external.base_client import BaseAPIClient
from app.core.config import settings

logger = structlog.get_logger()


class MadridOpenDataClient(BaseAPIClient):
    """Client for Madrid Open Data real-time traffic and incident feeds.

    Fetches XML feeds published by the Madrid city council (updated every
    5 minutes) and falls back to synthetic, pattern-based data whenever the
    real feeds are unavailable or unparsable.
    """

    def __init__(self):
        super().__init__(
            base_url="https://datos.madrid.es/egob/catalogo",
            api_key=None  # Madrid Open Data doesn't require API key for public traffic data
        )
        # Real-time traffic data XML endpoint (updated every 5 minutes)
        self.traffic_xml_url = "https://datos.madrid.es/egob/catalogo/300233-0-trafico-tiempo-real.xml"
        # Traffic incidents XML endpoint (updated every 5 minutes)
        self.incidents_xml_url = "http://informo.munimadrid.es/informo/tmadrid/incid_aytomadrid.xml"
        # KML traffic intensity map (updated every 5 minutes)
        self.traffic_kml_url = "https://datos.madrid.es/egob/catalogo/300233-1-intensidad-trafico.kml"

    async def get_current_traffic(self, latitude: float, longitude: float) -> Optional[Dict[str, Any]]:
        """Get current traffic data for a location using real Madrid Open Data.

        Falls back to synthetic data when the feed is unavailable, when no
        measurement point lies within 5 km, or on any unexpected error.
        """
        try:
            # Step 1: Fetch real-time traffic XML data
            traffic_data = await self._fetch_traffic_xml()
            if traffic_data:
                # Step 2: Find nearest traffic measurement point
                nearest_point = self._find_nearest_traffic_point(latitude, longitude, traffic_data)
                if nearest_point:
                    # Step 3: Parse traffic data for the nearest point
                    return self._parse_traffic_measurement(nearest_point)

            # Fallback to synthetic data if real data not available
            logger.info("Real traffic data not available, using synthetic data")
            return await self._generate_synthetic_traffic(latitude, longitude)
        except Exception as e:
            logger.error("Failed to get current traffic from Madrid Open Data", error=str(e))
            return await self._generate_synthetic_traffic(latitude, longitude)

    async def _fetch_traffic_xml(self) -> Optional[List[Dict[str, Any]]]:
        """Fetch and parse real-time traffic XML from Madrid Open Data.

        Returns a list of measurement-point dicts, or None on fetch/parse
        failure. Points without valid coordinates are skipped.
        """
        try:
            xml_content = await self._fetch_xml_content(self.traffic_xml_url)
            if not xml_content:
                logger.warning("No XML content received from Madrid traffic API")
                return None

            root = ET.fromstring(xml_content)
            traffic_points = []

            # NOTE(review): parsing assumes each <pmed> carries its data as
            # XML attributes (id, x, y, intensidad, ...) — confirm against a
            # live sample of the feed, which may use child elements instead.
            for pmed in root.findall('.//pmed'):
                try:
                    traffic_point = {
                        'id': pmed.get('id'),
                        'latitude': float(pmed.get('y', 0)) if pmed.get('y') else None,
                        'longitude': float(pmed.get('x', 0)) if pmed.get('x') else None,
                        'intensity': int(pmed.get('intensidad', 0)) if pmed.get('intensidad') else 0,
                        'occupation': float(pmed.get('ocupacion', 0)) if pmed.get('ocupacion') else 0,
                        'load': int(pmed.get('carga', 0)) if pmed.get('carga') else 0,
                        'service_level': int(pmed.get('nivelServicio', 0)) if pmed.get('nivelServicio') else 0,
                        'speed': float(pmed.get('vmed', 0)) if pmed.get('vmed') else 0,
                        'error': pmed.get('error', '0'),
                        'measurement_date': pmed.get('fechahora', ''),
                        'name': pmed.get('nombre', 'Unknown'),
                        'type': pmed.get('tipo_elem', 'URB')  # URB=Urban, C30=M-30 ring road
                    }
                    # Only add points with valid coordinates.
                    # FIX: use explicit None checks — a coordinate of exactly
                    # 0.0 is falsy and was previously dropped by mistake.
                    if traffic_point['latitude'] is not None and traffic_point['longitude'] is not None:
                        traffic_points.append(traffic_point)
                except (ValueError, TypeError) as e:
                    logger.debug("Error parsing traffic point", error=str(e), point_id=pmed.get('id'))
                    continue

            logger.info("Successfully parsed traffic data", points_count=len(traffic_points))
            return traffic_points

        except ET.ParseError as e:
            logger.error("Failed to parse traffic XML", error=str(e))
            return None
        except Exception as e:
            logger.error("Error fetching traffic XML", error=str(e))
            return None

    async def _fetch_xml_content(self, url: str) -> Optional[str]:
        """Fetch XML content from URL, handling encoding issues.

        Madrid feeds occasionally ship Spanish text in legacy encodings, so
        several single-byte codecs are tried before giving up.
        """
        try:
            import httpx

            async with httpx.AsyncClient(timeout=30.0) as client:
                response = await client.get(url)
                response.raise_for_status()

                # Handle potential encoding issues with Spanish content
                try:
                    return response.text
                except UnicodeDecodeError:
                    # Try alternative encodings commonly used by Spanish sites
                    for encoding in ['latin-1', 'windows-1252', 'iso-8859-1']:
                        try:
                            return response.content.decode(encoding)
                        except UnicodeDecodeError:
                            continue
                    logger.error("Failed to decode XML with any encoding")
                    return None
        except Exception as e:
            logger.error("Failed to fetch XML content", url=url, error=str(e))
            return None

    def _find_nearest_traffic_point(self, latitude: float, longitude: float,
                                    traffic_data: List[Dict]) -> Optional[Dict]:
        """Find the nearest traffic measurement point to given coordinates.

        Returns None when no point lies within 5 km — distant measurements
        are not representative of local conditions.
        """
        if not traffic_data:
            return None

        min_distance = float('inf')
        nearest_point = None

        for point in traffic_data:
            # FIX: explicit None checks so 0.0 coordinates are not skipped
            if point['latitude'] is not None and point['longitude'] is not None:
                distance = self._calculate_distance(
                    latitude, longitude, point['latitude'], point['longitude']
                )
                if distance < min_distance:
                    min_distance = distance
                    nearest_point = point

        # Only return if within reasonable distance (5km)
        if nearest_point and min_distance <= 5.0:
            logger.debug("Found nearest traffic point",
                         distance_km=min_distance,
                         point_name=nearest_point.get('name'))
            return nearest_point
        return None

    def _calculate_distance(self, lat1: float, lon1: float,
                            lat2: float, lon2: float) -> float:
        """Calculate great-circle distance between two coordinates in km
        using the Haversine formula."""
        R = 6371  # Earth's radius in km
        dlat = math.radians(lat2 - lat1)
        dlon = math.radians(lon2 - lon1)
        a = (math.sin(dlat / 2) * math.sin(dlat / 2) +
             math.cos(math.radians(lat1)) * math.cos(math.radians(lat2)) *
             math.sin(dlon / 2) * math.sin(dlon / 2))
        c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
        return R * c

    def _parse_traffic_measurement(self, traffic_point: Dict) -> Dict[str, Any]:
        """Parse a Madrid traffic measurement into the standardized format.

        Uses the measured speed when available; otherwise estimates from the
        road type and the city's service-level scale. Returns default data
        on any parsing error.
        """
        try:
            # Madrid traffic service levels: 0=fluid, 1=dense, 2=congested, 3=cut
            service_level_map = {0: "low", 1: "medium", 2: "high", 3: "blocked"}

            service_level = traffic_point.get('service_level', 0)
            road_type = traffic_point.get('type', 'URB')

            # Use real speed if available, otherwise estimate
            if traffic_point.get('speed', 0) > 0:
                average_speed = traffic_point['speed']
            else:
                # Speed estimation based on road type and service level
                if road_type == 'C30':  # M-30 ring road
                    speed_map = {0: 80, 1: 50, 2: 25, 3: 10}
                else:  # Urban roads
                    speed_map = {0: 40, 1: 25, 2: 15, 3: 5}
                average_speed = speed_map.get(service_level, 20)

            congestion_level = service_level_map.get(service_level, "medium")

            # Pedestrian estimate (higher in urban areas, lower on highways)
            base_pedestrians = 100 if road_type == 'URB' else 20
            hour = datetime.now().hour

            # Pedestrian multiplier based on time of day
            if 13 <= hour <= 15:  # Lunch time
                pedestrian_multiplier = 2.5
            elif 8 <= hour <= 9 or 18 <= hour <= 20:  # Rush hours
                pedestrian_multiplier = 2.0
            else:
                pedestrian_multiplier = 1.0

            return {
                "date": datetime.now(),
                "traffic_volume": traffic_point.get('intensity', 0),  # vehicles/hour
                "pedestrian_count": int(base_pedestrians * pedestrian_multiplier),
                "congestion_level": congestion_level,
                "average_speed": max(5, int(average_speed)),  # Minimum 5 km/h
                "occupation_percentage": traffic_point.get('occupation', 0),
                "load_percentage": traffic_point.get('load', 0),
                "measurement_point_id": traffic_point.get('id'),
                "measurement_point_name": traffic_point.get('name'),
                "road_type": road_type,
                "source": "madrid_opendata"
            }
        except Exception as e:
            logger.error("Error parsing traffic measurement", error=str(e))
            return self._get_default_traffic_data()

    def _get_default_traffic_data(self) -> Dict[str, Any]:
        """Get default traffic data when parsing fails."""
        return {
            "date": datetime.now(),
            "traffic_volume": 100,
            "pedestrian_count": 150,
            "congestion_level": "medium",
            "average_speed": 25,
            "occupation_percentage": 30,
            "load_percentage": 40,
            "measurement_point_id": "unknown",
            "measurement_point_name": "Unknown location",
            "road_type": "URB",
            "source": "default"
        }

    async def get_historical_traffic(self, latitude: float, longitude: float,
                                     start_date: datetime, end_date: datetime) -> List[Dict[str, Any]]:
        """Get historical traffic data (currently generates synthetic data).

        Madrid provides historical data, but for now we generate synthetic.
        In production, you would fetch from:
        https://datos.madrid.es/egob/catalogo/300233-2-trafico-historico.csv
        """
        try:
            return await self._generate_historical_traffic(latitude, longitude, start_date, end_date)
        except Exception as e:
            logger.error("Failed to get historical traffic", error=str(e))
            return []

    async def get_events(self, latitude: float, longitude: float,
                         radius_km: float = 5.0) -> List[Dict[str, Any]]:
        """Get traffic incidents and events near a location.

        Real incidents are filtered to ``radius_km``; synthetic events are
        returned when the feed is unavailable or on error.
        """
        try:
            incidents = await self._fetch_traffic_incidents()
            if incidents:
                # Filter incidents by distance
                nearby_incidents = []
                for incident in incidents:
                    # FIX: explicit None checks so 0.0 coordinates are kept
                    if incident.get('latitude') is not None and incident.get('longitude') is not None:
                        distance = self._calculate_distance(
                            latitude, longitude,
                            incident['latitude'], incident['longitude']
                        )
                        if distance <= radius_km:
                            incident['distance_km'] = round(distance, 2)
                            nearby_incidents.append(incident)
                return nearby_incidents

            # Fallback to synthetic events
            return await self._generate_synthetic_events(latitude, longitude)
        except Exception as e:
            logger.error("Failed to get events", error=str(e))
            return await self._generate_synthetic_events(latitude, longitude)

    async def _fetch_traffic_incidents(self) -> Optional[List[Dict[str, Any]]]:
        """Fetch real traffic incidents from Madrid Open Data.

        Returns a list of incident dicts or None on fetch/parse failure.
        """
        try:
            xml_content = await self._fetch_xml_content(self.incidents_xml_url)
            if not xml_content:
                return None

            root = ET.fromstring(xml_content)
            incidents = []

            # Parse incident XML structure
            for incidencia in root.findall('.//incidencia'):
                try:
                    incident = {
                        'id': incidencia.get('id'),
                        'type': incidencia.findtext('tipo', 'unknown'),
                        'description': incidencia.findtext('descripcion', ''),
                        'location': incidencia.findtext('localizacion', ''),
                        'start_date': incidencia.findtext('fechaInicio', ''),
                        'end_date': incidencia.findtext('fechaFin', ''),
                        'impact_level': self._categorize_incident_impact(incidencia.findtext('tipo', '')),
                        'latitude': self._extract_coordinate(incidencia, 'lat'),
                        'longitude': self._extract_coordinate(incidencia, 'lon'),
                        'source': 'madrid_opendata'
                    }
                    incidents.append(incident)
                except Exception as e:
                    logger.debug("Error parsing incident", error=str(e))
                    continue

            logger.info("Successfully parsed traffic incidents", incidents_count=len(incidents))
            return incidents

        except Exception as e:
            logger.error("Error fetching traffic incidents", error=str(e))
            return None

    def _extract_coordinate(self, element, coord_type: str) -> Optional[float]:
        """Extract latitude ('lat') or longitude ('lon') from incident XML.

        Returns None when the child element is missing or non-numeric.
        """
        try:
            coord_element = element.find(coord_type)
            if coord_element is not None and coord_element.text:
                return float(coord_element.text)
        except (ValueError, TypeError):
            pass
        return None

    def _categorize_incident_impact(self, incident_type: str) -> str:
        """Categorize incident impact level ('high'/'medium'/'low') from its
        Spanish type string (e.g. 'accidente', 'obras')."""
        incident_type = incident_type.lower()
        if any(word in incident_type for word in ['accidente', 'corte', 'cerrado']):
            return 'high'
        elif any(word in incident_type for word in ['obras', 'maintenance', 'evento']):
            return 'medium'
        else:
            return 'low'

    # Keep existing synthetic data generation methods as fallbacks

    async def _generate_synthetic_traffic(self, latitude: float, longitude: float) -> Dict[str, Any]:
        """Generate realistic Madrid traffic data as a fallback, based on
        time-of-day and weekday/weekend patterns."""
        now = datetime.now()
        hour = now.hour
        is_weekend = now.weekday() >= 5

        # Base traffic volume
        base_traffic = 100

        # Madrid traffic patterns
        if not is_weekend:  # Weekdays
            if 7 <= hour <= 9:  # Morning rush
                traffic_multiplier = 2.2
                congestion = "high"
            elif 18 <= hour <= 20:  # Evening rush
                traffic_multiplier = 2.5
                congestion = "high"
            elif 12 <= hour <= 14:  # Lunch time
                traffic_multiplier = 1.6
                congestion = "medium"
            elif 6 <= hour <= 22:  # Daytime
                traffic_multiplier = 1.2
                congestion = "medium"
            else:  # Night
                traffic_multiplier = 0.4
                congestion = "low"
        else:  # Weekends
            if 11 <= hour <= 14:  # Weekend shopping
                traffic_multiplier = 1.4
                congestion = "medium"
            elif 19 <= hour <= 22:  # Weekend evening
                traffic_multiplier = 1.6
                congestion = "medium"
            else:
                traffic_multiplier = 0.8
                congestion = "low"

        # Calculate pedestrian traffic.
        # FIX: test the narrower school-pickup condition (hour == 14) first —
        # it was previously unreachable behind the 13-15 lunch-time branch.
        pedestrian_base = 150
        if hour == 14:  # School pickup time
            pedestrian_multiplier = 3.5
        elif 13 <= hour <= 15:  # Lunch time
            pedestrian_multiplier = 2.8
        elif 20 <= hour <= 22:  # Dinner time
            pedestrian_multiplier = 2.2
        elif 8 <= hour <= 9:  # Morning commute
            pedestrian_multiplier = 2.0
        else:
            pedestrian_multiplier = 1.0

        traffic_volume = int(base_traffic * traffic_multiplier)
        pedestrian_count = int(pedestrian_base * pedestrian_multiplier)

        # Average speed based on congestion.
        # NOTE(review): hash() is randomized per process (PYTHONHASHSEED), so
        # this jitter is not reproducible across runs — acceptable for
        # synthetic fallback data, but do not rely on it being deterministic.
        speed_map = {"low": 45, "medium": 25, "high": 15}
        average_speed = speed_map[congestion] + (hash(f"{latitude}{longitude}") % 10 - 5)

        return {
            "date": now,
            "traffic_volume": traffic_volume,
            "pedestrian_count": pedestrian_count,
            "congestion_level": congestion,
            "average_speed": max(10, average_speed),  # Minimum 10 km/h
            "occupation_percentage": min(100, traffic_volume // 2),
            "load_percentage": min(100, traffic_volume // 3),
            "source": "synthetic"
        }

    async def _generate_historical_traffic(self, latitude: float, longitude: float,
                                           start_date: datetime, end_date: datetime) -> List[Dict[str, Any]]:
        """Generate synthetic historical traffic data, one record per hour
        from start_date to end_date inclusive."""
        historical_data = []
        current_date = start_date

        while current_date <= end_date:
            hour = current_date.hour
            is_weekend = current_date.weekday() >= 5

            # Base patterns similar to current traffic
            base_traffic = 100
            if not is_weekend:
                if 7 <= hour <= 9 or 18 <= hour <= 20:
                    traffic_multiplier = 2.0 + (current_date.day % 5) * 0.1
                elif 12 <= hour <= 14:
                    traffic_multiplier = 1.5
                else:
                    traffic_multiplier = 1.0
            else:
                traffic_multiplier = 0.7 + (current_date.day % 3) * 0.2

            # Add seasonal variations
            month = current_date.month
            seasonal_factor = 1.0
            if month in [12, 1]:  # Holiday season
                seasonal_factor = 0.8
            elif month in [7, 8]:  # Summer vacation
                seasonal_factor = 0.9

            traffic_volume = int(base_traffic * traffic_multiplier * seasonal_factor)

            # Determine congestion level
            if traffic_volume > 160:
                congestion_level = "high"
                avg_speed = 15
            elif traffic_volume > 120:
                congestion_level = "medium"
                avg_speed = 25
            else:
                congestion_level = "low"
                avg_speed = 40

            # Pedestrian count.
            # FIX: hour == 14 checked first — was unreachable behind 13-15.
            pedestrian_base = 150
            if hour == 14:
                pedestrian_multiplier = 3.0
            elif 13 <= hour <= 15:
                pedestrian_multiplier = 2.5
            else:
                pedestrian_multiplier = 1.0

            historical_data.append({
                "date": current_date,
                "traffic_volume": traffic_volume,
                "pedestrian_count": int(pedestrian_base * pedestrian_multiplier),
                "congestion_level": congestion_level,
                "average_speed": avg_speed + (current_date.day % 10 - 5),
                "occupation_percentage": min(100, traffic_volume // 2),
                "load_percentage": min(100, traffic_volume // 3),
                "source": "synthetic"
            })
            current_date += timedelta(hours=1)

        return historical_data

    async def _generate_synthetic_events(self, latitude: float, longitude: float) -> List[Dict[str, Any]]:
        """Generate synthetic Madrid events as a fallback."""
        events = []
        base_date = datetime.now().date()

        # Generate some sample events
        sample_events = [
            {"name": "Mercado de San Miguel", "type": "market",
             "impact_level": "medium", "distance_km": 1.2},
            {"name": "Concierto en el Retiro", "type": "concert",
             "impact_level": "high", "distance_km": 2.5},
            {"name": "Partido Real Madrid", "type": "sports",
             "impact_level": "high", "distance_km": 8.0}
        ]

        for i, event in enumerate(sample_events):
            event_date = base_date + timedelta(days=i + 1)
            # NOTE(review): lat and lon share the same hash-derived offset, so
            # synthetic events fall on a diagonal from the query point; hash()
            # is also process-randomized (see _generate_synthetic_traffic).
            events.append({
                "id": f"event_{i+1}",
                "name": event["name"],
                "date": datetime.combine(event_date, datetime.min.time()),
                "type": event["type"],
                "impact_level": event["impact_level"],
                "distance_km": event["distance_km"],
                "latitude": latitude + (hash(event["name"]) % 100 - 50) / 1000,
                "longitude": longitude + (hash(event["name"]) % 100 - 50) / 1000,
                "source": "synthetic"
            })

        return events