From d8bc64eef36af90205a1ffadc12c9aa3f72ae44b Mon Sep 17 00:00:00 2001
From: Urtzi Alfaro
Date: Thu, 24 Jul 2025 08:43:54 +0200
Subject: [PATCH] REFACTOR - Madrid Open Data file

---
 services/data/app/external/madrid_opendata.py | 1729 ++++++++++-------
 services/data/tests/conftest.py | 64 -
 services/data/tests/pytest.ini | 16 +
 services/data/tests/test_madrid_opendata.py | 405 ++++
 4 files changed, 1473 insertions(+), 741 deletions(-)
 create mode 100644 services/data/tests/pytest.ini
 create mode 100644 services/data/tests/test_madrid_opendata.py

diff --git a/services/data/app/external/madrid_opendata.py b/services/data/app/external/madrid_opendata.py
index 2bd655c0..20889131 100644
--- a/services/data/app/external/madrid_opendata.py
+++ b/services/data/app/external/madrid_opendata.py
@@ -1,53 +1,177 @@
 # ================================================================
-# services/data/app/external/madrid_opendata.py - FIXED XML PARSER
+# services/data/app/external/madrid_opendata.py - REFACTORED
 # ================================================================
-"""Madrid Open Data API client with fixed XML parser for actual structure"""
+"""
+Madrid Open Data API client with clean architecture and best practices
+
+Features:
+- Real-time traffic data from XML endpoints
+- Historical traffic data from ZIP files
+- Measurement points integration
+- Robust error handling and fallbacks
+- Comprehensive logging
+"""
 
 import math
 import xml.etree.ElementTree as ET
-from typing import List, Dict, Any, Optional
+from typing import List, Dict, Any, Optional, Tuple
 from datetime import datetime, timedelta
 import structlog
 import re
+from dataclasses import dataclass
+from enum import Enum
 
 from app.external.base_client import BaseAPIClient
 from app.core.config import settings
-
 import pyproj
 
 logger = structlog.get_logger()
 
+# ================================================================
+# CONSTANTS AND ENUMS
+# ================================================================
+
+class TrafficServiceLevel(Enum):
+    """Madrid traffic service levels"""
+    FLUID = 0
+    DENSE = 1
+    CONGESTED = 2
+    BLOCKED = 3
+
+class CongestionLevel(Enum):
+    """Standardized congestion levels"""
+    LOW = "low"
+    MEDIUM = "medium"
+    HIGH = "high"
+    BLOCKED = "blocked"
+
+class DataSource(Enum):
+    """Data source types"""
+    MADRID_REALTIME = "madrid_opendata"
+    MADRID_HISTORICAL = "madrid_opendata_zip"
+    SYNTHETIC = "synthetic"
+    SYNTHETIC_HISTORICAL = "synthetic_historical"
+
+# Madrid geographic bounds
+MADRID_BOUNDS = {
+    'lat_min': 40.31, 'lat_max': 40.56,
+    'lon_min': -3.89, 'lon_max': -3.51
+}
+
+# Constants
+MAX_HISTORICAL_DAYS = 90
+MAX_CSV_PROCESSING_ROWS = 50000
+MEASUREMENT_POINTS_LIMIT = 20
+UTM_ZONE = 30  # Madrid is in UTM Zone 30N
+
+@dataclass
+class MeasurementPoint:
+    """Measurement point data structure"""
+    id: str
+    latitude: float
+    longitude: float
+    distance: float
+    name: str
+    type: str
+
+@dataclass
+class TrafficRecord:
+    """Traffic record data structure"""
+    date: datetime
+    traffic_volume: int
+    occupation_percentage: int
+    load_percentage: int
+    average_speed: int
+    congestion_level: str
+    pedestrian_count: int
+    measurement_point_id: str
+    measurement_point_name: str
+    road_type: str
+    source: str
+    error_status: Optional[str] = None
+    # Madrid-specific raw data
+    intensidad_raw: Optional[int] = None
+    ocupacion_raw: Optional[int] = None
+    carga_raw: Optional[int] = None
+    vmed_raw: Optional[int] = None
+
+    def to_dict(self) -> Dict[str, Any]:
+        """Convert TrafficRecord to dictionary"""
+        result
= { + "date": self.date, + "traffic_volume": self.traffic_volume, + "occupation_percentage": self.occupation_percentage, + "load_percentage": self.load_percentage, + "average_speed": self.average_speed, + "congestion_level": self.congestion_level, + "pedestrian_count": self.pedestrian_count, + "measurement_point_id": self.measurement_point_id, + "measurement_point_name": self.measurement_point_name, + "road_type": self.road_type, + "source": self.source + } + + # Add optional fields if present + optional_fields = ['error_status', 'intensidad_raw', 'ocupacion_raw', 'carga_raw', 'vmed_raw'] + for field in optional_fields: + value = getattr(self, field, None) + if value is not None: + result[field] = value + + return result + + +# ================================================================ +# MADRID OPEN DATA CLIENT +# ================================================================ + class MadridOpenDataClient(BaseAPIClient): + """ + Madrid Open Data API client with comprehensive traffic data support + + Provides both real-time and historical traffic data from Madrid's open data portal. + Implements robust error handling, coordinate conversion, and synthetic data fallbacks. + """ def __init__(self): - super().__init__( - base_url="https://datos.madrid.es", - api_key=None - ) - - # WORKING Madrid traffic endpoints (verified) + super().__init__(base_url="https://datos.madrid.es") self.traffic_endpoints = [ - # Primary working endpoint - "https://datos.madrid.es/egob/catalogo/202087-0-trafico-intensidad.xml", + "https://datos.madrid.es/egob/catalogo/202087-0-trafico-intensidad.xml" ] + self.measurement_points_url = "https://datos.madrid.es/egob/catalogo/202468-260-intensidad-trafico.csv" + self._conversion_log_count = [] # Track coordinate conversion logging + + # Initialize coordinate converter + self.utm_proj = pyproj.Proj(proj='utm', zone=UTM_ZONE, ellps='WGS84', preserve_units=False) + + # ================================================================ + # PUBLIC API METHODS + # ================================================================ async def get_current_traffic(self, latitude: float, longitude: float) -> Optional[Dict[str, Any]]: - """Get current traffic data for location using working Madrid endpoints""" + """ + Get current traffic data for location using working Madrid endpoints + + Args: + latitude: Query location latitude + longitude: Query location longitude + + Returns: + Dict with current traffic data or None if not available + """ try: logger.debug("Fetching Madrid traffic data", lat=latitude, lon=longitude) - # Try the working endpoint + # Try real-time endpoints for endpoint in self.traffic_endpoints: try: - logger.debug("Trying traffic endpoint", endpoint=endpoint) traffic_data = await self._fetch_traffic_xml_data(endpoint) if traffic_data: logger.info("Successfully fetched Madrid traffic data", - endpoint=endpoint, - points=len(traffic_data)) + endpoint=endpoint, points=len(traffic_data)) - # Find nearest traffic measurement point + # Find nearest measurement point nearest_point = self._find_nearest_traffic_point(latitude, longitude, traffic_data) if nearest_point: @@ -57,15 +181,16 @@ class MadridOpenDataClient(BaseAPIClient): point_id=nearest_point.get('idelem')) return parsed_data else: + closest_distance = self._get_closest_distance(latitude, longitude, traffic_data) logger.debug("No nearby traffic points found", lat=latitude, lon=longitude, - closest_distance=self._get_closest_distance(latitude, longitude, traffic_data)) + closest_distance=closest_distance) 
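+                # A failure on one endpoint is logged below and the loop simply moves
+                # on to the next endpoint; synthetic data is only used after all fail.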
                except Exception as e:
                    logger.debug("Failed to fetch from endpoint", endpoint=endpoint, error=str(e))
                    continue
 
-            # If no real data available, use synthetic data
+            # Fallback to synthetic data
             logger.info("No nearby Madrid traffic points found, using synthetic data")
             return await self._generate_synthetic_traffic(latitude, longitude)
 
@@ -73,6 +198,57 @@
             logger.error("Failed to get current traffic", error=str(e))
             return await self._generate_synthetic_traffic(latitude, longitude)
 
+    async def get_historical_traffic(self, latitude: float, longitude: float,
+                                   start_date: datetime, end_date: datetime) -> List[Dict[str, Any]]:
+        """
+        Get historical traffic data from Madrid Open Data ZIP files
+
+        Args:
+            latitude: Query location latitude
+            longitude: Query location longitude
+            start_date: Start date for historical data
+            end_date: End date for historical data
+
+        Returns:
+            List of historical traffic data dictionaries
+        """
+        try:
+            logger.debug("Fetching Madrid historical traffic data",
+                        lat=latitude, lon=longitude, start=start_date, end=end_date)
+
+            # Validate date range
+            if not self._validate_date_range(start_date, end_date):
+                return []
+
+            # Generate synthetic data as fallback
+            synthetic_data = await self._generate_historical_traffic(latitude, longitude, start_date, end_date)
+            logger.info("Generated synthetic historical traffic data", records=len(synthetic_data))
+
+            # Try to fetch real data
+            try:
+                real_data = await self._fetch_real_historical_traffic(latitude, longitude, start_date, end_date)
+                if real_data:
+                    logger.info("Fetched real historical traffic data from ZIP files", records=len(real_data))
+                    return real_data
+                else:
+                    logger.info("No real historical data available, using synthetic data")
+            except Exception as e:
+                logger.warning("Failed to fetch real historical data, using synthetic", error=str(e))
+            return synthetic_data
+
+        except Exception as e:
+            logger.error("Error getting historical traffic data", error=str(e))
+            return []
+
+    async def get_events(self, latitude: float, longitude: float, radius_km: float = 5.0) -> List[Dict[str, Any]]:
+        """Get traffic incidents and events (placeholder for future implementation)"""
+        return []
+
+    # ================================================================
+    # REAL-TIME TRAFFIC METHODS
+    # ================================================================
+
     async def _fetch_traffic_xml_data(self, endpoint: str) -> Optional[List[Dict[str, Any]]]:
         """Fetch and parse Madrid traffic XML data"""
         try:
@@ -82,12 +258,10 @@
                 logger.debug("No XML content received", endpoint=endpoint)
                 return None
 
-            # Log XML structure for debugging
             logger.debug("Madrid XML content preview",
                         length=len(xml_content),
                         first_500=xml_content[:500] if len(xml_content) > 500 else xml_content)
 
-            # Parse Madrid traffic XML with the correct structure
             traffic_points = self._parse_madrid_traffic_xml(xml_content)
 
             if traffic_points:
@@ -102,22 +276,15 @@
             return None
 
     def _parse_madrid_traffic_xml(self, xml_content: str) -> List[Dict[str, Any]]:
-        """Parse Madrid traffic XML with correct structure (<pms><pm>...</pm></pms>)"""
+        """Parse Madrid traffic XML with correct structure"""
        traffic_points = []
 
        try:
-            # Clean the XML to handle undefined entities and encoding issues
            cleaned_xml = self._clean_madrid_xml(xml_content)
-
-            # Parse XML
            root = ET.fromstring(cleaned_xml)
 
-            # Log XML structure
-            logger.debug("Madrid XML structure",
-                        root_tag=root.tag,
-                        children_count=len(list(root)))
+            logger.debug("Madrid XML structure", root_tag=root.tag, children_count=len(list(root)))
 
-            # Madrid uses <pms> root with <pm> children
             if root.tag == 'pms':
                 pm_elements = root.findall('pm')
                 logger.debug("Found PM elements", count=len(pm_elements))
@@ -126,10 +293,7 @@
                     try:
                         traffic_point = self._extract_madrid_pm_element(pm)
 
-                        # Validate essential data (coordinates and ID)
-                        if (traffic_point.get('latitude') and
-                            traffic_point.get('longitude') and
-                            traffic_point.get('idelem')):
+                        if self._is_valid_traffic_point(traffic_point):
                             traffic_points.append(traffic_point)
 
                             # Log first few points for debugging
@@ -151,60 +315,298 @@
         except ET.ParseError as e:
             logger.warning("Failed to parse Madrid XML", error=str(e))
-            # Try regex extraction as fallback
             return self._extract_traffic_data_regex(xml_content)
 
         except Exception as e:
             logger.error("Error in Madrid traffic XML parsing", error=str(e))
             return []
 
-    def _clean_madrid_xml(self, xml_content: str) -> str:
-        """Clean Madrid XML to handle undefined entities and encoding issues"""
+    # ================================================================
+    # HISTORICAL TRAFFIC METHODS
+    # ================================================================
+
+    async def _fetch_real_historical_traffic(self, latitude: float, longitude: float,
+                                           start_date: datetime, end_date: datetime) -> List[Dict[str, Any]]:
+        """Fetch real historical traffic data from Madrid ZIP files"""
         try:
-            # Remove BOM if present
-            xml_content = xml_content.lstrip('\ufeff')
+            historical_data = []
+            current_date = start_date.replace(day=1)
 
-            # Remove or replace undefined entities that cause parsing errors
-            # Common undefined entities in Madrid data
-            xml_content = xml_content.replace('&nbsp;', ' ')
-            xml_content = xml_content.replace('&copy;', '©')
-            xml_content = xml_content.replace('&reg;', '®')
-            xml_content = xml_content.replace('&trade;', '™')
+            while current_date <= end_date:
+                try:
+                    month_code = self._calculate_madrid_month_code(current_date.year, current_date.month)
+
+                    if month_code:
+                        zip_url = f"https://datos.madrid.es/egob/catalogo/208627-{month_code}-transporte-ptomedida-historico.zip"
+                        logger.debug("Trying ZIP URL", url=zip_url,
+                                   year=current_date.year, month=current_date.month, code=month_code)
+
+                        zip_data = await self._fetch_historical_zip(zip_url)
+                        if zip_data:
+                            month_data = await self._parse_historical_zip(zip_data, latitude, longitude, start_date, end_date)
+                            historical_data.extend(month_data)
+                            logger.info("Fetched historical data for month",
+                                      year=current_date.year, month=current_date.month, records=len(month_data))
+                        else:
+                            logger.debug("No ZIP data found for month",
+                                       year=current_date.year, month=current_date.month)
+                    else:
+                        logger.debug("Could not calculate month code",
+                                   year=current_date.year, month=current_date.month)
+
+                    current_date = self._get_next_month(current_date)
+
+                except Exception as e:
+                    logger.warning("Error fetching data for month",
+                                 year=current_date.year, month=current_date.month, error=str(e))
+                    current_date = self._get_next_month(current_date)
 
-            # Fix unescaped ampersands (but not already escaped ones)
-            xml_content = re.sub(r'&(?![a-zA-Z0-9#]{1,10};)', '&amp;', xml_content)
-
-            # Remove invalid control characters
-            xml_content = re.sub(r'[\x00-\x08\x0B\x0C\x0E-\x1F\x7F]', '', xml_content)
-
-            # Handle Spanish characters that might be causing issues
-            spanish_chars = {
-                'ñ': 'n', 'Ñ': 'N',
-                'á': 'a', 'é': 'e', 'í': 'i', 'ó': 'o', 'ú': 'u',
-                'Á': 'A', 'É': 'E', 'Í': 'I', 'Ó': 'O', 'Ú': 'U',
-                'ü': 'u', 'Ü': 'U'
-            }
-
-            for spanish_char, replacement in spanish_chars.items():
-                xml_content = xml_content.replace(spanish_char, replacement)
-
-            return xml_content
+            return historical_data
 
         except Exception as e:
-            logger.warning("Error cleaning Madrid XML", error=str(e))
-            return xml_content
+            logger.error("Error fetching real historical traffic data", error=str(e))
+            return []
+
+    async def _parse_historical_zip(self, zip_content: bytes, latitude: float, longitude: float,
+                                  start_date: datetime, end_date: datetime) -> List[Dict[str, Any]]:
+        """Parse Madrid historical traffic ZIP file"""
+        try:
+            import zipfile
+            from io import BytesIO
+
+            historical_records = []
+
+            with zipfile.ZipFile(BytesIO(zip_content), 'r') as zip_file:
+                logger.debug("ZIP file contents", files=zip_file.namelist())
+
+                csv_files = [f for f in zip_file.namelist()
+                           if f.endswith('.csv') and not f.startswith('__MACOSX')]
+
+                if not csv_files:
+                    logger.warning("No CSV files found in ZIP")
+                    return []
+
+                for csv_filename in csv_files:
+                    logger.debug("Processing CSV file", filename=csv_filename)
+
+                    try:
+                        csv_content = self._extract_csv_from_zip(zip_file, csv_filename)
+                        if csv_content:
+                            file_records = await self._parse_csv_content(
+                                csv_content, latitude, longitude, start_date, end_date
+                            )
+                            historical_records.extend(file_records)
+
+                            logger.debug("Processed CSV file",
+                                       filename=csv_filename, records=len(file_records))
+
+                    except Exception as e:
+                        logger.warning("Error processing CSV file",
+                                     filename=csv_filename, error=str(e))
+                        continue
+
+            return historical_records
+
+        except Exception as e:
+            logger.error("Error parsing historical ZIP", error=str(e))
+            return []
+
+    # ================================================================
+    # DATA PARSING AND CONVERSION METHODS
+    # ================================================================
+
+    async def _parse_csv_content(self, csv_content: str, latitude: float, longitude: float,
+                               start_date: datetime, end_date: datetime) -> List[Dict[str, Any]]:
+        """Parse CSV content from Madrid historical traffic data"""
+        try:
+            import csv
+            from io import StringIO
+
+            csv_reader = csv.DictReader(StringIO(csv_content), delimiter=';')
+
+            if not self._validate_csv_structure(csv_reader.fieldnames):
+                return []
+
+            logger.debug("Madrid CSV structure detected", fields=csv_reader.fieldnames)
+
+            # Get nearest measurement points
+            measurement_points = await self._get_measurement_points_near_location(latitude, longitude)
+            target_point_ids = [str(point.id) for point in measurement_points[:10]]
+
+            logger.debug("Target measurement points", ids=target_point_ids[:3])
+
+            # Process CSV rows
+            historical_records = []
+            processed_count = 0
+            row_num = -1  # Keeps the summary log below safe when the CSV has no data rows
+
+            for row_num, row in enumerate(csv_reader):
+                if processed_count >= MAX_CSV_PROCESSING_ROWS:
+                    logger.info("Reached processing limit", limit=MAX_CSV_PROCESSING_ROWS)
+                    break
+
+                try:
+                    traffic_record = self._parse_csv_row(row, target_point_ids, start_date, end_date)
+                    if traffic_record:
+                        historical_records.append(traffic_record.to_dict())
+                        processed_count += 1
+
+                        if processed_count % 1000 == 0:
+                            logger.debug("Processing progress", processed=processed_count)
+
+                except Exception as e:
+                    if row_num % 5000 == 0:
+                        logger.debug("Error parsing CSV row", row_num=row_num, error=str(e))
+                    continue
+
+            logger.info("Successfully parsed Madrid CSV",
+                       total_rows=row_num + 1, processed=processed_count, records=len(historical_records))
+
+            # Enrich with location data
+            if
historical_records and measurement_points: + historical_records = self._enrich_with_location_data(historical_records, measurement_points) + + return historical_records + + except Exception as e: + logger.error("Error parsing Madrid CSV content", error=str(e)) + return [] + + def _parse_csv_row(self, row: Dict[str, str], target_point_ids: List[str], + start_date: datetime, end_date: datetime) -> Optional[TrafficRecord]: + """Parse a single CSV row into a TrafficRecord""" + try: + # Extract and validate point ID + point_id = str(row.get('id', '')).strip() + if not point_id or (target_point_ids and point_id not in target_point_ids): + return None + + # Parse date + record_date = self._parse_madrid_date(row.get('fecha', '').strip().strip('"')) + if not record_date or not (start_date <= record_date <= end_date): + return None + + # Parse traffic data + intensidad = self._safe_int(row.get('intensidad', '0')) + ocupacion = self._safe_int(row.get('ocupacion', '0')) + carga = self._safe_int(row.get('carga', '0')) + vmed = self._safe_int(row.get('vmed', '0')) + tipo_elem = row.get('tipo_elem', '').strip().strip('"') + error = row.get('error', 'N').strip().strip('"') + + # Skip erroneous records + if error == 'S': + return None + + # Calculate derived metrics + avg_speed = self._calculate_average_speed(vmed, carga, ocupacion) + congestion_level = self._determine_congestion_level(carga, avg_speed) + pedestrian_count = self._calculate_pedestrian_count(tipo_elem, record_date.hour) + + return TrafficRecord( + date=record_date, + traffic_volume=intensidad, + occupation_percentage=ocupacion, + load_percentage=carga, + average_speed=avg_speed, + congestion_level=congestion_level, + pedestrian_count=pedestrian_count, + measurement_point_id=point_id, + measurement_point_name=f"Madrid Point {point_id}", + road_type=tipo_elem, + source=DataSource.MADRID_HISTORICAL.value, + error_status=error, + intensidad_raw=intensidad, + ocupacion_raw=ocupacion, + carga_raw=carga, + vmed_raw=vmed + ) + + except Exception as e: + logger.debug("Error parsing CSV row", error=str(e)) + return None + + # ================================================================ + # MEASUREMENT POINTS METHODS + # ================================================================ + + async def _get_measurement_points_near_location(self, latitude: float, longitude: float) -> List[MeasurementPoint]: + """Get measurement points near the specified location""" + try: + points_csv = await self._fetch_measurement_points_csv(self.measurement_points_url) + + if points_csv: + return await self._parse_measurement_points_csv(points_csv, latitude, longitude) + else: + logger.info("Using fallback measurement points") + return self._get_fallback_measurement_points(latitude, longitude) + + except Exception as e: + logger.warning("Error getting measurement points", error=str(e)) + return self._get_fallback_measurement_points(latitude, longitude) + + async def _parse_measurement_points_csv(self, csv_content: str, query_lat: float, query_lon: float) -> List[MeasurementPoint]: + """Parse measurement points CSV and find nearest points""" + try: + import csv + from io import StringIO + + points_with_distance = [] + csv_reader = csv.DictReader(StringIO(csv_content), delimiter=';') + + for row in csv_reader: + try: + point_id = row.get('id', '').strip() + latitud = row.get('latitud', '').strip() + longitud = row.get('longitud', '').strip() + nombre = row.get('nombre', '').strip().strip('"') + tipo_elem = row.get('tipo_elem', '').strip().strip('"') + + if not (point_id 
and latitud and longitud): + continue + + lat, lon = float(latitud), float(longitud) + distance = self._calculate_distance(query_lat, query_lon, lat, lon) + + point = MeasurementPoint( + id=point_id, + latitude=lat, + longitude=lon, + distance=distance, + name=nombre or f'Point {point_id}', + type=tipo_elem + ) + + points_with_distance.append(point) + + except Exception as e: + logger.debug("Error parsing measurement point row", error=str(e)) + continue + + # Sort by distance and return closest points + points_with_distance.sort(key=lambda x: x.distance) + closest_points = points_with_distance[:MEASUREMENT_POINTS_LIMIT] + + logger.info("Found measurement points", + total=len(points_with_distance), closest=len(closest_points)) + + return closest_points + + except Exception as e: + logger.error("Error parsing measurement points CSV", error=str(e)) + return [] + + # ================================================================ + # COORDINATE CONVERSION METHODS + # ================================================================ def _extract_madrid_pm_element(self, pm_element) -> Dict[str, Any]: - """Extract traffic data from Madrid element with proper coordinate conversion""" + """Extract traffic data from Madrid element with coordinate conversion""" try: - # Based on the actual Madrid XML structure shown in logs point_data = {} - utm_x = None - utm_y = None + utm_x = utm_y = None # Extract all child elements for child in pm_element: - tag = child.tag - text = child.text.strip() if child.text else '' + tag, text = child.tag, child.text.strip() if child.text else '' if tag == 'idelem': point_data['idelem'] = text @@ -219,125 +621,224 @@ class MadridOpenDataClient(BaseAPIClient): elif tag == 'nivelServicio': point_data['nivelServicio'] = self._safe_int(text) elif tag == 'st_x': - # Store UTM X coordinate for later conversion utm_x = text - point_data['utm_x'] = text # Keep original for debugging + point_data['utm_x'] = text elif tag == 'st_y': - # Store UTM Y coordinate for later conversion utm_y = text - point_data['utm_y'] = text # Keep original for debugging + point_data['utm_y'] = text elif tag == 'error': point_data['error'] = text - elif tag == 'subarea': - point_data['subarea'] = text - elif tag == 'accesoAsociado': - point_data['accesoAsociado'] = text - elif tag == 'intensidadSat': - point_data['intensidadSat'] = self._safe_int(text) + elif tag in ['subarea', 'accesoAsociado', 'intensidadSat']: + point_data[tag] = text - # Convert UTM coordinates to lat/lon if both are available + # Convert coordinates if utm_x and utm_y: - latitude, longitude = self._convert_utm_coordinates_accurate(utm_x, utm_y) + latitude, longitude = self._convert_utm_to_latlon(utm_x, utm_y) - if latitude is not None and longitude is not None: - # Validate that coordinates are actually in Madrid area - if self._validate_madrid_coordinates(latitude, longitude): - point_data['latitude'] = latitude - point_data['longitude'] = longitude - - # Log first few successful conversions for verification - if len(getattr(self, '_conversion_log_count', [])) < 3: - if not hasattr(self, '_conversion_log_count'): - self._conversion_log_count = [] - self._conversion_log_count.append(1) - - logger.debug("Successful UTM conversion", - idelem=point_data.get('idelem'), - utm_x=utm_x, - utm_y=utm_y, - latitude=latitude, - longitude=longitude, - descripcion=point_data.get('descripcion')) - else: - # Log invalid coordinates for debugging - logger.debug("Invalid Madrid coordinates after conversion", - idelem=point_data.get('idelem'), - 
utm_x=utm_x, - utm_y=utm_y, - converted_lat=latitude, - converted_lon=longitude, - descripcion=point_data.get('descripcion')) - # Don't include this point - return empty dict - return {} + if latitude and longitude and self._validate_madrid_coordinates(latitude, longitude): + point_data.update({'latitude': latitude, 'longitude': longitude}) + + # Log successful conversions (limited) + self._log_coordinate_conversion(point_data, utm_x, utm_y, latitude, longitude) + return point_data else: - # Conversion failed - logger.debug("UTM conversion failed", - idelem=point_data.get('idelem'), - utm_x=utm_x, - utm_y=utm_y) + logger.debug("Invalid coordinates after conversion", + idelem=point_data.get('idelem'), utm_x=utm_x, utm_y=utm_y) return {} else: - # Missing coordinates - logger.debug("Missing UTM coordinates", - idelem=point_data.get('idelem'), - has_utm_x=utm_x is not None, - has_utm_y=utm_y is not None) + logger.debug("Missing UTM coordinates", idelem=point_data.get('idelem')) return {} - return point_data - except Exception as e: logger.debug("Error extracting Madrid PM element", error=str(e)) return {} - - def _convert_utm_coordinates_accurate(self, utm_x_str: str, utm_y_str: str) -> tuple[Optional[float], Optional[float]]: - """Convert UTM coordinates to lat/lon using accurate pyproj library""" + def _convert_utm_to_latlon(self, utm_x_str: str, utm_y_str: str) -> Tuple[Optional[float], Optional[float]]: + """Convert UTM coordinates to lat/lon using pyproj""" try: utm_x = float(utm_x_str.replace(',', '.')) utm_y = float(utm_y_str.replace(',', '.')) - # Define UTM Zone 30N projection (EPSG:25830) - utm_proj = pyproj.Proj(proj='utm', zone=30, ellps='WGS84', preserve_units=False) - - # Convert to latitude/longitude - longitude, latitude = utm_proj(utm_x, utm_y, inverse=True) - + longitude, latitude = self.utm_proj(utm_x, utm_y, inverse=True) return round(latitude, 6), round(longitude, 6) except (ValueError, TypeError, Exception): return None, None - - def _validate_madrid_coordinates(self, latitude: float, longitude: float) -> bool: - """Validate that converted coordinates are actually in Madrid area""" - # Madrid bounds (expanded slightly to include metro area) - madrid_lat_min, madrid_lat_max = 40.31, 40.56 - madrid_lon_min, madrid_lon_max = -3.89, -3.51 + + # ================================================================ + # UTILITY AND HELPER METHODS + # ================================================================ + + def _validate_date_range(self, start_date: datetime, end_date: datetime) -> bool: + """Validate date range for historical data requests""" + days_diff = (end_date - start_date).days - return (madrid_lat_min <= latitude <= madrid_lat_max and - madrid_lon_min <= longitude <= madrid_lon_max) + # Allow same-day ranges (days_diff = 0) and ranges within the same day + if days_diff < 0: + logger.warning("End date before start date", start=start_date, end=end_date) + return False + + if days_diff > MAX_HISTORICAL_DAYS: + logger.warning("Date range too large for historical traffic data", days=days_diff) + return False + + return True - def _safe_int(self, value_str: str) -> int: - """Safely convert string to int""" + def _calculate_madrid_month_code(self, year: int, month: int) -> Optional[int]: + """Calculate Madrid's month code for ZIP files (June 2025 = 145)""" try: - return int(float(value_str.replace(',', '.'))) - except (ValueError, TypeError): - return 0 + reference_year, reference_month, reference_code = 2025, 6, 145 + months_diff = (year - reference_year) * 12 + (month 
- reference_month) + estimated_code = reference_code + months_diff + + if 100 <= estimated_code <= 300: + return estimated_code + else: + logger.warning("Month code out of range", year=year, month=month, code=estimated_code) + return None + + except Exception as e: + logger.error("Error calculating month code", year=year, month=month, error=str(e)) + return None - def _safe_float(self, value_str: str) -> float: - """Safely convert string to float""" + def _calculate_average_speed(self, vmed: int, carga: int, ocupacion: int) -> int: + """Calculate average speed based on available data""" + if vmed > 0: # M30 points have speed data + return vmed + else: # Urban points - estimate from carga and ocupacion + if carga >= 80: + speed = 15 + elif carga >= 50: + speed = 25 + elif carga >= 20: + speed = 35 + else: + speed = 45 + + # Adjust based on occupation + if ocupacion >= 30: + speed = max(10, speed - 10) + elif ocupacion <= 5: + speed = min(50, speed + 5) + + return speed + + def _determine_congestion_level(self, carga: int, avg_speed: int) -> str: + """Determine congestion level from carga and speed""" + if carga >= 90 and avg_speed <= 10: + return CongestionLevel.BLOCKED.value + elif carga >= 75: + return CongestionLevel.HIGH.value + elif carga >= 40: + return CongestionLevel.MEDIUM.value + else: + return CongestionLevel.LOW.value + + def _calculate_pedestrian_count(self, tipo_elem: str, hour: int) -> int: + """Calculate pedestrian estimate based on area type and time""" + if tipo_elem == 'URB': + base = 200 + if 12 <= hour <= 14: # Lunch time + multiplier = 2.0 + elif 8 <= hour <= 9 or 18 <= hour <= 20: # Rush hours + multiplier = 1.5 + else: + multiplier = 1.0 + else: # M30, C30 + base = 50 + multiplier = 0.5 + + return int(base * multiplier) + + def _parse_madrid_date(self, fecha_str: str) -> Optional[datetime]: + """Parse Madrid date format""" + if not fecha_str: + return None + try: - return float(value_str.replace(',', '.')) - except (ValueError, TypeError): - return 0.0 + return datetime.strptime(fecha_str, '%Y-%m-%d %H:%M:%S') + except ValueError: + try: + return datetime.strptime(fecha_str, '%d/%m/%Y %H:%M:%S') + except ValueError: + return None + + def _validate_csv_structure(self, fieldnames: Optional[List[str]]) -> bool: + """Validate CSV has expected structure""" + if not fieldnames: + logger.warning("No CSV fieldnames found") + return False + + expected_fields = ['id', 'fecha', 'tipo_elem', 'intensidad', 'ocupacion', 'carga'] + missing_fields = [field for field in expected_fields if field not in fieldnames] + + if missing_fields: + logger.warning("Missing expected fields in CSV", missing=missing_fields, available=fieldnames) + + return True # Continue processing even with some missing fields + + def _is_valid_traffic_point(self, traffic_point: Dict[str, Any]) -> bool: + """Check if traffic point has valid essential data""" + return (traffic_point.get('latitude') and + traffic_point.get('longitude') and + traffic_point.get('idelem')) + + def _validate_madrid_coordinates(self, latitude: float, longitude: float) -> bool: + """Validate coordinates are in Madrid area""" + return (MADRID_BOUNDS['lat_min'] <= latitude <= MADRID_BOUNDS['lat_max'] and + MADRID_BOUNDS['lon_min'] <= longitude <= MADRID_BOUNDS['lon_max']) + + def _get_next_month(self, current_date: datetime) -> datetime: + """Get next month date""" + if current_date.month == 12: + return current_date.replace(year=current_date.year + 1, month=1) + else: + return current_date.replace(month=current_date.month + 1) + + def 
_log_coordinate_conversion(self, point_data: Dict, utm_x: str, utm_y: str, + latitude: float, longitude: float) -> None: + """Log coordinate conversion (limited to first few for debugging)""" + if len(self._conversion_log_count) < 3: + self._conversion_log_count.append(1) + logger.debug("Successful UTM conversion", + idelem=point_data.get('idelem'), + utm_x=utm_x, utm_y=utm_y, + latitude=latitude, longitude=longitude, + descripcion=point_data.get('descripcion')) + + def _enrich_with_location_data(self, records: List[Dict[str, Any]], + measurement_points: List[MeasurementPoint]) -> List[Dict[str, Any]]: + """Enrich traffic records with location data from measurement points""" + try: + points_lookup = {point.id: point for point in measurement_points} + + for record in records: + point_id = record.get('measurement_point_id') + if point_id in points_lookup: + point = points_lookup[point_id] + record.update({ + 'measurement_point_name': point.name, + 'measurement_point_latitude': point.latitude, + 'measurement_point_longitude': point.longitude, + 'distance_to_query': point.distance + }) + + return records + + except Exception as e: + logger.warning("Error enriching with location data", error=str(e)) + return records + + # ================================================================ + # HTTP CLIENT METHODS + # ================================================================ async def _fetch_xml_content_robust(self, url: str) -> Optional[str]: """Fetch XML content with robust headers for Madrid endpoints""" try: import httpx - # Headers optimized for Madrid Open Data headers = { - 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36', + 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36', 'Accept': 'application/xml,text/xml,*/*', 'Accept-Language': 'es-ES,es;q=0.9,en;q=0.8', 'Accept-Encoding': 'gzip, deflate, br', @@ -345,12 +846,7 @@ class MadridOpenDataClient(BaseAPIClient): 'Referer': 'https://datos.madrid.es/' } - async with httpx.AsyncClient( - timeout=30.0, - follow_redirects=True, - headers=headers - ) as client: - + async with httpx.AsyncClient(timeout=30.0, follow_redirects=True, headers=headers) as client: logger.debug("Fetching XML from Madrid endpoint", url=url) response = await client.get(url) @@ -360,20 +856,9 @@ class MadridOpenDataClient(BaseAPIClient): content_length=len(response.content)) if response.status_code == 200: - try: - content = response.text - if content and len(content) > 100: - return content - except UnicodeDecodeError: - # Try manual encoding for Spanish content - for encoding in ['utf-8', 'latin-1', 'windows-1252', 'iso-8859-1']: - try: - content = response.content.decode(encoding) - if content and len(content) > 100: - logger.debug("Successfully decoded with encoding", encoding=encoding) - return content - except UnicodeDecodeError: - continue + content = self._decode_response_content(response) + if content and len(content) > 100: + return content return None @@ -381,50 +866,158 @@ class MadridOpenDataClient(BaseAPIClient): logger.warning("Failed to fetch Madrid XML content", url=url, error=str(e)) return None + async def _fetch_historical_zip(self, url: str) -> Optional[bytes]: + """Fetch historical ZIP data from Madrid Open Data""" + try: + import httpx + + headers = { + 'User-Agent': 'Mozilla/5.0 (compatible; Madrid-Traffic-Client/1.0)', + 'Accept': 'application/zip,application/octet-stream,*/*', + 'Accept-Language': 'es-ES,es;q=0.9,en;q=0.8', + } + + 
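+            # Monthly history archives are full ZIP files, so this request uses a much
+            # longer timeout (120 s) than the 30 s used for the XML/CSV endpoints.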
            async with httpx.AsyncClient(timeout=120.0, headers=headers) as client:
+                logger.debug("Fetching historical ZIP", url=url)
+                response = await client.get(url)
+
+                if response.status_code == 200:
+                    content = response.content
+                    if content and len(content) > 1000:
+                        logger.debug("Successfully fetched ZIP", url=url, size=len(content))
+                        return content
+                    else:
+                        logger.debug("ZIP file too small", url=url, size=len(content) if content else 0)
+                else:
+                    logger.debug("ZIP not found", url=url, status=response.status_code)
+
+        except Exception as e:
+            logger.debug("Error fetching ZIP", url=url, error=str(e))
+
+        return None
+
+    async def _fetch_measurement_points_csv(self, url: str) -> Optional[str]:
+        """Fetch the measurement points CSV file"""
+        try:
+            import httpx
+
+            headers = {
+                'User-Agent': 'Mozilla/5.0 (compatible; Madrid-Traffic-Client/1.0)',
+                'Accept': 'text/csv,application/csv,text/plain,*/*',
+                'Accept-Language': 'es-ES,es;q=0.9,en;q=0.8',
+            }
+
+            async with httpx.AsyncClient(timeout=30.0, headers=headers) as client:
+                logger.debug("Fetching measurement points CSV", url=url)
+                response = await client.get(url)
+
+                if response.status_code == 200:
+                    content = response.text
+                    if content and len(content) > 1000:
+                        logger.debug("Successfully fetched measurement points CSV",
+                                   url=url, size=len(content))
+                        return content
+                    else:
+                        logger.debug("Measurement points CSV too small", size=len(content))
+                else:
+                    logger.debug("Measurement points CSV not found",
+                               url=url, status=response.status_code)
+
+        except Exception as e:
+            logger.debug("Error fetching measurement points CSV", url=url, error=str(e))
+
+        return None
+
+    def _decode_response_content(self, response) -> Optional[str]:
+        """Decode response content with multiple encoding attempts"""
+        try:
+            return response.text
+        except UnicodeDecodeError:
+            # Try manual encoding for Spanish content
+            for encoding in ['utf-8', 'latin-1', 'windows-1252', 'iso-8859-1']:
+                try:
+                    content = response.content.decode(encoding)
+                    if content and len(content) > 100:
+                        logger.debug("Successfully decoded with encoding", encoding=encoding)
+                        return content
+                except UnicodeDecodeError:
+                    continue
+            return None
+
+    def _extract_csv_from_zip(self, zip_file, csv_filename: str) -> Optional[str]:
+        """Extract and decode CSV content from ZIP file"""
+        try:
+            csv_bytes = zip_file.read(csv_filename)
+
+            # Try different encodings for Spanish content
+            for encoding in ['utf-8', 'latin-1', 'windows-1252', 'iso-8859-1']:
+                try:
+                    csv_content = csv_bytes.decode(encoding)
+                    logger.debug("Successfully decoded CSV", filename=csv_filename, encoding=encoding)
+                    return csv_content
+                except UnicodeDecodeError:
+                    continue
+
+            logger.warning("Could not decode CSV file", filename=csv_filename)
+            return None
+
+        except Exception as e:
+            logger.warning("Error extracting CSV from ZIP", filename=csv_filename, error=str(e))
+            return None
+
+    # ================================================================
+    # XML PROCESSING METHODS
+    # ================================================================
+
+    def _clean_madrid_xml(self, xml_content: str) -> str:
+        """Clean Madrid XML to handle undefined entities and encoding issues"""
+        try:
+            # Remove BOM if present
+            xml_content = xml_content.lstrip('\ufeff')
+
+            # Replace undefined entities
+            entity_replacements = {
+                '&nbsp;': ' ', '&copy;': '©', '&reg;': '®', '&trade;': '™'
+            }
+
+            for entity, replacement in entity_replacements.items():
+                xml_content = xml_content.replace(entity, replacement)
+
+            # Fix unescaped ampersands
+            xml_content = re.sub(r'&(?![a-zA-Z0-9#]{1,10};)', '&amp;', xml_content)
+
+            # Remove invalid control characters
+            xml_content = re.sub(r'[\x00-\x08\x0B\x0C\x0E-\x1F\x7F]', '', xml_content)
+
+            # Handle Spanish characters
+            spanish_chars = {
+                'ñ': 'n', 'Ñ': 'N', 'á': 'a', 'é': 'e', 'í': 'i', 'ó': 'o', 'ú': 'u',
+                'Á': 'A', 'É': 'E', 'Í': 'I', 'Ó': 'O', 'Ú': 'U', 'ü': 'u', 'Ü': 'U'
+            }
+
+            for spanish_char, replacement in spanish_chars.items():
+                xml_content = xml_content.replace(spanish_char, replacement)
+
+            return xml_content
+
+        except Exception as e:
+            logger.warning("Error cleaning Madrid XML", error=str(e))
+            return xml_content
+
     def _extract_traffic_data_regex(self, xml_content: str) -> List[Dict[str, Any]]:
         """Extract traffic data using regex when XML parsing fails"""
         traffic_points = []
 
         try:
-            # Pattern to match Madrid PM elements
             pm_pattern = r'<pm>(.*?)</pm>'
             pm_matches = re.findall(pm_pattern, xml_content, re.DOTALL)
 
             for pm_content in pm_matches:
                 try:
-                    # Extract individual fields
-                    idelem_match = re.search(r'<idelem>(.*?)</idelem>', pm_content)
-                    intensidad_match = re.search(r'<intensidad>(.*?)</intensidad>', pm_content)
-                    st_x_match = re.search(r'<st_x>(.*?)</st_x>', pm_content)
-                    st_y_match = re.search(r'<st_y>(.*?)</st_y>', pm_content)
-                    descripcion_match = re.search(r'<descripcion>(.*?)</descripcion>', pm_content)
-
-                    if idelem_match and st_x_match and st_y_match:
-                        idelem = idelem_match.group(1)
-                        st_x = st_x_match.group(1)
-                        st_y = st_y_match.group(1)
-                        intensidad = intensidad_match.group(1) if intensidad_match else '0'
-                        descripcion = descripcion_match.group(1) if descripcion_match else f'Point {idelem}'
+                    extracted_data = self._extract_pm_data_regex(pm_content)
+                    if extracted_data and self._is_valid_traffic_point(extracted_data):
+                        traffic_points.append(extracted_data)
 
-                        # Convert coordinates
-                        longitude = self._convert_utm_to_lon(st_x)
-                        latitude = self._convert_utm_to_lat(st_y)
-
-                        if latitude and longitude:
-                            traffic_point = {
-                                'idelem': idelem,
-                                'descripcion': descripcion,
-                                'intensidad': self._safe_int(intensidad),
-                                'latitude': latitude,
-                                'longitude': longitude,
-                                'ocupacion': 0,
-                                'carga': 0,
-                                'nivelServicio': 0,
-                                'error': 'N'
-                            }
-
-                            traffic_points.append(traffic_point)
-
                 except Exception as e:
                     logger.debug("Error parsing regex PM match", error=str(e))
                     continue
@@ -436,23 +1029,46 @@
             logger.error("Error in regex extraction", error=str(e))
             return []
 
-    def _get_closest_distance(self, latitude: float, longitude: float, traffic_data: List[Dict]) -> float:
-        """Get distance to closest traffic point for debugging"""
-        if not traffic_data:
-            return float('inf')
+    def _extract_pm_data_regex(self, pm_content: str) -> Dict[str, Any]:
+        """Extract individual PM data using regex"""
+        patterns = {
+            'idelem': r'<idelem>(.*?)</idelem>',
+            'intensidad': r'<intensidad>(.*?)</intensidad>',
+            'st_x': r'<st_x>(.*?)</st_x>',
+            'st_y': r'<st_y>(.*?)</st_y>',
+            'descripcion': r'<descripcion>(.*?)</descripcion>'
+        }
 
-        min_distance = float('inf')
-        for point in traffic_data:
-            if point.get('latitude') and point.get('longitude'):
-                distance = self._calculate_distance(
-                    latitude, longitude,
-                    point['latitude'], point['longitude']
-                )
-                min_distance = min(min_distance, distance)
+        extracted = {}
+        for field, pattern in patterns.items():
+            match = re.search(pattern, pm_content)
+            extracted[field] = match.group(1) if match else ''
 
-        return min_distance
+        if extracted['idelem'] and extracted['st_x'] and extracted['st_y']:
+            # Convert coordinates
+            latitude, longitude = self._convert_utm_to_latlon(extracted['st_x'], extracted['st_y'])
+
+            if latitude and longitude:
+                return {
+                    'idelem': extracted['idelem'],
+                    'descripcion': extracted['descripcion'] or
f"Point {extracted['idelem']}", + 'intensidad': self._safe_int(extracted['intensidad']), + 'latitude': latitude, + 'longitude': longitude, + 'ocupacion': 0, + 'carga': 0, + 'nivelServicio': 0, + 'error': 'N' + } + + return {} - def _find_nearest_traffic_point(self, latitude: float, longitude: float, traffic_data: List[Dict]) -> Optional[Dict]: + # ================================================================ + # TRAFFIC ANALYSIS METHODS + # ================================================================ + + def _find_nearest_traffic_point(self, latitude: float, longitude: float, + traffic_data: List[Dict]) -> Optional[Dict]: """Find the nearest traffic measurement point to given coordinates""" if not traffic_data: return None @@ -480,60 +1096,50 @@ class MadridOpenDataClient(BaseAPIClient): return nearest_point logger.debug("No nearby Madrid traffic points found", - min_distance=min_distance, - total_points=len(traffic_data)) + min_distance=min_distance, total_points=len(traffic_data)) return None - def _calculate_distance(self, lat1: float, lon1: float, lat2: float, lon2: float) -> float: - """Calculate distance between two coordinates in km using Haversine formula""" - R = 6371 # Earth's radius in km + def _get_closest_distance(self, latitude: float, longitude: float, traffic_data: List[Dict]) -> float: + """Get distance to closest traffic point for debugging""" + if not traffic_data: + return float('inf') - dlat = math.radians(lat2 - lat1) - dlon = math.radians(lon2 - lon1) + min_distance = float('inf') + for point in traffic_data: + if point.get('latitude') and point.get('longitude'): + distance = self._calculate_distance( + latitude, longitude, + point['latitude'], point['longitude'] + ) + min_distance = min(min_distance, distance) - a = (math.sin(dlat/2) * math.sin(dlat/2) + - math.cos(math.radians(lat1)) * math.cos(math.radians(lat2)) * - math.sin(dlon/2) * math.sin(dlon/2)) - - c = 2 * math.atan2(math.sqrt(a), math.sqrt(1-a)) - distance = R * c - - return distance + return min_distance def _parse_traffic_measurement(self, traffic_point: Dict) -> Dict[str, Any]: """Parse Madrid traffic measurement into standardized format""" try: - # Madrid traffic service levels: 0=fluid, 1=dense, 2=congested, 3=cut - service_level_map = { - 0: "low", - 1: "medium", - 2: "high", - 3: "blocked" + service_level = traffic_point.get('nivelServicio', 0) + congestion_mapping = { + TrafficServiceLevel.FLUID.value: CongestionLevel.LOW.value, + TrafficServiceLevel.DENSE.value: CongestionLevel.MEDIUM.value, + TrafficServiceLevel.CONGESTED.value: CongestionLevel.HIGH.value, + TrafficServiceLevel.BLOCKED.value: CongestionLevel.BLOCKED.value } - service_level = traffic_point.get('nivelServicio', 0) + # Speed estimation based on service level + speed_mapping = { + TrafficServiceLevel.FLUID.value: 45, + TrafficServiceLevel.DENSE.value: 25, + TrafficServiceLevel.CONGESTED.value: 15, + TrafficServiceLevel.BLOCKED.value: 5 + } - # Estimate speed based on service level and road type - if service_level == 0: # Fluid - average_speed = 45 - elif service_level == 1: # Dense - average_speed = 25 - elif service_level == 2: # Congested - average_speed = 15 - else: # Cut/Blocked - average_speed = 5 + congestion_level = congestion_mapping.get(service_level, CongestionLevel.MEDIUM.value) + average_speed = speed_mapping.get(service_level, 25) - congestion_level = service_level_map.get(service_level, "medium") - - # Calculate pedestrian estimate based on location + # Calculate pedestrian estimate hour = datetime.now().hour - 
if 13 <= hour <= 15:  # Lunch time
-                pedestrian_multiplier = 2.5
-            elif 8 <= hour <= 9 or 18 <= hour <= 20:  # Rush hours
-                pedestrian_multiplier = 2.0
-            else:
-                pedestrian_multiplier = 1.0
-
+            pedestrian_multiplier = self._get_pedestrian_multiplier(hour)
             pedestrian_count = int(100 * pedestrian_multiplier)
 
             return {
@@ -547,28 +1153,25 @@
                 "measurement_point_id": traffic_point.get('idelem'),
                 "measurement_point_name": traffic_point.get('descripcion'),
                 "road_type": "URB",
-                "source": "madrid_opendata"
+                "source": DataSource.MADRID_REALTIME.value
             }
 
         except Exception as e:
             logger.error("Error parsing traffic measurement", error=str(e))
             return self._get_default_traffic_data()
 
-    def _get_default_traffic_data(self) -> Dict[str, Any]:
-        """Get default traffic data when parsing fails"""
-        return {
-            "date": datetime.now(),
-            "traffic_volume": 100,
-            "pedestrian_count": 150,
-            "congestion_level": "medium",
-            "average_speed": 25,
-            "occupation_percentage": 30,
-            "load_percentage": 40,
-            "measurement_point_id": "unknown",
-            "measurement_point_name": "Unknown location",
-            "road_type": "URB",
-            "source": "synthetic"
-        }
+    def _get_default_traffic_data(self) -> Dict[str, Any]:
+        """Get default traffic data when parsing fails"""
+        return {
+            "date": datetime.now(),
+            "traffic_volume": 100,
+            "pedestrian_count": 150,
+            "congestion_level": CongestionLevel.MEDIUM.value,
+            "average_speed": 25,
+            "occupation_percentage": 30,
+            "load_percentage": 40,
+            "measurement_point_id": "unknown",
+            "measurement_point_name": "Unknown location",
+            "road_type": "URB",
+            "source": DataSource.SYNTHETIC.value
+        }
+
+    def _get_pedestrian_multiplier(self, hour: int) -> float:
+        """Get pedestrian multiplier based on time of day"""
+        if 13 <= hour <= 15:  # Lunch time
+            return 2.5
+        elif 8 <= hour <= 9 or 18 <= hour <= 20:  # Rush hours
+            return 2.0
+        else:
+            return 1.0
+
+    # ================================================================
+    # SYNTHETIC DATA GENERATION METHODS
+    # ================================================================
 
     async def _generate_synthetic_traffic(self, latitude: float, longitude: float) -> Dict[str, Any]:
         """Generate realistic Madrid traffic data as fallback"""
@@ -576,338 +1179,25 @@
             hour = now.hour
             is_weekend = now.weekday() >= 5
 
-            base_traffic = 100
-
-            if not is_weekend:
-                if 7 <= hour <= 9:
-                    traffic_multiplier = 2.2
-                    congestion = "high"
-                    avg_speed = 15
-                elif 18 <= hour <= 20:
-                    traffic_multiplier = 2.5
-                    congestion = "high"
-                    avg_speed = 12
-                elif 12 <= hour <= 14:
-                    traffic_multiplier = 1.6
-                    congestion = "medium"
-                    avg_speed = 25
-                else:
-                    traffic_multiplier = 1.0
-                    congestion = "low"
-                    avg_speed = 40
-            else:
-                if 11 <= hour <= 14:
-                    traffic_multiplier = 1.4
-                    congestion = "medium"
-                    avg_speed = 30
-                else:
-                    traffic_multiplier = 0.8
-                    congestion = "low"
-                    avg_speed = 45
-
-            traffic_volume = int(base_traffic * traffic_multiplier)
-
-            # Pedestrian calculation
-            pedestrian_base = 150
-            if 13 <= hour <= 15:
-                pedestrian_count = int(pedestrian_base * 2.5)
-            elif 8 <= hour <= 9 or 18 <= hour <= 20:
-                pedestrian_count = int(pedestrian_base * 2.0)
-            else:
-                pedestrian_count = int(pedestrian_base * 1.0)
+            traffic_params = self._calculate_traffic_parameters(hour, is_weekend)
 
             return {
                 "date": now,
-                "traffic_volume": traffic_volume,
-                "pedestrian_count": pedestrian_count,
-                "congestion_level": congestion,
-                "average_speed": max(10, avg_speed),
-                "occupation_percentage": min(100, traffic_volume // 2),
-                "load_percentage": min(100, traffic_volume // 3),
+                "traffic_volume": traffic_params['volume'],
+                "pedestrian_count": traffic_params['pedestrians'],
+                "congestion_level": traffic_params['congestion'],
+                "average_speed": traffic_params['speed'],
+                "occupation_percentage": min(100, traffic_params['volume'] // 2),
+                "load_percentage": min(100, traffic_params['volume'] // 3),
                 "measurement_point_id": "madrid_synthetic",
                 "measurement_point_name": "Madrid Centro (Synthetic)",
                 "road_type": "URB",
-
"source": "synthetic" + "source": DataSource.SYNTHETIC.value } - async def get_historical_traffic(self, latitude: float, longitude: float, start_date: datetime, end_date: datetime) -> List[Dict[str, Any]]: - """Get historical traffic data from Madrid Open Data - - Args: - latitude: Location latitude - longitude: Location longitude - start_date: Start date for historical data - end_date: End date for historical data - - Returns: - List of historical traffic data dictionaries - """ - try: - logger.debug("Fetching Madrid historical traffic data", - lat=latitude, lon=longitude, - start=start_date, end=end_date) - - historical_data = [] - - # Generate historical data using synthetic generation for periods before API availability - # or when real data is not available - if (end_date - start_date).days <= 90: # Reasonable range for synthetic data - historical_data = await self._generate_historical_traffic(latitude, longitude, start_date, end_date) - logger.info("Generated synthetic historical traffic data", - records=len(historical_data)) - else: - logger.warning("Date range too large for historical traffic data", - days=(end_date - start_date).days) - return [] - - # Try to fetch real data if API key is available and for recent dates - if hasattr(self, 'api_key') and self.api_key: - try: - real_data = await self._fetch_real_historical_traffic(latitude, longitude, start_date, end_date) - if real_data: - # Merge real data with synthetic data or replace synthetic data - historical_data = real_data - logger.info("Fetched real historical traffic data", - records=len(real_data)) - except Exception as e: - logger.warning("Failed to fetch real historical data, using synthetic", error=str(e)) - - return historical_data - - except Exception as e: - logger.error("Error getting historical traffic data", error=str(e)) - return [] - - async def _fetch_real_historical_traffic(self, latitude: float, longitude: float, start_date: datetime, end_date: datetime) -> List[Dict[str, Any]]: - """Fetch real historical traffic data from Madrid Open Data portal - - Madrid provides historical CSV files by month at: - https://datos.madrid.es/egob/catalogo/[ID]-[YEAR]-[MONTH]-trafico-historico.csv - """ - try: - historical_data = [] - current_date = start_date.replace(day=1) # Start from beginning of month - - while current_date <= end_date: - try: - # Madrid historical traffic CSV URL pattern - year = current_date.year - month = current_date.month - - # Try different URL patterns based on Madrid Open Data structure - historical_urls = [ - f"https://datos.madrid.es/egob/catalogo/300217-{year}-{month:02d}-trafico-historico.csv", - f"https://datos.madrid.es/egob/catalogo/trafico-historico-{year}-{month:02d}.csv", - f"https://datos.madrid.es/egob/catalogo/{year}{month:02d}-trafico-historico.csv" - ] - - for url in historical_urls: - csv_data = await self._fetch_historical_csv(url) - if csv_data: - # Parse CSV and filter by location - month_data = await self._parse_historical_csv(csv_data, latitude, longitude, start_date, end_date) - historical_data.extend(month_data) - logger.debug("Fetched historical data for month", - year=year, month=month, records=len(month_data)) - break - - # Move to next month - if current_date.month == 12: - current_date = current_date.replace(year=current_date.year + 1, month=1) - else: - current_date = current_date.replace(month=current_date.month + 1) - - except Exception as e: - logger.warning("Error fetching data for month", - year=current_date.year, month=current_date.month, error=str(e)) - # Move 
to next month even on error - if current_date.month == 12: - current_date = current_date.replace(year=current_date.year + 1, month=1) - else: - current_date = current_date.replace(month=current_date.month + 1) - - return historical_data - - except Exception as e: - logger.error("Error fetching real historical traffic data", error=str(e)) - return [] - - async def _fetch_historical_csv(self, url: str) -> Optional[str]: - """Fetch historical CSV data from Madrid Open Data""" - try: - import httpx - - headers = { - 'User-Agent': 'Mozilla/5.0 (compatible; Madrid-Traffic-Client/1.0)', - 'Accept': 'text/csv,application/csv,text/plain,*/*', - 'Accept-Language': 'es-ES,es;q=0.9,en;q=0.8', - } - - async with httpx.AsyncClient(timeout=60.0, headers=headers) as client: - logger.debug("Fetching historical CSV", url=url) - response = await client.get(url) - - if response.status_code == 200: - content = response.text - if content and len(content) > 100: # Ensure we got actual data - logger.debug("Successfully fetched CSV", - url=url, size=len(content)) - return content - else: - logger.debug("CSV not found", url=url, status=response.status_code) - - except Exception as e: - logger.debug("Error fetching CSV", url=url, error=str(e)) - - return None - - async def _parse_historical_csv(self, csv_content: str, latitude: float, longitude: float, start_date: datetime, end_date: datetime) -> List[Dict[str, Any]]: - """Parse Madrid historical traffic CSV and filter by location and date range""" - try: - import csv - from io import StringIO - - historical_records = [] - csv_reader = csv.DictReader(StringIO(csv_content), delimiter=';') - - # Get the nearest measurement points to our coordinates - measurement_points = await self._get_measurement_points_near_location(latitude, longitude) - target_point_ids = [point['id'] for point in measurement_points[:3]] # Use 3 nearest points - - for row in csv_reader: - try: - # Parse Madrid CSV format - # Expected columns: fecha, hora, idelem, intensidad, ocupacion, carga, nivelServicio, etc. 
- - # Extract date and time - if 'fecha' in row and 'hora' in row: - date_str = row.get('fecha', '').strip() - time_str = row.get('hora', '').strip() - - # Parse Madrid date format (usually DD/MM/YYYY) - if date_str and time_str: - try: - # Try different date formats - for date_format in ['%d/%m/%Y', '%Y-%m-%d', '%d-%m-%Y']: - try: - record_date = datetime.strptime(f"{date_str} {time_str}", f"{date_format} %H:%M") - break - except ValueError: - continue - else: - continue # Skip if no date format worked - - # Check if record is in our date range - if not (start_date <= record_date <= end_date): - continue - - except ValueError: - continue - else: - continue - - # Check if this record is from a measurement point near our location - point_id = row.get('idelem', '').strip() - if point_id not in target_point_ids: - continue - - # Parse traffic data - traffic_record = { - "date": record_date, - "traffic_volume": self._safe_int(row.get('intensidad', '0')), - "occupation_percentage": self._safe_int(row.get('ocupacion', '0')), - "load_percentage": self._safe_int(row.get('carga', '0')), - "service_level": self._safe_int(row.get('nivelServicio', '0')), - "measurement_point_id": point_id, - "measurement_point_name": row.get('descripcion', f'Point {point_id}'), - "road_type": row.get('tipo_elem', 'URB'), - "source": "madrid_opendata_historical" - } - - # Calculate derived metrics - service_level = traffic_record['service_level'] - if service_level == 0: # Fluid - congestion_level = "low" - avg_speed = 45 - pedestrian_multiplier = 1.0 - elif service_level == 1: # Dense - congestion_level = "medium" - avg_speed = 25 - pedestrian_multiplier = 1.5 - elif service_level == 2: # Congested - congestion_level = "high" - avg_speed = 15 - pedestrian_multiplier = 2.0 - else: # Cut/Blocked - congestion_level = "blocked" - avg_speed = 5 - pedestrian_multiplier = 0.5 - - traffic_record.update({ - "congestion_level": congestion_level, - "average_speed": avg_speed, - "pedestrian_count": int(100 * pedestrian_multiplier) - }) - - historical_records.append(traffic_record) - - except Exception as e: - logger.debug("Error parsing CSV row", error=str(e)) - continue - - return historical_records - - except Exception as e: - logger.error("Error parsing historical CSV", error=str(e)) - return [] - - async def _get_measurement_points_near_location(self, latitude: float, longitude: float) -> List[Dict[str, Any]]: - """Get measurement points near the specified location""" - try: - # Try to fetch current traffic data to get measurement points - current_traffic = await self._fetch_traffic_xml_data(self.traffic_endpoints[0]) - - if current_traffic: - # Calculate distances and sort by proximity - points_with_distance = [] - for point in current_traffic: - if point.get('latitude') and point.get('longitude'): - distance = self._calculate_distance( - latitude, longitude, - point['latitude'], point['longitude'] - ) - points_with_distance.append({ - 'id': point.get('idelem'), - 'distance': distance, - 'latitude': point['latitude'], - 'longitude': point['longitude'], - 'name': point.get('descripcion', '') - }) - - # Sort by distance and return closest points - points_with_distance.sort(key=lambda x: x['distance']) - return points_with_distance[:5] # Return 5 closest points - - # Fallback: return synthetic point IDs based on Madrid geography - return [ - {'id': 'madrid_centro_01', 'distance': 1.0}, - {'id': 'madrid_centro_02', 'distance': 2.0}, - {'id': 'madrid_centro_03', 'distance': 3.0} - ] - - except Exception as e: - 
logger.warning("Error getting measurement points", error=str(e)) - return [{'id': 'madrid_default', 'distance': 0.0}] - - async def _generate_historical_traffic(self, latitude: float, longitude: float, start_date: datetime, end_date: datetime) -> List[Dict[str, Any]]: - """Generate synthetic historical traffic data for the specified period - - This method creates realistic historical traffic patterns based on: - - Time of day patterns - - Day of week patterns - - Seasonal variations - - Random variations for realism - """ + async def _generate_historical_traffic(self, latitude: float, longitude: float, + start_date: datetime, end_date: datetime) -> List[Dict[str, Any]]: + """Generate synthetic historical traffic data with realistic patterns""" try: import random from datetime import timedelta @@ -915,95 +1205,54 @@ class MadridOpenDataClient(BaseAPIClient): historical_data = [] current_date = start_date - # Seed random for consistent but varied data + # Seed random for consistent data random.seed(hash(f"{latitude}{longitude}")) - while current_date <= end_date: - # Generate 24 hourly records for each day - for hour in range(24): + while current_date < end_date: + # Calculate how many hours to generate for this day + if current_date.date() == end_date.date(): + # Same day as end_date, only generate up to end_date hour + end_hour = end_date.hour + else: + # Full day + end_hour = 24 + + # Generate hourly records for this day + for hour in range(current_date.hour if current_date == start_date else 0, end_hour): record_time = current_date.replace(hour=hour, minute=0, second=0, microsecond=0) - # Base traffic calculation - base_traffic = 100 - hour_of_day = record_time.hour - day_of_week = record_time.weekday() # 0=Monday, 6=Sunday - month = record_time.month + # Skip if record time is at or beyond end_date + if record_time >= end_date: + break - # Time of day patterns - if 7 <= hour_of_day <= 9: # Morning rush - traffic_multiplier = 2.2 + random.uniform(-0.3, 0.3) - congestion = "high" - avg_speed = 15 + random.randint(-5, 5) - elif 18 <= hour_of_day <= 20: # Evening rush - traffic_multiplier = 2.5 + random.uniform(-0.4, 0.4) - congestion = "high" - avg_speed = 12 + random.randint(-3, 8) - elif 12 <= hour_of_day <= 14: # Lunch time - traffic_multiplier = 1.6 + random.uniform(-0.2, 0.2) - congestion = "medium" - avg_speed = 25 + random.randint(-5, 10) - elif 22 <= hour_of_day or hour_of_day <= 6: # Night - traffic_multiplier = 0.3 + random.uniform(-0.1, 0.2) - congestion = "low" - avg_speed = 50 + random.randint(-10, 15) - else: # Regular hours - traffic_multiplier = 1.0 + random.uniform(-0.2, 0.2) - congestion = "medium" - avg_speed = 35 + random.randint(-10, 10) + traffic_params = self._generate_synthetic_traffic_params(record_time, random) - # Weekend adjustments - if day_of_week >= 5: # Weekend - if hour_of_day in [11, 12, 13, 14, 15]: # Weekend afternoon peak - traffic_multiplier *= 1.4 - congestion = "medium" - else: - traffic_multiplier *= 0.7 - if congestion == "high": - congestion = "medium" - - # Seasonal adjustments - if month in [7, 8]: # Summer - less traffic due to vacations - traffic_multiplier *= 0.8 - elif month in [11, 12]: # Holiday season - more traffic - traffic_multiplier *= 1.1 - - # Calculate final values - traffic_volume = max(10, int(base_traffic * traffic_multiplier)) - avg_speed = max(10, min(60, avg_speed)) - - # Pedestrian calculation - pedestrian_base = 150 - if 13 <= hour_of_day <= 15: # Lunch time - pedestrian_count = int(pedestrian_base * 2.5 * 
-                    random.uniform(0.8, 1.2))
-                elif 8 <= hour_of_day <= 9 or 18 <= hour_of_day <= 20:  # Rush hours
-                    pedestrian_count = int(pedestrian_base * 2.0 * random.uniform(0.8, 1.2))
-                else:
-                    pedestrian_count = int(pedestrian_base * 1.0 * random.uniform(0.5, 1.5))
-
-                # Create traffic record
                 traffic_record = {
                     "date": record_time,
-                    "traffic_volume": traffic_volume,
-                    "pedestrian_count": pedestrian_count,
-                    "congestion_level": congestion,
-                    "average_speed": avg_speed,
-                    "occupation_percentage": min(100, traffic_volume // 2),
-                    "load_percentage": min(100, traffic_volume // 3),
+                    "traffic_volume": traffic_params['volume'],
+                    "pedestrian_count": traffic_params['pedestrians'],
+                    "congestion_level": traffic_params['congestion'],
+                    "average_speed": traffic_params['speed'],
+                    "occupation_percentage": min(100, traffic_params['volume'] // 2),
+                    "load_percentage": min(100, traffic_params['volume'] // 3),
                     "measurement_point_id": f"madrid_historical_{hash(f'{latitude}{longitude}') % 1000}",
                     "measurement_point_name": f"Madrid Historical Point ({latitude:.4f}, {longitude:.4f})",
                     "road_type": "URB",
-                    "source": "synthetic_historical"
+                    "source": DataSource.SYNTHETIC_HISTORICAL.value
                 }
 
                 historical_data.append(traffic_record)
 
             # Move to next day
-            current_date += timedelta(days=1)
+            if current_date.date() == end_date.date():
+                # We've processed the end date, stop
+                break
+            else:
+                # Move to start of next day
+                current_date = (current_date + timedelta(days=1)).replace(hour=0, minute=0, second=0, microsecond=0)
 
         logger.info("Generated historical traffic data",
-                   records=len(historical_data),
-                   start=start_date,
-                   end=end_date)
+                   records=len(historical_data), start=start_date, end=end_date)
 
         return historical_data
 
@@ -1011,6 +1260,132 @@ class MadridOpenDataClient(BaseAPIClient):
             logger.error("Error generating historical traffic data", error=str(e))
             return []
 
-    async def get_events(self, latitude: float, longitude: float, radius_km: float = 5.0) -> List[Dict[str, Any]]:
-        """Get traffic incidents and events"""
-        return []
\ No newline at end of file
+    def _calculate_traffic_parameters(self, hour: int, is_weekend: bool) -> Dict[str, Any]:
+        """Calculate traffic parameters based on time and day type"""
+        base_traffic = 100
+
+        if not is_weekend:
+            if 7 <= hour <= 9:
+                multiplier, congestion, speed = 2.2, "high", 15
+            elif 18 <= hour <= 20:
+                multiplier, congestion, speed = 2.5, "high", 12
+            elif 12 <= hour <= 14:
+                multiplier, congestion, speed = 1.6, "medium", 25
+            else:
+                multiplier, congestion, speed = 1.0, "low", 40
+        else:
+            if 11 <= hour <= 14:
+                multiplier, congestion, speed = 1.4, "medium", 30
+            else:
+                multiplier, congestion, speed = 0.8, "low", 45
+
+        volume = int(base_traffic * multiplier)
+        pedestrians = int(150 * self._get_pedestrian_multiplier(hour))
+
+        return {
+            'volume': volume,
+            'congestion': congestion,
+            'speed': max(10, speed),
+            'pedestrians': pedestrians
+        }
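As a quick illustration of the helper above — a minimal usage sketch, not part of the patch, assuming the `_get_pedestrian_multiplier` helper referenced in the method is available in the full client:

    # Hypothetical usage of _calculate_traffic_parameters (values follow the
    # multipliers defined above; base traffic is 100).
    client = MadridOpenDataClient()

    # Weekday morning rush (hour=8): multiplier 2.2, so volume == 220.
    params = client._calculate_traffic_parameters(hour=8, is_weekend=False)
    assert params['volume'] == 220 and params['congestion'] == "high"

    # Weekend early afternoon (hour=12): multiplier 1.4, speed 30 km/h.
    params = client._calculate_traffic_parameters(hour=12, is_weekend=True)
    assert params['volume'] == 140 and params['speed'] == 30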
+    def _generate_synthetic_traffic_params(self, record_time: datetime, random_gen) -> Dict[str, Any]:
+        """Generate synthetic traffic parameters with random variations"""
+        hour = record_time.hour
+        day_of_week = record_time.weekday()
+        month = record_time.month
+
+        base_params = self._calculate_traffic_parameters(hour, day_of_week >= 5)
+
+        # Add random variations
+        volume_variation = random_gen.uniform(-0.3, 0.3)
+        speed_variation = random_gen.randint(-5, 5)
+
+        # Apply seasonal adjustments
+        seasonal_multiplier = 0.8 if month in [7, 8] else (1.1 if month in [11, 12] else 1.0)
+
+        # Weekend-specific adjustments
+        if day_of_week >= 5 and hour in [11, 12, 13, 14, 15]:
+            base_params['volume'] = int(base_params['volume'] * 1.4)
+            base_params['congestion'] = "medium"
+
+        return {
+            'volume': max(10, int(base_params['volume'] * (1 + volume_variation) * seasonal_multiplier)),
+            'congestion': base_params['congestion'],
+            'speed': max(10, min(60, base_params['speed'] + speed_variation)),
+            'pedestrians': int(base_params['pedestrians'] * random_gen.uniform(0.8, 1.2))
+        }
+
+    def _get_fallback_measurement_points(self, latitude: float, longitude: float) -> List[MeasurementPoint]:
+        """Generate fallback measurement points when CSV is not available"""
+        madrid_points = [
+            (40.4168, -3.7038, "Madrid Centro"),
+            (40.4200, -3.7060, "Gran Vía"),
+            (40.4155, -3.7074, "Plaza Mayor"),
+            (40.4152, -3.6844, "Retiro"),
+            (40.4063, -3.6932, "Atocha"),
+        ]
+
+        fallback_points = []
+        for i, (lat, lon, name) in enumerate(madrid_points):
+            distance = self._calculate_distance(latitude, longitude, lat, lon)
+            point = MeasurementPoint(
+                id=f'fallback_{i+1000}',
+                latitude=lat,
+                longitude=lon,
+                distance=distance,
+                name=name,
+                type='URB'
+            )
+            fallback_points.append(point)
+
+        fallback_points.sort(key=lambda x: x.distance)
+        return fallback_points[:5]
+
+    def _get_default_traffic_data(self) -> Dict[str, Any]:
+        """Get default traffic data when parsing fails"""
+        return {
+            "date": datetime.now(),
+            "traffic_volume": 100,
+            "pedestrian_count": 150,
+            "congestion_level": CongestionLevel.MEDIUM.value,
+            "average_speed": 25,
+            "occupation_percentage": 30,
+            "load_percentage": 40,
+            "measurement_point_id": "unknown",
+            "measurement_point_name": "Unknown location",
+            "road_type": "URB",
+            "source": DataSource.SYNTHETIC.value
+        }
+
+    # ================================================================
+    # CORE UTILITY METHODS
+    # ================================================================
+
+    def _calculate_distance(self, lat1: float, lon1: float, lat2: float, lon2: float) -> float:
+        """Calculate distance between two coordinates using the Haversine formula"""
+        R = 6371  # Earth's radius in km
+
+        dlat = math.radians(lat2 - lat1)
+        dlon = math.radians(lon2 - lon1)
+
+        a = (math.sin(dlat/2) * math.sin(dlat/2) +
+             math.cos(math.radians(lat1)) * math.cos(math.radians(lat2)) *
+             math.sin(dlon/2) * math.sin(dlon/2))
+
+        c = 2 * math.atan2(math.sqrt(a), math.sqrt(1-a))
+        return R * c
+
+    def _safe_int(self, value_str: str) -> int:
+        """Safely convert a string (with comma or dot decimal separator) to int"""
+        try:
+            return int(float(value_str.replace(',', '.')))
+        except (ValueError, TypeError, AttributeError):
+            # AttributeError covers None values coming from csv.DictReader
+            return 0
+
+    def _safe_float(self, value_str: str) -> float:
+        """Safely convert a string (with comma or dot decimal separator) to float"""
+        try:
+            return float(value_str.replace(',', '.'))
+        except (ValueError, TypeError, AttributeError):
+            return 0.0
\ No newline at end of file
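For orientation, a self-contained check of the Haversine helper above — a sketch, not part of the patch; the coordinates are the Madrid Centro and Retiro fallback points from `_get_fallback_measurement_points`:

    import math

    def haversine_km(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
        # Same formula as MadridOpenDataClient._calculate_distance
        R = 6371  # Earth's radius in km
        dlat = math.radians(lat2 - lat1)
        dlon = math.radians(lon2 - lon1)
        a = (math.sin(dlat / 2) ** 2 +
             math.cos(math.radians(lat1)) * math.cos(math.radians(lat2)) *
             math.sin(dlon / 2) ** 2)
        return R * 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))

    # Madrid Centro (40.4168, -3.7038) to Retiro (40.4152, -3.6844):
    # about 1.65 km, so distance-based sorting puts Gran Vía and Plaza Mayor
    # ahead of Retiro for a query at the city centre.
    print(round(haversine_km(40.4168, -3.7038, 40.4152, -3.6844), 2))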
session""" - async with test_engine.begin() as conn: - await conn.run_sync(Base.metadata.create_all) - - async with TestingSessionLocal() as session: - yield session - - async with test_engine.begin() as conn: - await conn.run_sync(Base.metadata.drop_all) - -@pytest.fixture -def client(): - """Create test client""" - async def override_get_db(): - async with TestingSessionLocal() as session: - yield session - - app.dependency_overrides[get_db] = override_get_db - - with TestClient(app) as test_client: - yield test_client - - app.dependency_overrides.clear() - -@pytest.fixture -def test_tenant_id(): - """Test tenant ID""" - return uuid.uuid4() - -@pytest.fixture -def test_sales_data(): - """Sample sales data for testing""" - return { - "date": datetime.now(), - "product_name": "Pan Integral", - "quantity_sold": 25, - "revenue": 37.50, - "location_id": "madrid_centro", - "source": "test" - } - -@pytest.fixture -def mock_auth_token(): - """Mock authentication token""" - return "Bearer test-token-123" \ No newline at end of file diff --git a/services/data/tests/pytest.ini b/services/data/tests/pytest.ini new file mode 100644 index 00000000..4bd8c845 --- /dev/null +++ b/services/data/tests/pytest.ini @@ -0,0 +1,16 @@ +[tool:pytest] +# pytest.ini - Configuration for async testing +asyncio_mode = auto +addopts = -v --tb=short --capture=no +testpaths = tests +python_files = test_*.py +python_classes = Test* +python_functions = test_* +markers = + asyncio: mark test as async + slow: mark test as slow + integration: mark test as integration test +filterwarnings = + ignore::DeprecationWarning + ignore::PendingDeprecationWarning + ignore::PydanticDeprecatedSince20 \ No newline at end of file diff --git a/services/data/tests/test_madrid_opendata.py b/services/data/tests/test_madrid_opendata.py new file mode 100644 index 00000000..38f017cd --- /dev/null +++ b/services/data/tests/test_madrid_opendata.py @@ -0,0 +1,405 @@ +#!/usr/bin/env python3 +""" +Updated Madrid Historical Traffic test for pytest inside Docker +Configured for June 2025 data availability (last available historical data) +""" + +import pytest +import asyncio +from datetime import datetime, timedelta +from typing import List, Dict, Any + +# Import from the actual service +from app.external.madrid_opendata import MadridOpenDataClient +from app.core.config import settings +import structlog + +# Configure pytest for async +pytestmark = pytest.mark.asyncio + +# Use actual logger +logger = structlog.get_logger() + + +class TestMadridTrafficInside: + """Test class for Madrid traffic functionality inside Docker""" + + @pytest.fixture + def client(self): + """Create Madrid client for testing""" + return MadridOpenDataClient() + + @pytest.fixture + def madrid_coords(self): + """Madrid center coordinates""" + return 40.4168, -3.7038 + + @pytest.fixture + def june_2025_dates(self): + """Date ranges for June 2025 (last available historical data)""" + return { + "quick": { + "start": datetime(2025, 6, 1, 0, 0), + "end": datetime(2025, 6, 1, 6, 0) # 6 hours on June 1st + }, + "one_day": { + "start": datetime(2025, 6, 15, 0, 0), # Mid-June + "end": datetime(2025, 6, 16, 0, 0) # One full day + }, + "three_days": { + "start": datetime(2025, 6, 10, 0, 0), + "end": datetime(2025, 6, 13, 0, 0) # 3 days in June + }, + "recent_synthetic": { + "start": datetime.now() - timedelta(hours=6), + "end": datetime.now() # Recent data (will be synthetic) + } + } + + async def test_quick_historical_traffic_june2025(self, client, madrid_coords, june_2025_dates): + 
"""Test quick historical traffic data from June 2025""" + lat, lon = madrid_coords + date_range = june_2025_dates["quick"] + start_time = date_range["start"] + end_time = date_range["end"] + + print(f"\n=== Quick Test (June 2025 - 6 hours) ===") + print(f"Location: {lat}, {lon}") + print(f"Date range: {start_time.strftime('%Y-%m-%d %H:%M')} to {end_time.strftime('%Y-%m-%d %H:%M')}") + print(f"Note: Testing with June 2025 data (last available historical month)") + + # Test the function + execution_start = datetime.now() + result = await client.get_historical_traffic(lat, lon, start_time, end_time) + execution_time = (datetime.now() - execution_start).total_seconds() + + print(f"⏱️ Execution time: {execution_time:.2f} seconds") + print(f"📊 Records returned: {len(result)}") + + # Assertions + assert isinstance(result, list), "Result should be a list" + assert len(result) > 0, "Should return at least some records" + assert execution_time < 30, "Should execute in reasonable time (allowing for ZIP download)" + + # Check first record structure + if result: + sample = result[0] + print(f"📋 Sample record keys: {list(sample.keys())}") + print(f"📡 Data source: {sample.get('source', 'unknown')}") + + # Required fields + required_fields = ['date', 'traffic_volume', 'congestion_level', 'average_speed', 'source'] + for field in required_fields: + assert field in sample, f"Missing required field: {field}" + + # Data validation + assert isinstance(sample['traffic_volume'], int), "Traffic volume should be int" + assert 0 <= sample['traffic_volume'] <= 1000, "Traffic volume should be reasonable" + assert sample['congestion_level'] in ['low', 'medium', 'high', 'blocked'], "Invalid congestion level" + assert 5 <= sample['average_speed'] <= 100, "Speed should be reasonable" + assert isinstance(sample['date'], datetime), "Date should be datetime object" + + # Check if we got real Madrid data or synthetic + if sample['source'] == 'madrid_opendata_zip': + print(f"🎉 SUCCESS: Got real Madrid historical data from ZIP!") + else: + print(f"ℹ️ Got synthetic data (real data may not be available)") + + print(f"✅ All validations passed") + + async def test_one_day_june2025(self, client, madrid_coords, june_2025_dates): + """Test one day of June 2025 historical traffic data""" + lat, lon = madrid_coords + date_range = june_2025_dates["one_day"] + start_time = date_range["start"] + end_time = date_range["end"] + + print(f"\n=== One Day Test (June 15, 2025) ===") + print(f"Date range: {start_time.strftime('%Y-%m-%d %H:%M')} to {end_time.strftime('%Y-%m-%d %H:%M')}") + + result = await client.get_historical_traffic(lat, lon, start_time, end_time) + + print(f"📊 Records returned: {len(result)}") + + # Should have roughly 24 records (one per hour) + assert len(result) >= 20, "Should have at least 20 hourly records for one day" + assert len(result) <= 30, "Should not have more than 30 records for one day" + + # Check data source + if result: + sources = set(r['source'] for r in result) + print(f"📡 Data sources: {', '.join(sources)}") + + # If we got real data, check for realistic measurement point IDs + real_data_records = [r for r in result if r['source'] == 'madrid_opendata_zip'] + if real_data_records: + point_ids = set(r['measurement_point_id'] for r in real_data_records) + print(f"🏷️ Real measurement points found: {len(point_ids)}") + print(f" Sample IDs: {list(point_ids)[:3]}") + + # Check traffic patterns + if len(result) >= 24: + # Find rush hour records (7-9 AM, 6-8 PM) + rush_hour_records = [r for r in result if 7 <= 
r['date'].hour <= 9 or 18 <= r['date'].hour <= 20] + night_records = [r for r in result if r['date'].hour <= 6 or r['date'].hour >= 22] + + if rush_hour_records and night_records: + avg_rush_traffic = sum(r['traffic_volume'] for r in rush_hour_records) / len(rush_hour_records) + avg_night_traffic = sum(r['traffic_volume'] for r in night_records) / len(night_records) + + print(f"📈 Rush hour avg traffic: {avg_rush_traffic:.1f}") + print(f"🌙 Night avg traffic: {avg_night_traffic:.1f}") + + # Rush hour should typically have more traffic than night + if avg_rush_traffic > avg_night_traffic: + print(f"✅ Traffic patterns look realistic") + else: + print(f"⚠️ Traffic patterns unusual (not necessarily wrong)") + + async def test_three_days_june2025(self, client, madrid_coords, june_2025_dates): + """Test three days of June 2025 historical traffic data""" + lat, lon = madrid_coords + date_range = june_2025_dates["three_days"] + start_time = date_range["start"] + end_time = date_range["end"] + + print(f"\n=== Three Days Test (June 10-13, 2025) ===") + print(f"Date range: {start_time.strftime('%Y-%m-%d')} to {end_time.strftime('%Y-%m-%d')}") + + result = await client.get_historical_traffic(lat, lon, start_time, end_time) + + print(f"📊 Records returned: {len(result)}") + + # Should have roughly 72 records (24 hours * 3 days) + assert len(result) >= 60, "Should have at least 60 records for 3 days" + assert len(result) <= 90, "Should not have more than 90 records for 3 days" + + # Check data sources + sources = set(r['source'] for r in result) + print(f"📡 Data sources: {', '.join(sources)}") + + # Calculate statistics + traffic_volumes = [r['traffic_volume'] for r in result] + speeds = [r['average_speed'] for r in result] + + avg_traffic = sum(traffic_volumes) / len(traffic_volumes) + max_traffic = max(traffic_volumes) + min_traffic = min(traffic_volumes) + avg_speed = sum(speeds) / len(speeds) + + print(f"📈 Statistics:") + print(f" Average traffic: {avg_traffic:.1f}") + print(f" Max traffic: {max_traffic}") + print(f" Min traffic: {min_traffic}") + print(f" Average speed: {avg_speed:.1f} km/h") + + # Analyze by data source + real_data_records = [r for r in result if r['source'] == 'madrid_opendata_zip'] + synthetic_records = [r for r in result if r['source'] != 'madrid_opendata_zip'] + + print(f"🔍 Data breakdown:") + print(f" Real Madrid data: {len(real_data_records)} records") + print(f" Synthetic data: {len(synthetic_records)} records") + + if real_data_records: + # Show measurement points from real data + real_points = set(r['measurement_point_id'] for r in real_data_records) + print(f" Real measurement points: {len(real_points)}") + + # Sanity checks + assert 10 <= avg_traffic <= 500, "Average traffic should be reasonable" + assert 10 <= avg_speed <= 60, "Average speed should be reasonable" + assert max_traffic >= avg_traffic, "Max should be >= average" + assert min_traffic <= avg_traffic, "Min should be <= average" + + async def test_recent_vs_historical_data(self, client, madrid_coords, june_2025_dates): + """Compare recent data (synthetic) vs June 2025 data (potentially real)""" + lat, lon = madrid_coords + + print(f"\n=== Recent vs Historical Data Comparison ===") + + # Test recent data (should be synthetic) + recent_range = june_2025_dates["recent_synthetic"] + recent_result = await client.get_historical_traffic( + lat, lon, recent_range["start"], recent_range["end"] + ) + + # Test June 2025 data (potentially real) + june_range = june_2025_dates["quick"] + june_result = await 
client.get_historical_traffic( + lat, lon, june_range["start"], june_range["end"] + ) + + print(f"📊 Recent data: {len(recent_result)} records") + print(f"📊 June 2025 data: {len(june_result)} records") + + if recent_result: + recent_sources = set(r['source'] for r in recent_result) + print(f"📡 Recent sources: {', '.join(recent_sources)}") + + if june_result: + june_sources = set(r['source'] for r in june_result) + print(f"📡 June sources: {', '.join(june_sources)}") + + # Check if we successfully got real data from June + if 'madrid_opendata_zip' in june_sources: + print(f"🎉 SUCCESS: Real Madrid data successfully fetched from June 2025!") + + # Show details of real data + real_records = [r for r in june_result if r['source'] == 'madrid_opendata_zip'] + if real_records: + sample = real_records[0] + print(f"📋 Real data sample:") + print(f" Date: {sample['date']}") + print(f" Traffic volume: {sample['traffic_volume']}") + print(f" Measurement point: {sample['measurement_point_id']}") + print(f" Point name: {sample.get('measurement_point_name', 'N/A')}") + else: + print(f"ℹ️ June data is synthetic (real ZIP may not be accessible)") + + async def test_madrid_zip_month_code(self, client): + """Test the month code calculation for Madrid ZIP files""" + print(f"\n=== Madrid ZIP Month Code Test ===") + + # Test the month code calculation function + test_cases = [ + (2025, 6, 145), # Known: June 2025 = 145 + (2025, 5, 144), # Known: May 2025 = 144 + (2025, 4, 143), # Known: April 2025 = 143 + (2025, 7, 146), # Predicted: July 2025 = 146 + ] + + for year, month, expected_code in test_cases: + if hasattr(client, '_calculate_madrid_month_code'): + calculated_code = client._calculate_madrid_month_code(year, month) + status = "✅" if calculated_code == expected_code else "⚠️" + print(f"{status} {year}-{month:02d}: Expected {expected_code}, Got {calculated_code}") + + # Generate ZIP URL + if calculated_code: + zip_url = f"https://datos.madrid.es/egob/catalogo/208627-{calculated_code}-transporte-ptomedida-historico.zip" + print(f" ZIP URL: {zip_url}") + else: + print(f"⚠️ Month code calculation function not available") + + async def test_edge_case_large_date_range(self, client, madrid_coords): + """Test edge case: date range too large""" + lat, lon = madrid_coords + start_time = datetime(2025, 1, 1) # 6+ months range + end_time = datetime(2025, 7, 1) + + print(f"\n=== Edge Case: Large Date Range ===") + print(f"Testing 6-month range: {start_time.date()} to {end_time.date()}") + + result = await client.get_historical_traffic(lat, lon, start_time, end_time) + + print(f"📊 Records for 6-month range: {len(result)}") + + # Should return empty list for ranges > 90 days + assert len(result) == 0, "Should return empty list for date ranges > 90 days" + print(f"✅ Correctly handled large date range") + + async def test_edge_case_invalid_coordinates(self, client): + """Test edge case: invalid coordinates""" + print(f"\n=== Edge Case: Invalid Coordinates ===") + + start_time = datetime(2025, 6, 1) + end_time = datetime(2025, 6, 1, 6, 0) + + # Test with invalid coordinates + result = await client.get_historical_traffic(999.0, 999.0, start_time, end_time) + + print(f"📊 Records for invalid coords: {len(result)}") + + # Should either return empty list or synthetic data + # The function should not crash + assert isinstance(result, list), "Should return list even with invalid coords" + print(f"✅ Handled invalid coordinates gracefully") + + async def test_real_madrid_zip_access(self, client): + """Test if we can access the actual 
Madrid ZIP files"""
+        print(f"\n=== Real Madrid ZIP Access Test ===")
+
+        # Known Madrid ZIP URLs for the last three available months
+        test_urls = [
+            "https://datos.madrid.es/egob/catalogo/208627-145-transporte-ptomedida-historico.zip",  # June 2025
+            "https://datos.madrid.es/egob/catalogo/208627-144-transporte-ptomedida-historico.zip",  # May 2025
+            "https://datos.madrid.es/egob/catalogo/208627-143-transporte-ptomedida-historico.zip",  # April 2025
+        ]
+
+        for i, url in enumerate(test_urls):
+            month_name = ["June 2025", "May 2025", "April 2025"][i]
+            print(f"\nTesting {month_name}: {url}")
+
+            try:
+                if hasattr(client, '_fetch_historical_zip'):
+                    zip_data = await client._fetch_historical_zip(url)
+                    if zip_data:
+                        print(f"✅ Successfully fetched ZIP: {len(zip_data)} bytes")
+
+                        # Try to inspect ZIP contents
+                        try:
+                            import zipfile
+                            from io import BytesIO
+
+                            with zipfile.ZipFile(BytesIO(zip_data), 'r') as zip_file:
+                                files = zip_file.namelist()
+                                csv_files = [f for f in files if f.endswith('.csv')]
+                                print(f"📁 ZIP contains {len(files)} files, {len(csv_files)} CSV files")
+
+                                if csv_files:
+                                    print(f"   CSV files: {csv_files[:2]}{'...' if len(csv_files) > 2 else ''}")
+
+                        except Exception as e:
+                            print(f"⚠️ Could not inspect ZIP contents: {e}")
+                    else:
+                        print(f"❌ Failed to fetch ZIP")
+                else:
+                    print(f"⚠️ ZIP fetch function not available")
+
+            except Exception as e:
+                print(f"❌ Error testing ZIP access: {e}")
+
+
+# Additional standalone test function for manual runs
+async def run_manual_test():
+    """Manual test function that can be run directly"""
+    print("="*60)
+    print("MADRID TRAFFIC TEST - JUNE 2025 DATA")
+    print("="*60)
+
+    client = MadridOpenDataClient()
+    madrid_lat, madrid_lon = 40.4168, -3.7038
+
+    # Test with June 2025 data (last available)
+    start_time = datetime(2025, 6, 15, 14, 0)  # June 15, 2025 at 2 PM
+    end_time = datetime(2025, 6, 15, 18, 0)  # Until 6 PM (4 hours)
+
+    print(f"\nTesting June 15, 2025 data (2 PM - 6 PM)...")
+    print(f"This should include afternoon traffic patterns")
+
+    result = await client.get_historical_traffic(madrid_lat, madrid_lon, start_time, end_time)
+
+    print(f"Result: {len(result)} records")
+
+    if result:
+        sources = set(r['source'] for r in result)
+        print(f"Data sources: {', '.join(sources)}")
+
+        if 'madrid_opendata_zip' in sources:
+            print(f"🎉 Successfully got real Madrid data!")
+
+        sample = result[0]
+        print(f"\nSample record:")
+        for key, value in sample.items():
+            if key == "date":
+                print(f"  {key}: {value.strftime('%Y-%m-%d %H:%M:%S')}")
+            else:
+                print(f"  {key}: {value}")
+
+    print(f"\n✅ Manual test completed!")
+
+
+if __name__ == "__main__":
+    # If run directly, execute manual test
+    asyncio.run(run_manual_test())
\ No newline at end of file
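One detail worth pinning down from test_madrid_zip_month_code above: the known catalogue codes (April 2025 → 143, May 2025 → 144, June 2025 → 145) advance by one per month, so the month-code lookup reduces to a linear offset. A minimal sketch under that assumption — the function name mirrors the `_calculate_madrid_month_code` hook the test probes for, and the June-2025 anchor is inferred solely from those known codes, not confirmed against the patch's implementation:

    # Hypothetical sketch of the Madrid catalogue month-code calculation,
    # anchored at June 2025 = 145 (inferred from the test's known codes).
    def calculate_madrid_month_code(year: int, month: int) -> int:
        return 145 + (year - 2025) * 12 + (month - 6)

    assert calculate_madrid_month_code(2025, 4) == 143  # April 2025
    assert calculate_madrid_month_code(2025, 5) == 144  # May 2025
    assert calculate_madrid_month_code(2025, 7) == 146  # predicted July 2025

    # The resulting ZIP URL follows the pattern used in test_real_madrid_zip_access:
    code = calculate_madrid_month_code(2025, 6)
    url = f"https://datos.madrid.es/egob/catalogo/208627-{code}-transporte-ptomedida-historico.zip"

With pytest.ini's `testpaths = tests`, these tests would typically run inside the service container as `pytest tests/test_madrid_opendata.py`, or via `python tests/test_madrid_opendata.py` for the manual path.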