diff --git a/services/data/app/external/madrid_opendata.py b/services/data/app/external/madrid_opendata.py
index 2bd655c0..20889131 100644
--- a/services/data/app/external/madrid_opendata.py
+++ b/services/data/app/external/madrid_opendata.py
@@ -1,53 +1,177 @@
# ================================================================
-# services/data/app/external/madrid_opendata.py - FIXED XML PARSER
+# services/data/app/external/madrid_opendata.py - REFACTORED
# ================================================================
-"""Madrid Open Data API client with fixed XML parser for actual structure"""
+"""
+Madrid Open Data API client with clean architecture and best practices
+
+Features:
+- Real-time traffic data from XML endpoints
+- Historical traffic data from ZIP files
+- Measurement points integration
+- Robust error handling and fallbacks
+- Comprehensive logging
+"""
import math
import xml.etree.ElementTree as ET
-from typing import List, Dict, Any, Optional
+from typing import List, Dict, Any, Optional, Tuple
from datetime import datetime, timedelta
import structlog
import re
+from dataclasses import dataclass
+from enum import Enum
from app.external.base_client import BaseAPIClient
from app.core.config import settings
-
import pyproj
logger = structlog.get_logger()
+# ================================================================
+# CONSTANTS AND ENUMS
+# ================================================================
+
+class TrafficServiceLevel(Enum):
+ """Madrid traffic service levels"""
+ FLUID = 0
+ DENSE = 1
+ CONGESTED = 2
+ BLOCKED = 3
+
+class CongestionLevel(Enum):
+ """Standardized congestion levels"""
+ LOW = "low"
+ MEDIUM = "medium"
+ HIGH = "high"
+ BLOCKED = "blocked"
+
+class DataSource(Enum):
+ """Data source types"""
+ MADRID_REALTIME = "madrid_opendata"
+ MADRID_HISTORICAL = "madrid_opendata_zip"
+ SYNTHETIC = "synthetic"
+ SYNTHETIC_HISTORICAL = "synthetic_historical"
+
+# Madrid geographic bounds
+MADRID_BOUNDS = {
+ 'lat_min': 40.31, 'lat_max': 40.56,
+ 'lon_min': -3.89, 'lon_max': -3.51
+}
+
+# Constants
+MAX_HISTORICAL_DAYS = 90
+MAX_CSV_PROCESSING_ROWS = 50000
+MEASUREMENT_POINTS_LIMIT = 20
+UTM_ZONE = 30 # Madrid is in UTM Zone 30N
+
+@dataclass
+class MeasurementPoint:
+ """Measurement point data structure"""
+ id: str
+ latitude: float
+ longitude: float
+ distance: float
+ name: str
+ type: str
+
+@dataclass
+class TrafficRecord:
+ """Traffic record data structure"""
+ date: datetime
+ traffic_volume: int
+ occupation_percentage: int
+ load_percentage: int
+ average_speed: int
+ congestion_level: str
+ pedestrian_count: int
+ measurement_point_id: str
+ measurement_point_name: str
+ road_type: str
+ source: str
+ error_status: Optional[str] = None
+ # Madrid-specific raw data
+ intensidad_raw: Optional[int] = None
+ ocupacion_raw: Optional[int] = None
+ carga_raw: Optional[int] = None
+ vmed_raw: Optional[int] = None
+
+ def to_dict(self) -> Dict[str, Any]:
+ """Convert TrafficRecord to dictionary"""
+ result = {
+ "date": self.date,
+ "traffic_volume": self.traffic_volume,
+ "occupation_percentage": self.occupation_percentage,
+ "load_percentage": self.load_percentage,
+ "average_speed": self.average_speed,
+ "congestion_level": self.congestion_level,
+ "pedestrian_count": self.pedestrian_count,
+ "measurement_point_id": self.measurement_point_id,
+ "measurement_point_name": self.measurement_point_name,
+ "road_type": self.road_type,
+ "source": self.source
+ }
+
+ # Add optional fields if present
+ optional_fields = ['error_status', 'intensidad_raw', 'ocupacion_raw', 'carga_raw', 'vmed_raw']
+ for field in optional_fields:
+ value = getattr(self, field, None)
+ if value is not None:
+ result[field] = value
+
+ return result
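+
+# Illustrative sketch (not part of the original change): building a
+# TrafficRecord by hand and serializing it; all field values below are
+# invented for demonstration.
+#
+#   record = TrafficRecord(
+#       date=datetime(2025, 6, 1, 8, 0), traffic_volume=420,
+#       occupation_percentage=35, load_percentage=48, average_speed=25,
+#       congestion_level=CongestionLevel.MEDIUM.value, pedestrian_count=300,
+#       measurement_point_id="1001", measurement_point_name="Madrid Point 1001",
+#       road_type="URB", source=DataSource.MADRID_HISTORICAL.value)
+#   record.to_dict()  # optional raw fields stay absent while they are None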
+
+
+# ================================================================
+# MADRID OPEN DATA CLIENT
+# ================================================================
+
class MadridOpenDataClient(BaseAPIClient):
+ """
+ Madrid Open Data API client with comprehensive traffic data support
+
+ Provides both real-time and historical traffic data from Madrid's open data portal.
+ Implements robust error handling, coordinate conversion, and synthetic data fallbacks.
+ """
def __init__(self):
- super().__init__(
- base_url="https://datos.madrid.es",
- api_key=None
- )
-
- # WORKING Madrid traffic endpoints (verified)
+ super().__init__(base_url="https://datos.madrid.es")
self.traffic_endpoints = [
- # Primary working endpoint
- "https://datos.madrid.es/egob/catalogo/202087-0-trafico-intensidad.xml",
+ "https://datos.madrid.es/egob/catalogo/202087-0-trafico-intensidad.xml"
]
+ self.measurement_points_url = "https://datos.madrid.es/egob/catalogo/202468-260-intensidad-trafico.csv"
+ self._conversion_log_count = [] # Track coordinate conversion logging
+
+ # Initialize coordinate converter
+ self.utm_proj = pyproj.Proj(proj='utm', zone=UTM_ZONE, ellps='WGS84', preserve_units=False)
+
+ # ================================================================
+ # PUBLIC API METHODS
+ # ================================================================
async def get_current_traffic(self, latitude: float, longitude: float) -> Optional[Dict[str, Any]]:
- """Get current traffic data for location using working Madrid endpoints"""
+ """
+        Get current traffic data for a location from Madrid's real-time endpoints
+
+ Args:
+ latitude: Query location latitude
+ longitude: Query location longitude
+
+ Returns:
+ Dict with current traffic data or None if not available
+ """
try:
logger.debug("Fetching Madrid traffic data", lat=latitude, lon=longitude)
- # Try the working endpoint
+ # Try real-time endpoints
for endpoint in self.traffic_endpoints:
try:
- logger.debug("Trying traffic endpoint", endpoint=endpoint)
traffic_data = await self._fetch_traffic_xml_data(endpoint)
if traffic_data:
logger.info("Successfully fetched Madrid traffic data",
- endpoint=endpoint,
- points=len(traffic_data))
+ endpoint=endpoint, points=len(traffic_data))
- # Find nearest traffic measurement point
+ # Find nearest measurement point
nearest_point = self._find_nearest_traffic_point(latitude, longitude, traffic_data)
if nearest_point:
@@ -57,15 +181,16 @@ class MadridOpenDataClient(BaseAPIClient):
point_id=nearest_point.get('idelem'))
return parsed_data
else:
+ closest_distance = self._get_closest_distance(latitude, longitude, traffic_data)
logger.debug("No nearby traffic points found",
lat=latitude, lon=longitude,
- closest_distance=self._get_closest_distance(latitude, longitude, traffic_data))
+ closest_distance=closest_distance)
except Exception as e:
logger.debug("Failed to fetch from endpoint", endpoint=endpoint, error=str(e))
continue
- # If no real data available, use synthetic data
+ # Fallback to synthetic data
logger.info("No nearby Madrid traffic points found, using synthetic data")
return await self._generate_synthetic_traffic(latitude, longitude)
@@ -73,6 +198,57 @@ class MadridOpenDataClient(BaseAPIClient):
logger.error("Failed to get current traffic", error=str(e))
return await self._generate_synthetic_traffic(latitude, longitude)
+ async def get_historical_traffic(self, latitude: float, longitude: float,
+ start_date: datetime, end_date: datetime) -> List[Dict[str, Any]]:
+ """
+ Get historical traffic data from Madrid Open Data ZIP files
+
+ Args:
+ latitude: Query location latitude
+ longitude: Query location longitude
+ start_date: Start date for historical data
+ end_date: End date for historical data
+
+ Returns:
+ List of historical traffic data dictionaries
+ """
+ try:
+ logger.debug("Fetching Madrid historical traffic data",
+ lat=latitude, lon=longitude, start=start_date, end=end_date)
+
+ # Validate date range
+ if not self._validate_date_range(start_date, end_date):
+ return []
+
+ # Generate synthetic data as fallback
+ synthetic_data = await self._generate_historical_traffic(latitude, longitude, start_date, end_date)
+ logger.info("Generated synthetic historical traffic data", records=len(synthetic_data))
+
+ # Try to fetch real data
+ try:
+ real_data = await self._fetch_real_historical_traffic(latitude, longitude, start_date, end_date)
+ if real_data:
+ logger.info("Fetched real historical traffic data from ZIP files", records=len(real_data))
+ return real_data
+ else:
+ logger.info("No real historical data available, using synthetic data")
+                    return synthetic_data
+ except Exception as e:
+ logger.warning("Failed to fetch real historical data, using synthetic", error=str(e))
+ return synthetic_data
+
+ except Exception as e:
+ logger.error("Error getting historical traffic data", error=str(e))
+ return []
+
+ async def get_events(self, latitude: float, longitude: float, radius_km: float = 5.0) -> List[Dict[str, Any]]:
+ """Get traffic incidents and events (placeholder for future implementation)"""
+ return []
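+
+    # Usage sketch (illustrative; assumes an async caller with an event loop
+    # and network access -- not part of the original change):
+    #
+    #   client = MadridOpenDataClient()
+    #   current = await client.get_current_traffic(40.4168, -3.7038)
+    #   history = await client.get_historical_traffic(
+    #       40.4168, -3.7038,
+    #       start_date=datetime(2025, 5, 1), end_date=datetime(2025, 5, 7))
+    #
+    # Both calls fall back to synthetic data when the portal is unreachable.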
+
+ # ================================================================
+ # REAL-TIME TRAFFIC METHODS
+ # ================================================================
+
async def _fetch_traffic_xml_data(self, endpoint: str) -> Optional[List[Dict[str, Any]]]:
"""Fetch and parse Madrid traffic XML data"""
try:
@@ -82,12 +258,10 @@ class MadridOpenDataClient(BaseAPIClient):
logger.debug("No XML content received", endpoint=endpoint)
return None
- # Log XML structure for debugging
logger.debug("Madrid XML content preview",
length=len(xml_content),
first_500=xml_content[:500] if len(xml_content) > 500 else xml_content)
- # Parse Madrid traffic XML with the correct structure
traffic_points = self._parse_madrid_traffic_xml(xml_content)
if traffic_points:
@@ -102,22 +276,15 @@ class MadridOpenDataClient(BaseAPIClient):
return None
def _parse_madrid_traffic_xml(self, xml_content: str) -> List[Dict[str, Any]]:
- """Parse Madrid traffic XML with correct structure (...)"""
+ """Parse Madrid traffic XML with correct structure"""
traffic_points = []
try:
- # Clean the XML to handle undefined entities and encoding issues
cleaned_xml = self._clean_madrid_xml(xml_content)
-
- # Parse XML
root = ET.fromstring(cleaned_xml)
- # Log XML structure
- logger.debug("Madrid XML structure",
- root_tag=root.tag,
- children_count=len(list(root)))
+ logger.debug("Madrid XML structure", root_tag=root.tag, children_count=len(list(root)))
- # Madrid uses root with children
if root.tag == 'pms':
pm_elements = root.findall('pm')
logger.debug("Found PM elements", count=len(pm_elements))
@@ -126,10 +293,7 @@ class MadridOpenDataClient(BaseAPIClient):
try:
traffic_point = self._extract_madrid_pm_element(pm)
- # Validate essential data (coordinates and ID)
- if (traffic_point.get('latitude') and
- traffic_point.get('longitude') and
- traffic_point.get('idelem')):
+ if self._is_valid_traffic_point(traffic_point):
traffic_points.append(traffic_point)
# Log first few points for debugging
@@ -151,60 +315,298 @@ class MadridOpenDataClient(BaseAPIClient):
except ET.ParseError as e:
logger.warning("Failed to parse Madrid XML", error=str(e))
- # Try regex extraction as fallback
return self._extract_traffic_data_regex(xml_content)
except Exception as e:
logger.error("Error in Madrid traffic XML parsing", error=str(e))
return []
- def _clean_madrid_xml(self, xml_content: str) -> str:
- """Clean Madrid XML to handle undefined entities and encoding issues"""
+ # ================================================================
+ # HISTORICAL TRAFFIC METHODS
+ # ================================================================
+
+ async def _fetch_real_historical_traffic(self, latitude: float, longitude: float,
+ start_date: datetime, end_date: datetime) -> List[Dict[str, Any]]:
+ """Fetch real historical traffic data from Madrid ZIP files"""
try:
- # Remove BOM if present
- xml_content = xml_content.lstrip('\ufeff')
+ historical_data = []
+ current_date = start_date.replace(day=1)
- # Remove or replace undefined entities that cause parsing errors
- # Common undefined entities in Madrid data
-            xml_content = xml_content.replace('&nbsp;', ' ')
-            xml_content = xml_content.replace('&copy;', '©')
-            xml_content = xml_content.replace('&reg;', '®')
-            xml_content = xml_content.replace('&trade;', '™')
+ while current_date <= end_date:
+ try:
+ month_code = self._calculate_madrid_month_code(current_date.year, current_date.month)
+
+ if month_code:
+ zip_url = f"https://datos.madrid.es/egob/catalogo/208627-{month_code}-transporte-ptomedida-historico.zip"
+ logger.debug("Trying ZIP URL", url=zip_url,
+ year=current_date.year, month=current_date.month, code=month_code)
+
+ zip_data = await self._fetch_historical_zip(zip_url)
+ if zip_data:
+ month_data = await self._parse_historical_zip(zip_data, latitude, longitude, start_date, end_date)
+ historical_data.extend(month_data)
+ logger.info("Fetched historical data for month",
+ year=current_date.year, month=current_date.month, records=len(month_data))
+ else:
+ logger.debug("No ZIP data found for month",
+ year=current_date.year, month=current_date.month)
+ else:
+ logger.debug("Could not calculate month code",
+ year=current_date.year, month=current_date.month)
+
+ current_date = self._get_next_month(current_date)
+
+ except Exception as e:
+ logger.warning("Error fetching data for month",
+ year=current_date.year, month=current_date.month, error=str(e))
+ current_date = self._get_next_month(current_date)
- # Fix unescaped ampersands (but not already escaped ones)
-            xml_content = re.sub(r'&(?![a-zA-Z0-9#]{1,10};)', '&amp;', xml_content)
-
- # Remove invalid control characters
- xml_content = re.sub(r'[\x00-\x08\x0B\x0C\x0E-\x1F\x7F]', '', xml_content)
-
- # Handle Spanish characters that might be causing issues
- spanish_chars = {
- 'ñ': 'n', 'Ñ': 'N',
- 'á': 'a', 'é': 'e', 'í': 'i', 'ó': 'o', 'ú': 'u',
- 'Á': 'A', 'É': 'E', 'Í': 'I', 'Ó': 'O', 'Ú': 'U',
- 'ü': 'u', 'Ü': 'U'
- }
-
- for spanish_char, replacement in spanish_chars.items():
- xml_content = xml_content.replace(spanish_char, replacement)
-
- return xml_content
+ return historical_data
except Exception as e:
- logger.warning("Error cleaning Madrid XML", error=str(e))
- return xml_content
+ logger.error("Error fetching real historical traffic data", error=str(e))
+ return []
+
+ async def _parse_historical_zip(self, zip_content: bytes, latitude: float, longitude: float,
+ start_date: datetime, end_date: datetime) -> List[Dict[str, Any]]:
+ """Parse Madrid historical traffic ZIP file"""
+ try:
+ import zipfile
+ from io import BytesIO
+
+ historical_records = []
+
+ with zipfile.ZipFile(BytesIO(zip_content), 'r') as zip_file:
+ logger.debug("ZIP file contents", files=zip_file.namelist())
+
+ csv_files = [f for f in zip_file.namelist()
+ if f.endswith('.csv') and not f.startswith('__MACOSX')]
+
+ if not csv_files:
+ logger.warning("No CSV files found in ZIP")
+ return []
+
+ for csv_filename in csv_files:
+ logger.debug("Processing CSV file", filename=csv_filename)
+
+ try:
+ csv_content = self._extract_csv_from_zip(zip_file, csv_filename)
+ if csv_content:
+ file_records = await self._parse_csv_content(
+ csv_content, latitude, longitude, start_date, end_date
+ )
+ historical_records.extend(file_records)
+
+ logger.debug("Processed CSV file",
+ filename=csv_filename, records=len(file_records))
+
+ except Exception as e:
+ logger.warning("Error processing CSV file",
+ filename=csv_filename, error=str(e))
+ continue
+
+ return historical_records
+
+ except Exception as e:
+ logger.error("Error parsing historical ZIP", error=str(e))
+ return []
+
+ # ================================================================
+ # DATA PARSING AND CONVERSION METHODS
+ # ================================================================
+
+ async def _parse_csv_content(self, csv_content: str, latitude: float, longitude: float,
+ start_date: datetime, end_date: datetime) -> List[Dict[str, Any]]:
+ """Parse CSV content from Madrid historical traffic data"""
+ try:
+ import csv
+ from io import StringIO
+
+ csv_reader = csv.DictReader(StringIO(csv_content), delimiter=';')
+
+ if not self._validate_csv_structure(csv_reader.fieldnames):
+ return []
+
+ logger.debug("Madrid CSV structure detected", fields=csv_reader.fieldnames)
+
+ # Get nearest measurement points
+ measurement_points = await self._get_measurement_points_near_location(latitude, longitude)
+ target_point_ids = [str(point.id) for point in measurement_points[:10]]
+
+ logger.debug("Target measurement points", ids=target_point_ids[:3])
+
+ # Process CSV rows
+ historical_records = []
+            processed_count = 0
+            row_num = -1  # keeps the total_rows log safe if the CSV has no data rows
+
+ for row_num, row in enumerate(csv_reader):
+ if processed_count >= MAX_CSV_PROCESSING_ROWS:
+ logger.info("Reached processing limit", limit=MAX_CSV_PROCESSING_ROWS)
+ break
+
+ try:
+ traffic_record = self._parse_csv_row(row, target_point_ids, start_date, end_date)
+ if traffic_record:
+ historical_records.append(traffic_record.to_dict())
+ processed_count += 1
+
+ if processed_count % 1000 == 0:
+ logger.debug("Processing progress", processed=processed_count)
+
+ except Exception as e:
+ if row_num % 5000 == 0:
+ logger.debug("Error parsing CSV row", row_num=row_num, error=str(e))
+ continue
+
+ logger.info("Successfully parsed Madrid CSV",
+ total_rows=row_num + 1, processed=processed_count, records=len(historical_records))
+
+ # Enrich with location data
+ if historical_records and measurement_points:
+ historical_records = self._enrich_with_location_data(historical_records, measurement_points)
+
+ return historical_records
+
+ except Exception as e:
+ logger.error("Error parsing Madrid CSV content", error=str(e))
+ return []
+
+ def _parse_csv_row(self, row: Dict[str, str], target_point_ids: List[str],
+ start_date: datetime, end_date: datetime) -> Optional[TrafficRecord]:
+ """Parse a single CSV row into a TrafficRecord"""
+ try:
+ # Extract and validate point ID
+ point_id = str(row.get('id', '')).strip()
+ if not point_id or (target_point_ids and point_id not in target_point_ids):
+ return None
+
+ # Parse date
+ record_date = self._parse_madrid_date(row.get('fecha', '').strip().strip('"'))
+ if not record_date or not (start_date <= record_date <= end_date):
+ return None
+
+ # Parse traffic data
+ intensidad = self._safe_int(row.get('intensidad', '0'))
+ ocupacion = self._safe_int(row.get('ocupacion', '0'))
+ carga = self._safe_int(row.get('carga', '0'))
+ vmed = self._safe_int(row.get('vmed', '0'))
+ tipo_elem = row.get('tipo_elem', '').strip().strip('"')
+ error = row.get('error', 'N').strip().strip('"')
+
+ # Skip erroneous records
+ if error == 'S':
+ return None
+
+ # Calculate derived metrics
+ avg_speed = self._calculate_average_speed(vmed, carga, ocupacion)
+ congestion_level = self._determine_congestion_level(carga, avg_speed)
+ pedestrian_count = self._calculate_pedestrian_count(tipo_elem, record_date.hour)
+
+ return TrafficRecord(
+ date=record_date,
+ traffic_volume=intensidad,
+ occupation_percentage=ocupacion,
+ load_percentage=carga,
+ average_speed=avg_speed,
+ congestion_level=congestion_level,
+ pedestrian_count=pedestrian_count,
+ measurement_point_id=point_id,
+ measurement_point_name=f"Madrid Point {point_id}",
+ road_type=tipo_elem,
+ source=DataSource.MADRID_HISTORICAL.value,
+ error_status=error,
+ intensidad_raw=intensidad,
+ ocupacion_raw=ocupacion,
+ carga_raw=carga,
+ vmed_raw=vmed
+ )
+
+ except Exception as e:
+ logger.debug("Error parsing CSV row", error=str(e))
+ return None
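+
+    # Illustrative input for _parse_csv_row (values invented): with ';' as the
+    # delimiter, a historical row is expected to carry at least
+    #   id;fecha;tipo_elem;intensidad;ocupacion;carga;vmed;error
+    #   1001;"2025-06-01 08:00:00";"URB";420;35;60;0;N
+    # Rows flagged error='S', or dated outside [start_date, end_date], are dropped.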
+
+ # ================================================================
+ # MEASUREMENT POINTS METHODS
+ # ================================================================
+
+ async def _get_measurement_points_near_location(self, latitude: float, longitude: float) -> List[MeasurementPoint]:
+ """Get measurement points near the specified location"""
+ try:
+ points_csv = await self._fetch_measurement_points_csv(self.measurement_points_url)
+
+ if points_csv:
+ return await self._parse_measurement_points_csv(points_csv, latitude, longitude)
+ else:
+ logger.info("Using fallback measurement points")
+ return self._get_fallback_measurement_points(latitude, longitude)
+
+ except Exception as e:
+ logger.warning("Error getting measurement points", error=str(e))
+ return self._get_fallback_measurement_points(latitude, longitude)
+
+ async def _parse_measurement_points_csv(self, csv_content: str, query_lat: float, query_lon: float) -> List[MeasurementPoint]:
+ """Parse measurement points CSV and find nearest points"""
+ try:
+ import csv
+ from io import StringIO
+
+ points_with_distance = []
+ csv_reader = csv.DictReader(StringIO(csv_content), delimiter=';')
+
+ for row in csv_reader:
+ try:
+ point_id = row.get('id', '').strip()
+ latitud = row.get('latitud', '').strip()
+ longitud = row.get('longitud', '').strip()
+ nombre = row.get('nombre', '').strip().strip('"')
+ tipo_elem = row.get('tipo_elem', '').strip().strip('"')
+
+ if not (point_id and latitud and longitud):
+ continue
+
+ lat, lon = float(latitud), float(longitud)
+ distance = self._calculate_distance(query_lat, query_lon, lat, lon)
+
+ point = MeasurementPoint(
+ id=point_id,
+ latitude=lat,
+ longitude=lon,
+ distance=distance,
+ name=nombre or f'Point {point_id}',
+ type=tipo_elem
+ )
+
+ points_with_distance.append(point)
+
+ except Exception as e:
+ logger.debug("Error parsing measurement point row", error=str(e))
+ continue
+
+ # Sort by distance and return closest points
+ points_with_distance.sort(key=lambda x: x.distance)
+ closest_points = points_with_distance[:MEASUREMENT_POINTS_LIMIT]
+
+ logger.info("Found measurement points",
+ total=len(points_with_distance), closest=len(closest_points))
+
+ return closest_points
+
+ except Exception as e:
+ logger.error("Error parsing measurement points CSV", error=str(e))
+ return []
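+
+    # Illustrative measurement-points row (';'-separated, values invented):
+    #   id;latitud;longitud;nombre;tipo_elem
+    #   1001;40.4201;-3.7050;"Gran Via";"URB"
+    # Points are sorted by distance to the query location and capped at
+    # MEASUREMENT_POINTS_LIMIT (20).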
+
+ # ================================================================
+ # COORDINATE CONVERSION METHODS
+ # ================================================================
def _extract_madrid_pm_element(self, pm_element) -> Dict[str, Any]:
- """Extract traffic data from Madrid element with proper coordinate conversion"""
+ """Extract traffic data from Madrid element with coordinate conversion"""
try:
- # Based on the actual Madrid XML structure shown in logs
point_data = {}
- utm_x = None
- utm_y = None
+ utm_x = utm_y = None
# Extract all child elements
for child in pm_element:
- tag = child.tag
- text = child.text.strip() if child.text else ''
+ tag, text = child.tag, child.text.strip() if child.text else ''
if tag == 'idelem':
point_data['idelem'] = text
@@ -219,125 +621,224 @@ class MadridOpenDataClient(BaseAPIClient):
elif tag == 'nivelServicio':
point_data['nivelServicio'] = self._safe_int(text)
elif tag == 'st_x':
- # Store UTM X coordinate for later conversion
utm_x = text
- point_data['utm_x'] = text # Keep original for debugging
+ point_data['utm_x'] = text
elif tag == 'st_y':
- # Store UTM Y coordinate for later conversion
utm_y = text
- point_data['utm_y'] = text # Keep original for debugging
+ point_data['utm_y'] = text
elif tag == 'error':
point_data['error'] = text
- elif tag == 'subarea':
- point_data['subarea'] = text
- elif tag == 'accesoAsociado':
- point_data['accesoAsociado'] = text
- elif tag == 'intensidadSat':
- point_data['intensidadSat'] = self._safe_int(text)
+ elif tag in ['subarea', 'accesoAsociado', 'intensidadSat']:
+ point_data[tag] = text
- # Convert UTM coordinates to lat/lon if both are available
+ # Convert coordinates
if utm_x and utm_y:
- latitude, longitude = self._convert_utm_coordinates_accurate(utm_x, utm_y)
+ latitude, longitude = self._convert_utm_to_latlon(utm_x, utm_y)
- if latitude is not None and longitude is not None:
- # Validate that coordinates are actually in Madrid area
- if self._validate_madrid_coordinates(latitude, longitude):
- point_data['latitude'] = latitude
- point_data['longitude'] = longitude
-
- # Log first few successful conversions for verification
- if len(getattr(self, '_conversion_log_count', [])) < 3:
- if not hasattr(self, '_conversion_log_count'):
- self._conversion_log_count = []
- self._conversion_log_count.append(1)
-
- logger.debug("Successful UTM conversion",
- idelem=point_data.get('idelem'),
- utm_x=utm_x,
- utm_y=utm_y,
- latitude=latitude,
- longitude=longitude,
- descripcion=point_data.get('descripcion'))
- else:
- # Log invalid coordinates for debugging
- logger.debug("Invalid Madrid coordinates after conversion",
- idelem=point_data.get('idelem'),
- utm_x=utm_x,
- utm_y=utm_y,
- converted_lat=latitude,
- converted_lon=longitude,
- descripcion=point_data.get('descripcion'))
- # Don't include this point - return empty dict
- return {}
+ if latitude and longitude and self._validate_madrid_coordinates(latitude, longitude):
+ point_data.update({'latitude': latitude, 'longitude': longitude})
+
+ # Log successful conversions (limited)
+ self._log_coordinate_conversion(point_data, utm_x, utm_y, latitude, longitude)
+ return point_data
else:
- # Conversion failed
- logger.debug("UTM conversion failed",
- idelem=point_data.get('idelem'),
- utm_x=utm_x,
- utm_y=utm_y)
+ logger.debug("Invalid coordinates after conversion",
+ idelem=point_data.get('idelem'), utm_x=utm_x, utm_y=utm_y)
return {}
else:
- # Missing coordinates
- logger.debug("Missing UTM coordinates",
- idelem=point_data.get('idelem'),
- has_utm_x=utm_x is not None,
- has_utm_y=utm_y is not None)
+ logger.debug("Missing UTM coordinates", idelem=point_data.get('idelem'))
return {}
- return point_data
-
except Exception as e:
logger.debug("Error extracting Madrid PM element", error=str(e))
return {}
-
- def _convert_utm_coordinates_accurate(self, utm_x_str: str, utm_y_str: str) -> tuple[Optional[float], Optional[float]]:
- """Convert UTM coordinates to lat/lon using accurate pyproj library"""
+ def _convert_utm_to_latlon(self, utm_x_str: str, utm_y_str: str) -> Tuple[Optional[float], Optional[float]]:
+ """Convert UTM coordinates to lat/lon using pyproj"""
try:
utm_x = float(utm_x_str.replace(',', '.'))
utm_y = float(utm_y_str.replace(',', '.'))
- # Define UTM Zone 30N projection (EPSG:25830)
- utm_proj = pyproj.Proj(proj='utm', zone=30, ellps='WGS84', preserve_units=False)
-
- # Convert to latitude/longitude
- longitude, latitude = utm_proj(utm_x, utm_y, inverse=True)
-
+ longitude, latitude = self.utm_proj(utm_x, utm_y, inverse=True)
return round(latitude, 6), round(longitude, 6)
except (ValueError, TypeError, Exception):
return None, None
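+
+    # Rough worked example (approximate figures): central Madrid sits near UTM
+    # zone 30N easting 440400, northing 4474000, so
+    #   client._convert_utm_to_latlon("440400", "4474000")
+    # returns roughly (40.416, -3.704), which lies inside MADRID_BOUNDS.
+    # Decimal commas ("440400,0") are also accepted because ',' is mapped to '.'.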
-
- def _validate_madrid_coordinates(self, latitude: float, longitude: float) -> bool:
- """Validate that converted coordinates are actually in Madrid area"""
- # Madrid bounds (expanded slightly to include metro area)
- madrid_lat_min, madrid_lat_max = 40.31, 40.56
- madrid_lon_min, madrid_lon_max = -3.89, -3.51
+
+ # ================================================================
+ # UTILITY AND HELPER METHODS
+ # ================================================================
+
+ def _validate_date_range(self, start_date: datetime, end_date: datetime) -> bool:
+ """Validate date range for historical data requests"""
+ days_diff = (end_date - start_date).days
- return (madrid_lat_min <= latitude <= madrid_lat_max and
- madrid_lon_min <= longitude <= madrid_lon_max)
+        # Allow same-day ranges (days_diff == 0)
+ if days_diff < 0:
+ logger.warning("End date before start date", start=start_date, end=end_date)
+ return False
+
+ if days_diff > MAX_HISTORICAL_DAYS:
+ logger.warning("Date range too large for historical traffic data", days=days_diff)
+ return False
+
+ return True
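+
+    # Example: a 2025-05-01 to 2025-05-08 request (7 days) passes validation;
+    # ranges longer than MAX_HISTORICAL_DAYS (90), or with the end date before
+    # the start date, are rejected and the caller then returns an empty list.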
- def _safe_int(self, value_str: str) -> int:
- """Safely convert string to int"""
+ def _calculate_madrid_month_code(self, year: int, month: int) -> Optional[int]:
+ """Calculate Madrid's month code for ZIP files (June 2025 = 145)"""
try:
- return int(float(value_str.replace(',', '.')))
- except (ValueError, TypeError):
- return 0
+ reference_year, reference_month, reference_code = 2025, 6, 145
+ months_diff = (year - reference_year) * 12 + (month - reference_month)
+ estimated_code = reference_code + months_diff
+
+ if 100 <= estimated_code <= 300:
+ return estimated_code
+ else:
+ logger.warning("Month code out of range", year=year, month=month, code=estimated_code)
+ return None
+
+ except Exception as e:
+ logger.error("Error calculating month code", year=year, month=month, error=str(e))
+ return None
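+
+    # Worked example of the arithmetic above, assuming the linear numbering
+    # scheme holds for neighbouring months:
+    #   May 2025 -> months_diff = -1 -> code 144
+    #   Jun 2025 -> months_diff =  0 -> code 145 (reference)
+    #   Jan 2026 -> months_diff =  7 -> code 152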
- def _safe_float(self, value_str: str) -> float:
- """Safely convert string to float"""
+ def _calculate_average_speed(self, vmed: int, carga: int, ocupacion: int) -> int:
+ """Calculate average speed based on available data"""
+ if vmed > 0: # M30 points have speed data
+ return vmed
+ else: # Urban points - estimate from carga and ocupacion
+ if carga >= 80:
+ speed = 15
+ elif carga >= 50:
+ speed = 25
+ elif carga >= 20:
+ speed = 35
+ else:
+ speed = 45
+
+ # Adjust based on occupation
+ if ocupacion >= 30:
+ speed = max(10, speed - 10)
+ elif ocupacion <= 5:
+ speed = min(50, speed + 5)
+
+ return speed
+
+ def _determine_congestion_level(self, carga: int, avg_speed: int) -> str:
+ """Determine congestion level from carga and speed"""
+ if carga >= 90 and avg_speed <= 10:
+ return CongestionLevel.BLOCKED.value
+ elif carga >= 75:
+ return CongestionLevel.HIGH.value
+ elif carga >= 40:
+ return CongestionLevel.MEDIUM.value
+ else:
+ return CongestionLevel.LOW.value
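+
+    # Worked example of the two heuristics above (invented figures): an urban
+    # point with vmed=0, carga=60 and ocupacion=35 gets a base speed of
+    # 25 km/h, reduced to 15 km/h by the high occupation; carga=60 then falls
+    # in the 40-74 band, so the record is labelled "medium" congestion.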
+
+ def _calculate_pedestrian_count(self, tipo_elem: str, hour: int) -> int:
+ """Calculate pedestrian estimate based on area type and time"""
+ if tipo_elem == 'URB':
+ base = 200
+ if 12 <= hour <= 14: # Lunch time
+ multiplier = 2.0
+ elif 8 <= hour <= 9 or 18 <= hour <= 20: # Rush hours
+ multiplier = 1.5
+ else:
+ multiplier = 1.0
+ else: # M30, C30
+ base = 50
+ multiplier = 0.5
+
+ return int(base * multiplier)
+
+ def _parse_madrid_date(self, fecha_str: str) -> Optional[datetime]:
+ """Parse Madrid date format"""
+ if not fecha_str:
+ return None
+
try:
- return float(value_str.replace(',', '.'))
- except (ValueError, TypeError):
- return 0.0
+ return datetime.strptime(fecha_str, '%Y-%m-%d %H:%M:%S')
+ except ValueError:
+ try:
+ return datetime.strptime(fecha_str, '%d/%m/%Y %H:%M:%S')
+ except ValueError:
+ return None
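+
+    # Accepted date strings (illustrative): "2025-06-01 08:00:00" and
+    # "01/06/2025 08:00:00" both parse to datetime(2025, 6, 1, 8, 0); anything
+    # else yields None and the corresponding CSV row is skipped upstream.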
+
+ def _validate_csv_structure(self, fieldnames: Optional[List[str]]) -> bool:
+ """Validate CSV has expected structure"""
+ if not fieldnames:
+ logger.warning("No CSV fieldnames found")
+ return False
+
+ expected_fields = ['id', 'fecha', 'tipo_elem', 'intensidad', 'ocupacion', 'carga']
+ missing_fields = [field for field in expected_fields if field not in fieldnames]
+
+ if missing_fields:
+ logger.warning("Missing expected fields in CSV", missing=missing_fields, available=fieldnames)
+
+ return True # Continue processing even with some missing fields
+
+ def _is_valid_traffic_point(self, traffic_point: Dict[str, Any]) -> bool:
+ """Check if traffic point has valid essential data"""
+        return bool(traffic_point.get('latitude') and
+                    traffic_point.get('longitude') and
+                    traffic_point.get('idelem'))
+
+ def _validate_madrid_coordinates(self, latitude: float, longitude: float) -> bool:
+ """Validate coordinates are in Madrid area"""
+ return (MADRID_BOUNDS['lat_min'] <= latitude <= MADRID_BOUNDS['lat_max'] and
+ MADRID_BOUNDS['lon_min'] <= longitude <= MADRID_BOUNDS['lon_max'])
+
+ def _get_next_month(self, current_date: datetime) -> datetime:
+ """Get next month date"""
+ if current_date.month == 12:
+ return current_date.replace(year=current_date.year + 1, month=1)
+ else:
+ return current_date.replace(month=current_date.month + 1)
+
+ def _log_coordinate_conversion(self, point_data: Dict, utm_x: str, utm_y: str,
+ latitude: float, longitude: float) -> None:
+ """Log coordinate conversion (limited to first few for debugging)"""
+ if len(self._conversion_log_count) < 3:
+ self._conversion_log_count.append(1)
+ logger.debug("Successful UTM conversion",
+ idelem=point_data.get('idelem'),
+ utm_x=utm_x, utm_y=utm_y,
+ latitude=latitude, longitude=longitude,
+ descripcion=point_data.get('descripcion'))
+
+ def _enrich_with_location_data(self, records: List[Dict[str, Any]],
+ measurement_points: List[MeasurementPoint]) -> List[Dict[str, Any]]:
+ """Enrich traffic records with location data from measurement points"""
+ try:
+ points_lookup = {point.id: point for point in measurement_points}
+
+ for record in records:
+ point_id = record.get('measurement_point_id')
+ if point_id in points_lookup:
+ point = points_lookup[point_id]
+ record.update({
+ 'measurement_point_name': point.name,
+ 'measurement_point_latitude': point.latitude,
+ 'measurement_point_longitude': point.longitude,
+ 'distance_to_query': point.distance
+ })
+
+ return records
+
+ except Exception as e:
+ logger.warning("Error enriching with location data", error=str(e))
+ return records
+
+ # ================================================================
+ # HTTP CLIENT METHODS
+ # ================================================================
async def _fetch_xml_content_robust(self, url: str) -> Optional[str]:
"""Fetch XML content with robust headers for Madrid endpoints"""
try:
import httpx
- # Headers optimized for Madrid Open Data
headers = {
- 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
+ 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
'Accept': 'application/xml,text/xml,*/*',
'Accept-Language': 'es-ES,es;q=0.9,en;q=0.8',
'Accept-Encoding': 'gzip, deflate, br',
@@ -345,12 +846,7 @@ class MadridOpenDataClient(BaseAPIClient):
'Referer': 'https://datos.madrid.es/'
}
- async with httpx.AsyncClient(
- timeout=30.0,
- follow_redirects=True,
- headers=headers
- ) as client:
-
+ async with httpx.AsyncClient(timeout=30.0, follow_redirects=True, headers=headers) as client:
logger.debug("Fetching XML from Madrid endpoint", url=url)
response = await client.get(url)
@@ -360,20 +856,9 @@ class MadridOpenDataClient(BaseAPIClient):
content_length=len(response.content))
if response.status_code == 200:
- try:
- content = response.text
- if content and len(content) > 100:
- return content
- except UnicodeDecodeError:
- # Try manual encoding for Spanish content
- for encoding in ['utf-8', 'latin-1', 'windows-1252', 'iso-8859-1']:
- try:
- content = response.content.decode(encoding)
- if content and len(content) > 100:
- logger.debug("Successfully decoded with encoding", encoding=encoding)
- return content
- except UnicodeDecodeError:
- continue
+ content = self._decode_response_content(response)
+ if content and len(content) > 100:
+ return content
return None
@@ -381,50 +866,158 @@ class MadridOpenDataClient(BaseAPIClient):
logger.warning("Failed to fetch Madrid XML content", url=url, error=str(e))
return None
+ async def _fetch_historical_zip(self, url: str) -> Optional[bytes]:
+ """Fetch historical ZIP data from Madrid Open Data"""
+ try:
+ import httpx
+
+ headers = {
+ 'User-Agent': 'Mozilla/5.0 (compatible; Madrid-Traffic-Client/1.0)',
+ 'Accept': 'application/zip,application/octet-stream,*/*',
+ 'Accept-Language': 'es-ES,es;q=0.9,en;q=0.8',
+ }
+
+ async with httpx.AsyncClient(timeout=120.0, headers=headers) as client:
+ logger.debug("Fetching historical ZIP", url=url)
+ response = await client.get(url)
+
+ if response.status_code == 200:
+ content = response.content
+ if content and len(content) > 1000:
+ logger.debug("Successfully fetched ZIP", url=url, size=len(content))
+ return content
+ else:
+ logger.debug("ZIP file too small", url=url, size=len(content) if content else 0)
+ else:
+ logger.debug("ZIP not found", url=url, status=response.status_code)
+
+ except Exception as e:
+ logger.debug("Error fetching ZIP", url=url, error=str(e))
+
+ return None
+
+ async def _fetch_measurement_points_csv(self, url: str) -> Optional[str]:
+ """Fetch the measurement points CSV file"""
+ try:
+ import httpx
+
+ headers = {
+ 'User-Agent': 'Mozilla/5.0 (compatible; Madrid-Traffic-Client/1.0)',
+ 'Accept': 'text/csv,application/csv,text/plain,*/*',
+ 'Accept-Language': 'es-ES,es;q=0.9,en;q=0.8',
+ }
+
+ async with httpx.AsyncClient(timeout=30.0, headers=headers) as client:
+ logger.debug("Fetching measurement points CSV", url=url)
+ response = await client.get(url)
+
+ if response.status_code == 200:
+ content = response.text
+ if content and len(content) > 1000:
+ logger.debug("Successfully fetched measurement points CSV",
+ url=url, size=len(content))
+ return content
+ else:
+ logger.debug("Measurement points CSV too small", size=len(content))
+ else:
+ logger.debug("Measurement points CSV not found",
+ url=url, status=response.status_code)
+
+ except Exception as e:
+ logger.debug("Error fetching measurement points CSV", url=url, error=str(e))
+
+ return None
+
+ def _decode_response_content(self, response) -> Optional[str]:
+ """Decode response content with multiple encoding attempts"""
+ try:
+ return response.text
+ except UnicodeDecodeError:
+ # Try manual encoding for Spanish content
+ for encoding in ['utf-8', 'latin-1', 'windows-1252', 'iso-8859-1']:
+ try:
+ content = response.content.decode(encoding)
+ if content and len(content) > 100:
+ logger.debug("Successfully decoded with encoding", encoding=encoding)
+ return content
+ except UnicodeDecodeError:
+ continue
+ return None
+
+ def _extract_csv_from_zip(self, zip_file, csv_filename: str) -> Optional[str]:
+ """Extract and decode CSV content from ZIP file"""
+ try:
+ csv_bytes = zip_file.read(csv_filename)
+
+ # Try different encodings for Spanish content
+ for encoding in ['utf-8', 'latin-1', 'windows-1252', 'iso-8859-1']:
+ try:
+ csv_content = csv_bytes.decode(encoding)
+ logger.debug("Successfully decoded CSV", filename=csv_filename, encoding=encoding)
+ return csv_content
+ except UnicodeDecodeError:
+ continue
+
+ logger.warning("Could not decode CSV file", filename=csv_filename)
+ return None
+
+ except Exception as e:
+ logger.warning("Error extracting CSV from ZIP", filename=csv_filename, error=str(e))
+ return None
+
+ # ================================================================
+ # XML PROCESSING METHODS
+ # ================================================================
+
+ def _clean_madrid_xml(self, xml_content: str) -> str:
+ """Clean Madrid XML to handle undefined entities and encoding issues"""
+ try:
+ # Remove BOM if present
+ xml_content = xml_content.lstrip('\ufeff')
+
+ # Replace undefined entities
+ entity_replacements = {
+                '&nbsp;': ' ', '&copy;': '©', '&reg;': '®', '&trade;': '™'
+ }
+
+ for entity, replacement in entity_replacements.items():
+ xml_content = xml_content.replace(entity, replacement)
+
+ # Fix unescaped ampersands
+            xml_content = re.sub(r'&(?![a-zA-Z0-9#]{1,10};)', '&amp;', xml_content)
+
+ # Remove invalid control characters
+ xml_content = re.sub(r'[\x00-\x08\x0B\x0C\x0E-\x1F\x7F]', '', xml_content)
+
+ # Handle Spanish characters
+ spanish_chars = {
+ 'ñ': 'n', 'Ñ': 'N', 'á': 'a', 'é': 'e', 'í': 'i', 'ó': 'o', 'ú': 'u',
+ 'Á': 'A', 'É': 'E', 'Í': 'I', 'Ó': 'O', 'Ú': 'U', 'ü': 'u', 'Ü': 'U'
+ }
+
+ for spanish_char, replacement in spanish_chars.items():
+ xml_content = xml_content.replace(spanish_char, replacement)
+
+ return xml_content
+
+ except Exception as e:
+ logger.warning("Error cleaning Madrid XML", error=str(e))
+ return xml_content
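+
+    # Example of the cleaning steps (invented text): 'Calle Alcalá & Gran Vía
+    # &amp; M-30' becomes 'Calle Alcala &amp; Gran Via &amp; M-30' -- the bare
+    # ampersand is escaped, the already escaped one is left alone, and accents
+    # are stripped before ElementTree parses the document.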
+
def _extract_traffic_data_regex(self, xml_content: str) -> List[Dict[str, Any]]:
"""Extract traffic data using regex when XML parsing fails"""
traffic_points = []
try:
- # Pattern to match Madrid PM elements
            pm_pattern = r'<pm>(.*?)</pm>'
pm_matches = re.findall(pm_pattern, xml_content, re.DOTALL)
for pm_content in pm_matches:
try:
- # Extract individual fields
-                    idelem_match = re.search(r'<idelem>(.*?)</idelem>', pm_content)
-                    intensidad_match = re.search(r'<intensidad>(.*?)</intensidad>', pm_content)
-                    st_x_match = re.search(r'<st_x>(.*?)</st_x>', pm_content)
-                    st_y_match = re.search(r'<st_y>(.*?)</st_y>', pm_content)
-                    descripcion_match = re.search(r'<descripcion>(.*?)</descripcion>', pm_content)
-
- if idelem_match and st_x_match and st_y_match:
- idelem = idelem_match.group(1)
- st_x = st_x_match.group(1)
- st_y = st_y_match.group(1)
- intensidad = intensidad_match.group(1) if intensidad_match else '0'
- descripcion = descripcion_match.group(1) if descripcion_match else f'Point {idelem}'
+ extracted_data = self._extract_pm_data_regex(pm_content)
+ if extracted_data and self._is_valid_traffic_point(extracted_data):
+ traffic_points.append(extracted_data)
- # Convert coordinates
- longitude = self._convert_utm_to_lon(st_x)
- latitude = self._convert_utm_to_lat(st_y)
-
- if latitude and longitude:
- traffic_point = {
- 'idelem': idelem,
- 'descripcion': descripcion,
- 'intensidad': self._safe_int(intensidad),
- 'latitude': latitude,
- 'longitude': longitude,
- 'ocupacion': 0,
- 'carga': 0,
- 'nivelServicio': 0,
- 'error': 'N'
- }
-
- traffic_points.append(traffic_point)
-
except Exception as e:
logger.debug("Error parsing regex PM match", error=str(e))
continue
@@ -436,23 +1029,46 @@ class MadridOpenDataClient(BaseAPIClient):
logger.error("Error in regex extraction", error=str(e))
return []
- def _get_closest_distance(self, latitude: float, longitude: float, traffic_data: List[Dict]) -> float:
- """Get distance to closest traffic point for debugging"""
- if not traffic_data:
- return float('inf')
+ def _extract_pm_data_regex(self, pm_content: str) -> Dict[str, Any]:
+ """Extract individual PM data using regex"""
+ patterns = {
+            'idelem': r'<idelem>(.*?)</idelem>',
+            'intensidad': r'<intensidad>(.*?)</intensidad>',
+            'st_x': r'<st_x>(.*?)</st_x>',
+            'st_y': r'<st_y>(.*?)</st_y>',
+            'descripcion': r'<descripcion>(.*?)</descripcion>'
+ }
- min_distance = float('inf')
- for point in traffic_data:
- if point.get('latitude') and point.get('longitude'):
- distance = self._calculate_distance(
- latitude, longitude,
- point['latitude'], point['longitude']
- )
- min_distance = min(min_distance, distance)
+ extracted = {}
+ for field, pattern in patterns.items():
+ match = re.search(pattern, pm_content)
+ extracted[field] = match.group(1) if match else ''
- return min_distance
+ if extracted['idelem'] and extracted['st_x'] and extracted['st_y']:
+ # Convert coordinates
+ latitude, longitude = self._convert_utm_to_latlon(extracted['st_x'], extracted['st_y'])
+
+ if latitude and longitude:
+ return {
+ 'idelem': extracted['idelem'],
+ 'descripcion': extracted['descripcion'] or f"Point {extracted['idelem']}",
+ 'intensidad': self._safe_int(extracted['intensidad']),
+ 'latitude': latitude,
+ 'longitude': longitude,
+ 'ocupacion': 0,
+ 'carga': 0,
+ 'nivelServicio': 0,
+ 'error': 'N'
+ }
+
+ return {}
- def _find_nearest_traffic_point(self, latitude: float, longitude: float, traffic_data: List[Dict]) -> Optional[Dict]:
+ # ================================================================
+ # TRAFFIC ANALYSIS METHODS
+ # ================================================================
+
+ def _find_nearest_traffic_point(self, latitude: float, longitude: float,
+ traffic_data: List[Dict]) -> Optional[Dict]:
"""Find the nearest traffic measurement point to given coordinates"""
if not traffic_data:
return None
@@ -480,60 +1096,50 @@ class MadridOpenDataClient(BaseAPIClient):
return nearest_point
logger.debug("No nearby Madrid traffic points found",
- min_distance=min_distance,
- total_points=len(traffic_data))
+ min_distance=min_distance, total_points=len(traffic_data))
return None
- def _calculate_distance(self, lat1: float, lon1: float, lat2: float, lon2: float) -> float:
- """Calculate distance between two coordinates in km using Haversine formula"""
- R = 6371 # Earth's radius in km
+ def _get_closest_distance(self, latitude: float, longitude: float, traffic_data: List[Dict]) -> float:
+ """Get distance to closest traffic point for debugging"""
+ if not traffic_data:
+ return float('inf')
- dlat = math.radians(lat2 - lat1)
- dlon = math.radians(lon2 - lon1)
+ min_distance = float('inf')
+ for point in traffic_data:
+ if point.get('latitude') and point.get('longitude'):
+ distance = self._calculate_distance(
+ latitude, longitude,
+ point['latitude'], point['longitude']
+ )
+ min_distance = min(min_distance, distance)
- a = (math.sin(dlat/2) * math.sin(dlat/2) +
- math.cos(math.radians(lat1)) * math.cos(math.radians(lat2)) *
- math.sin(dlon/2) * math.sin(dlon/2))
-
- c = 2 * math.atan2(math.sqrt(a), math.sqrt(1-a))
- distance = R * c
-
- return distance
+ return min_distance
def _parse_traffic_measurement(self, traffic_point: Dict) -> Dict[str, Any]:
"""Parse Madrid traffic measurement into standardized format"""
try:
- # Madrid traffic service levels: 0=fluid, 1=dense, 2=congested, 3=cut
- service_level_map = {
- 0: "low",
- 1: "medium",
- 2: "high",
- 3: "blocked"
+ service_level = traffic_point.get('nivelServicio', 0)
+ congestion_mapping = {
+ TrafficServiceLevel.FLUID.value: CongestionLevel.LOW.value,
+ TrafficServiceLevel.DENSE.value: CongestionLevel.MEDIUM.value,
+ TrafficServiceLevel.CONGESTED.value: CongestionLevel.HIGH.value,
+ TrafficServiceLevel.BLOCKED.value: CongestionLevel.BLOCKED.value
}
- service_level = traffic_point.get('nivelServicio', 0)
+ # Speed estimation based on service level
+ speed_mapping = {
+ TrafficServiceLevel.FLUID.value: 45,
+ TrafficServiceLevel.DENSE.value: 25,
+ TrafficServiceLevel.CONGESTED.value: 15,
+ TrafficServiceLevel.BLOCKED.value: 5
+ }
- # Estimate speed based on service level and road type
- if service_level == 0: # Fluid
- average_speed = 45
- elif service_level == 1: # Dense
- average_speed = 25
- elif service_level == 2: # Congested
- average_speed = 15
- else: # Cut/Blocked
- average_speed = 5
+ congestion_level = congestion_mapping.get(service_level, CongestionLevel.MEDIUM.value)
+ average_speed = speed_mapping.get(service_level, 25)
- congestion_level = service_level_map.get(service_level, "medium")
-
- # Calculate pedestrian estimate based on location
+ # Calculate pedestrian estimate
hour = datetime.now().hour
- if 13 <= hour <= 15: # Lunch time
- pedestrian_multiplier = 2.5
- elif 8 <= hour <= 9 or 18 <= hour <= 20: # Rush hours
- pedestrian_multiplier = 2.0
- else:
- pedestrian_multiplier = 1.0
-
+ pedestrian_multiplier = self._get_pedestrian_multiplier(hour)
pedestrian_count = int(100 * pedestrian_multiplier)
return {
@@ -547,28 +1153,25 @@ class MadridOpenDataClient(BaseAPIClient):
"measurement_point_id": traffic_point.get('idelem'),
"measurement_point_name": traffic_point.get('descripcion'),
"road_type": "URB",
- "source": "madrid_opendata"
+ "source": DataSource.MADRID_REALTIME.value
}
except Exception as e:
logger.error("Error parsing traffic measurement", error=str(e))
return self._get_default_traffic_data()
- def _get_default_traffic_data(self) -> Dict[str, Any]:
- """Get default traffic data when parsing fails"""
- return {
- "date": datetime.now(),
- "traffic_volume": 100,
- "pedestrian_count": 150,
- "congestion_level": "medium",
- "average_speed": 25,
- "occupation_percentage": 30,
- "load_percentage": 40,
- "measurement_point_id": "unknown",
- "measurement_point_name": "Unknown location",
- "road_type": "URB",
- "source": "synthetic"
- }
+ def _get_pedestrian_multiplier(self, hour: int) -> float:
+ """Get pedestrian multiplier based on time of day"""
+ if 13 <= hour <= 15: # Lunch time
+ return 2.5
+ elif 8 <= hour <= 9 or 18 <= hour <= 20: # Rush hours
+ return 2.0
+ else:
+ return 1.0
+
+ # ================================================================
+ # SYNTHETIC DATA GENERATION METHODS
+ # ================================================================
async def _generate_synthetic_traffic(self, latitude: float, longitude: float) -> Dict[str, Any]:
"""Generate realistic Madrid traffic data as fallback"""
@@ -576,338 +1179,25 @@ class MadridOpenDataClient(BaseAPIClient):
hour = now.hour
is_weekend = now.weekday() >= 5
- base_traffic = 100
-
- if not is_weekend:
- if 7 <= hour <= 9:
- traffic_multiplier = 2.2
- congestion = "high"
- avg_speed = 15
- elif 18 <= hour <= 20:
- traffic_multiplier = 2.5
- congestion = "high"
- avg_speed = 12
- elif 12 <= hour <= 14:
- traffic_multiplier = 1.6
- congestion = "medium"
- avg_speed = 25
- else:
- traffic_multiplier = 1.0
- congestion = "low"
- avg_speed = 40
- else:
- if 11 <= hour <= 14:
- traffic_multiplier = 1.4
- congestion = "medium"
- avg_speed = 30
- else:
- traffic_multiplier = 0.8
- congestion = "low"
- avg_speed = 45
-
- traffic_volume = int(base_traffic * traffic_multiplier)
-
- # Pedestrian calculation
- pedestrian_base = 150
- if 13 <= hour <= 15:
- pedestrian_count = int(pedestrian_base * 2.5)
- elif 8 <= hour <= 9 or 18 <= hour <= 20:
- pedestrian_count = int(pedestrian_base * 2.0)
- else:
- pedestrian_count = int(pedestrian_base * 1.0)
+ traffic_params = self._calculate_traffic_parameters(hour, is_weekend)
return {
"date": now,
- "traffic_volume": traffic_volume,
- "pedestrian_count": pedestrian_count,
- "congestion_level": congestion,
- "average_speed": max(10, avg_speed),
- "occupation_percentage": min(100, traffic_volume // 2),
- "load_percentage": min(100, traffic_volume // 3),
+ "traffic_volume": traffic_params['volume'],
+ "pedestrian_count": traffic_params['pedestrians'],
+ "congestion_level": traffic_params['congestion'],
+ "average_speed": traffic_params['speed'],
+ "occupation_percentage": min(100, traffic_params['volume'] // 2),
+ "load_percentage": min(100, traffic_params['volume'] // 3),
"measurement_point_id": "madrid_synthetic",
"measurement_point_name": "Madrid Centro (Synthetic)",
"road_type": "URB",
- "source": "synthetic"
+ "source": DataSource.SYNTHETIC.value
}
- async def get_historical_traffic(self, latitude: float, longitude: float, start_date: datetime, end_date: datetime) -> List[Dict[str, Any]]:
- """Get historical traffic data from Madrid Open Data
-
- Args:
- latitude: Location latitude
- longitude: Location longitude
- start_date: Start date for historical data
- end_date: End date for historical data
-
- Returns:
- List of historical traffic data dictionaries
- """
- try:
- logger.debug("Fetching Madrid historical traffic data",
- lat=latitude, lon=longitude,
- start=start_date, end=end_date)
-
- historical_data = []
-
- # Generate historical data using synthetic generation for periods before API availability
- # or when real data is not available
- if (end_date - start_date).days <= 90: # Reasonable range for synthetic data
- historical_data = await self._generate_historical_traffic(latitude, longitude, start_date, end_date)
- logger.info("Generated synthetic historical traffic data",
- records=len(historical_data))
- else:
- logger.warning("Date range too large for historical traffic data",
- days=(end_date - start_date).days)
- return []
-
- # Try to fetch real data if API key is available and for recent dates
- if hasattr(self, 'api_key') and self.api_key:
- try:
- real_data = await self._fetch_real_historical_traffic(latitude, longitude, start_date, end_date)
- if real_data:
- # Merge real data with synthetic data or replace synthetic data
- historical_data = real_data
- logger.info("Fetched real historical traffic data",
- records=len(real_data))
- except Exception as e:
- logger.warning("Failed to fetch real historical data, using synthetic", error=str(e))
-
- return historical_data
-
- except Exception as e:
- logger.error("Error getting historical traffic data", error=str(e))
- return []
-
- async def _fetch_real_historical_traffic(self, latitude: float, longitude: float, start_date: datetime, end_date: datetime) -> List[Dict[str, Any]]:
- """Fetch real historical traffic data from Madrid Open Data portal
-
- Madrid provides historical CSV files by month at:
- https://datos.madrid.es/egob/catalogo/[ID]-[YEAR]-[MONTH]-trafico-historico.csv
- """
- try:
- historical_data = []
- current_date = start_date.replace(day=1) # Start from beginning of month
-
- while current_date <= end_date:
- try:
- # Madrid historical traffic CSV URL pattern
- year = current_date.year
- month = current_date.month
-
- # Try different URL patterns based on Madrid Open Data structure
- historical_urls = [
- f"https://datos.madrid.es/egob/catalogo/300217-{year}-{month:02d}-trafico-historico.csv",
- f"https://datos.madrid.es/egob/catalogo/trafico-historico-{year}-{month:02d}.csv",
- f"https://datos.madrid.es/egob/catalogo/{year}{month:02d}-trafico-historico.csv"
- ]
-
- for url in historical_urls:
- csv_data = await self._fetch_historical_csv(url)
- if csv_data:
- # Parse CSV and filter by location
- month_data = await self._parse_historical_csv(csv_data, latitude, longitude, start_date, end_date)
- historical_data.extend(month_data)
- logger.debug("Fetched historical data for month",
- year=year, month=month, records=len(month_data))
- break
-
- # Move to next month
- if current_date.month == 12:
- current_date = current_date.replace(year=current_date.year + 1, month=1)
- else:
- current_date = current_date.replace(month=current_date.month + 1)
-
- except Exception as e:
- logger.warning("Error fetching data for month",
- year=current_date.year, month=current_date.month, error=str(e))
- # Move to next month even on error
- if current_date.month == 12:
- current_date = current_date.replace(year=current_date.year + 1, month=1)
- else:
- current_date = current_date.replace(month=current_date.month + 1)
-
- return historical_data
-
- except Exception as e:
- logger.error("Error fetching real historical traffic data", error=str(e))
- return []
-
- async def _fetch_historical_csv(self, url: str) -> Optional[str]:
- """Fetch historical CSV data from Madrid Open Data"""
- try:
- import httpx
-
- headers = {
- 'User-Agent': 'Mozilla/5.0 (compatible; Madrid-Traffic-Client/1.0)',
- 'Accept': 'text/csv,application/csv,text/plain,*/*',
- 'Accept-Language': 'es-ES,es;q=0.9,en;q=0.8',
- }
-
- async with httpx.AsyncClient(timeout=60.0, headers=headers) as client:
- logger.debug("Fetching historical CSV", url=url)
- response = await client.get(url)
-
- if response.status_code == 200:
- content = response.text
- if content and len(content) > 100: # Ensure we got actual data
- logger.debug("Successfully fetched CSV",
- url=url, size=len(content))
- return content
- else:
- logger.debug("CSV not found", url=url, status=response.status_code)
-
- except Exception as e:
- logger.debug("Error fetching CSV", url=url, error=str(e))
-
- return None
-
- async def _parse_historical_csv(self, csv_content: str, latitude: float, longitude: float, start_date: datetime, end_date: datetime) -> List[Dict[str, Any]]:
- """Parse Madrid historical traffic CSV and filter by location and date range"""
- try:
- import csv
- from io import StringIO
-
- historical_records = []
- csv_reader = csv.DictReader(StringIO(csv_content), delimiter=';')
-
- # Get the nearest measurement points to our coordinates
- measurement_points = await self._get_measurement_points_near_location(latitude, longitude)
- target_point_ids = [point['id'] for point in measurement_points[:3]] # Use 3 nearest points
-
- for row in csv_reader:
- try:
- # Parse Madrid CSV format
- # Expected columns: fecha, hora, idelem, intensidad, ocupacion, carga, nivelServicio, etc.
-
- # Extract date and time
- if 'fecha' in row and 'hora' in row:
- date_str = row.get('fecha', '').strip()
- time_str = row.get('hora', '').strip()
-
- # Parse Madrid date format (usually DD/MM/YYYY)
- if date_str and time_str:
- try:
- # Try different date formats
- for date_format in ['%d/%m/%Y', '%Y-%m-%d', '%d-%m-%Y']:
- try:
- record_date = datetime.strptime(f"{date_str} {time_str}", f"{date_format} %H:%M")
- break
- except ValueError:
- continue
- else:
- continue # Skip if no date format worked
-
- # Check if record is in our date range
- if not (start_date <= record_date <= end_date):
- continue
-
- except ValueError:
- continue
- else:
- continue
-
- # Check if this record is from a measurement point near our location
- point_id = row.get('idelem', '').strip()
- if point_id not in target_point_ids:
- continue
-
- # Parse traffic data
- traffic_record = {
- "date": record_date,
- "traffic_volume": self._safe_int(row.get('intensidad', '0')),
- "occupation_percentage": self._safe_int(row.get('ocupacion', '0')),
- "load_percentage": self._safe_int(row.get('carga', '0')),
- "service_level": self._safe_int(row.get('nivelServicio', '0')),
- "measurement_point_id": point_id,
- "measurement_point_name": row.get('descripcion', f'Point {point_id}'),
- "road_type": row.get('tipo_elem', 'URB'),
- "source": "madrid_opendata_historical"
- }
-
- # Calculate derived metrics
- service_level = traffic_record['service_level']
- if service_level == 0: # Fluid
- congestion_level = "low"
- avg_speed = 45
- pedestrian_multiplier = 1.0
- elif service_level == 1: # Dense
- congestion_level = "medium"
- avg_speed = 25
- pedestrian_multiplier = 1.5
- elif service_level == 2: # Congested
- congestion_level = "high"
- avg_speed = 15
- pedestrian_multiplier = 2.0
- else: # Cut/Blocked
- congestion_level = "blocked"
- avg_speed = 5
- pedestrian_multiplier = 0.5
-
- traffic_record.update({
- "congestion_level": congestion_level,
- "average_speed": avg_speed,
- "pedestrian_count": int(100 * pedestrian_multiplier)
- })
-
- historical_records.append(traffic_record)
-
- except Exception as e:
- logger.debug("Error parsing CSV row", error=str(e))
- continue
-
- return historical_records
-
- except Exception as e:
- logger.error("Error parsing historical CSV", error=str(e))
- return []
-
- async def _get_measurement_points_near_location(self, latitude: float, longitude: float) -> List[Dict[str, Any]]:
- """Get measurement points near the specified location"""
- try:
- # Try to fetch current traffic data to get measurement points
- current_traffic = await self._fetch_traffic_xml_data(self.traffic_endpoints[0])
-
- if current_traffic:
- # Calculate distances and sort by proximity
- points_with_distance = []
- for point in current_traffic:
- if point.get('latitude') and point.get('longitude'):
- distance = self._calculate_distance(
- latitude, longitude,
- point['latitude'], point['longitude']
- )
- points_with_distance.append({
- 'id': point.get('idelem'),
- 'distance': distance,
- 'latitude': point['latitude'],
- 'longitude': point['longitude'],
- 'name': point.get('descripcion', '')
- })
-
- # Sort by distance and return closest points
- points_with_distance.sort(key=lambda x: x['distance'])
- return points_with_distance[:5] # Return 5 closest points
-
- # Fallback: return synthetic point IDs based on Madrid geography
- return [
- {'id': 'madrid_centro_01', 'distance': 1.0},
- {'id': 'madrid_centro_02', 'distance': 2.0},
- {'id': 'madrid_centro_03', 'distance': 3.0}
- ]
-
- except Exception as e:
- logger.warning("Error getting measurement points", error=str(e))
- return [{'id': 'madrid_default', 'distance': 0.0}]
-
- async def _generate_historical_traffic(self, latitude: float, longitude: float, start_date: datetime, end_date: datetime) -> List[Dict[str, Any]]:
- """Generate synthetic historical traffic data for the specified period
-
- This method creates realistic historical traffic patterns based on:
- - Time of day patterns
- - Day of week patterns
- - Seasonal variations
- - Random variations for realism
- """
+ async def _generate_historical_traffic(self, latitude: float, longitude: float,
+ start_date: datetime, end_date: datetime) -> List[Dict[str, Any]]:
+ """Generate synthetic historical traffic data with realistic patterns"""
try:
import random
from datetime import timedelta
@@ -915,95 +1205,54 @@ class MadridOpenDataClient(BaseAPIClient):
historical_data = []
current_date = start_date
- # Seed random for consistent but varied data
+ # Seed random for consistent data
random.seed(hash(f"{latitude}{longitude}"))
- while current_date <= end_date:
- # Generate 24 hourly records for each day
- for hour in range(24):
+ while current_date < end_date:
+ # Calculate how many hours to generate for this day
+ if current_date.date() == end_date.date():
+ # Same day as end_date, only generate up to end_date hour
+ end_hour = end_date.hour
+ else:
+ # Full day
+ end_hour = 24
+
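+ # Example: a request from 03:00 to 06:00 on the same day produces exactly three
+ # hourly records (03:00, 04:00, 05:00), since generation stops before end_date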
+ # Generate hourly records for this day
+ for hour in range(current_date.hour if current_date == start_date else 0, end_hour):
record_time = current_date.replace(hour=hour, minute=0, second=0, microsecond=0)
- # Base traffic calculation
- base_traffic = 100
- hour_of_day = record_time.hour
- day_of_week = record_time.weekday() # 0=Monday, 6=Sunday
- month = record_time.month
+ # Skip if record time is at or beyond end_date
+ if record_time >= end_date:
+ break
- # Time of day patterns
- if 7 <= hour_of_day <= 9: # Morning rush
- traffic_multiplier = 2.2 + random.uniform(-0.3, 0.3)
- congestion = "high"
- avg_speed = 15 + random.randint(-5, 5)
- elif 18 <= hour_of_day <= 20: # Evening rush
- traffic_multiplier = 2.5 + random.uniform(-0.4, 0.4)
- congestion = "high"
- avg_speed = 12 + random.randint(-3, 8)
- elif 12 <= hour_of_day <= 14: # Lunch time
- traffic_multiplier = 1.6 + random.uniform(-0.2, 0.2)
- congestion = "medium"
- avg_speed = 25 + random.randint(-5, 10)
- elif 22 <= hour_of_day or hour_of_day <= 6: # Night
- traffic_multiplier = 0.3 + random.uniform(-0.1, 0.2)
- congestion = "low"
- avg_speed = 50 + random.randint(-10, 15)
- else: # Regular hours
- traffic_multiplier = 1.0 + random.uniform(-0.2, 0.2)
- congestion = "medium"
- avg_speed = 35 + random.randint(-10, 10)
+ traffic_params = self._generate_synthetic_traffic_params(record_time, random)
- # Weekend adjustments
- if day_of_week >= 5: # Weekend
- if hour_of_day in [11, 12, 13, 14, 15]: # Weekend afternoon peak
- traffic_multiplier *= 1.4
- congestion = "medium"
- else:
- traffic_multiplier *= 0.7
- if congestion == "high":
- congestion = "medium"
-
- # Seasonal adjustments
- if month in [7, 8]: # Summer - less traffic due to vacations
- traffic_multiplier *= 0.8
- elif month in [11, 12]: # Holiday season - more traffic
- traffic_multiplier *= 1.1
-
- # Calculate final values
- traffic_volume = max(10, int(base_traffic * traffic_multiplier))
- avg_speed = max(10, min(60, avg_speed))
-
- # Pedestrian calculation
- pedestrian_base = 150
- if 13 <= hour_of_day <= 15: # Lunch time
- pedestrian_count = int(pedestrian_base * 2.5 * random.uniform(0.8, 1.2))
- elif 8 <= hour_of_day <= 9 or 18 <= hour_of_day <= 20: # Rush hours
- pedestrian_count = int(pedestrian_base * 2.0 * random.uniform(0.8, 1.2))
- else:
- pedestrian_count = int(pedestrian_base * 1.0 * random.uniform(0.5, 1.5))
-
- # Create traffic record
traffic_record = {
"date": record_time,
- "traffic_volume": traffic_volume,
- "pedestrian_count": pedestrian_count,
- "congestion_level": congestion,
- "average_speed": avg_speed,
- "occupation_percentage": min(100, traffic_volume // 2),
- "load_percentage": min(100, traffic_volume // 3),
+ "traffic_volume": traffic_params['volume'],
+ "pedestrian_count": traffic_params['pedestrians'],
+ "congestion_level": traffic_params['congestion'],
+ "average_speed": traffic_params['speed'],
+ "occupation_percentage": min(100, traffic_params['volume'] // 2),
+ "load_percentage": min(100, traffic_params['volume'] // 3),
"measurement_point_id": f"madrid_historical_{hash(f'{latitude}{longitude}') % 1000}",
"measurement_point_name": f"Madrid Historical Point ({latitude:.4f}, {longitude:.4f})",
"road_type": "URB",
- "source": "synthetic_historical"
+ "source": DataSource.SYNTHETIC_HISTORICAL.value
}
historical_data.append(traffic_record)
# Move to next day
- current_date += timedelta(days=1)
+ if current_date.date() == end_date.date():
+ # We've processed the end date, stop
+ break
+ else:
+ # Move to start of next day
+ current_date = (current_date + timedelta(days=1)).replace(hour=0, minute=0, second=0, microsecond=0)
logger.info("Generated historical traffic data",
- records=len(historical_data),
- start=start_date,
- end=end_date)
+ records=len(historical_data), start=start_date, end=end_date)
return historical_data
@@ -1011,6 +1260,132 @@ class MadridOpenDataClient(BaseAPIClient):
logger.error("Error generating historical traffic data", error=str(e))
return []
- async def get_events(self, latitude: float, longitude: float, radius_km: float = 5.0) -> List[Dict[str, Any]]:
- """Get traffic incidents and events"""
- return []
\ No newline at end of file
+ def _calculate_traffic_parameters(self, hour: int, is_weekend: bool) -> Dict[str, Any]:
+ """Calculate traffic parameters based on time and day type"""
+ base_traffic = 100
+
+ if not is_weekend:
+ if 7 <= hour <= 9:
+ multiplier, congestion, speed = 2.2, "high", 15
+ elif 18 <= hour <= 20:
+ multiplier, congestion, speed = 2.5, "high", 12
+ elif 12 <= hour <= 14:
+ multiplier, congestion, speed = 1.6, "medium", 25
+ else:
+ multiplier, congestion, speed = 1.0, "low", 40
+ else:
+ if 11 <= hour <= 14:
+ multiplier, congestion, speed = 1.4, "medium", 30
+ else:
+ multiplier, congestion, speed = 0.8, "low", 45
+
+ volume = int(base_traffic * multiplier)
+ pedestrians = int(150 * self._get_pedestrian_multiplier(hour))
+
+ return {
+ 'volume': volume,
+ 'congestion': congestion,
+ 'speed': max(10, speed),
+ 'pedestrians': pedestrians
+ }
+
+ def _generate_synthetic_traffic_params(self, record_time: datetime, random_gen) -> Dict[str, Any]:
+ """Generate synthetic traffic parameters with random variations"""
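+ # Illustrative numbers (derived from _calculate_traffic_parameters above plus the
+ # variations applied here): a weekday at 08:00 in July starts from volume=220,
+ # congestion="high", speed=15; the ±30% volume variation and the 0.8 July multiplier
+ # then put the returned 'volume' roughly between 120 and 230 and 'speed' between 10 and 20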
+ hour = record_time.hour
+ day_of_week = record_time.weekday()
+ month = record_time.month
+
+ base_params = self._calculate_traffic_parameters(hour, day_of_week >= 5)
+
+ # Add random variations
+ volume_variation = random_gen.uniform(-0.3, 0.3)
+ speed_variation = random_gen.randint(-5, 5)
+
+ # Apply seasonal adjustments
+ seasonal_multiplier = 0.8 if month in [7, 8] else (1.1 if month in [11, 12] else 1.0)
+
+ # Weekend specific adjustments
+ if day_of_week >= 5 and hour in [11, 12, 13, 14, 15]:
+ base_params['volume'] = int(base_params['volume'] * 1.4)
+ base_params['congestion'] = "medium"
+
+ return {
+ 'volume': max(10, int(base_params['volume'] * (1 + volume_variation) * seasonal_multiplier)),
+ 'congestion': base_params['congestion'],
+ 'speed': max(10, min(60, base_params['speed'] + speed_variation)),
+ 'pedestrians': int(base_params['pedestrians'] * random_gen.uniform(0.8, 1.2))
+ }
+
+ def _get_fallback_measurement_points(self, latitude: float, longitude: float) -> List[MeasurementPoint]:
+ """Generate fallback measurement points when CSV is not available"""
+ madrid_points = [
+ (40.4168, -3.7038, "Madrid Centro"),
+ (40.4200, -3.7060, "Gran Vía"),
+ (40.4155, -3.7074, "Plaza Mayor"),
+ (40.4152, -3.6844, "Retiro"),
+ (40.4063, -3.6932, "Atocha"),
+ ]
+
+ fallback_points = []
+ for i, (lat, lon, name) in enumerate(madrid_points):
+ distance = self._calculate_distance(latitude, longitude, lat, lon)
+ point = MeasurementPoint(
+ id=f'fallback_{i+1000}',
+ latitude=lat,
+ longitude=lon,
+ distance=distance,
+ name=name,
+ type='URB'
+ )
+ fallback_points.append(point)
+
+ fallback_points.sort(key=lambda x: x.distance)
+ return fallback_points[:5]
+
+ def _get_default_traffic_data(self) -> Dict[str, Any]:
+ """Get default traffic data when parsing fails"""
+ return {
+ "date": datetime.now(),
+ "traffic_volume": 100,
+ "pedestrian_count": 150,
+ "congestion_level": CongestionLevel.MEDIUM.value,
+ "average_speed": 25,
+ "occupation_percentage": 30,
+ "load_percentage": 40,
+ "measurement_point_id": "unknown",
+ "measurement_point_name": "Unknown location",
+ "road_type": "URB",
+ "source": DataSource.SYNTHETIC.value
+ }
+
+ # ================================================================
+ # CORE UTILITY METHODS
+ # ================================================================
+
+ def _calculate_distance(self, lat1: float, lon1: float, lat2: float, lon2: float) -> float:
+ """Calculate distance between two coordinates using Haversine formula"""
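+ # Example: Madrid Centro (40.4168, -3.7038) to Gran Vía (40.4200, -3.7060)
+ # works out to roughly 0.4 km with this formula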
+ R = 6371 # Earth's radius in km
+
+ dlat = math.radians(lat2 - lat1)
+ dlon = math.radians(lon2 - lon1)
+
+ a = (math.sin(dlat/2) * math.sin(dlat/2) +
+ math.cos(math.radians(lat1)) * math.cos(math.radians(lat2)) *
+ math.sin(dlon/2) * math.sin(dlon/2))
+
+ c = 2 * math.atan2(math.sqrt(a), math.sqrt(1-a))
+ return R * c
+
+ def _safe_int(self, value_str: str) -> int:
+ """Safely convert a string to int (accepts comma decimal separators; bad or None values return 0)"""
+ try:
+ return int(float(value_str.replace(',', '.')))
+ except (ValueError, TypeError, AttributeError):
+ # AttributeError covers None values, e.g. missing CSV columns
+ return 0
+
+ def _safe_float(self, value_str: str) -> float:
+ """Safely convert a string to float (accepts comma decimal separators; bad or None values return 0.0)"""
+ try:
+ return float(value_str.replace(',', '.'))
+ except (ValueError, TypeError, AttributeError):
+ return 0.0
\ No newline at end of file
diff --git a/services/data/tests/conftest.py b/services/data/tests/conftest.py
index 578ac2c5..f506bf4e 100644
--- a/services/data/tests/conftest.py
+++ b/services/data/tests/conftest.py
@@ -16,67 +16,3 @@ from app.core.database import Base, get_db
from app.models.sales import SalesData
from app.models.weather import WeatherData, WeatherForecast
from app.models.traffic import TrafficData
-
-# Test database URL
-TEST_DATABASE_URL = "sqlite+aiosqlite:///:memory:"
-
-# Create test engine
-test_engine = create_async_engine(
- TEST_DATABASE_URL,
- connect_args={"check_same_thread": False},
- poolclass=StaticPool,
-)
-
-TestingSessionLocal = async_sessionmaker(
- test_engine,
- class_=AsyncSession,
- expire_on_commit=False
-)
-
-@pytest_asyncio.fixture
-async def db():
- """Create test database session"""
- async with test_engine.begin() as conn:
- await conn.run_sync(Base.metadata.create_all)
-
- async with TestingSessionLocal() as session:
- yield session
-
- async with test_engine.begin() as conn:
- await conn.run_sync(Base.metadata.drop_all)
-
-@pytest.fixture
-def client():
- """Create test client"""
- async def override_get_db():
- async with TestingSessionLocal() as session:
- yield session
-
- app.dependency_overrides[get_db] = override_get_db
-
- with TestClient(app) as test_client:
- yield test_client
-
- app.dependency_overrides.clear()
-
-@pytest.fixture
-def test_tenant_id():
- """Test tenant ID"""
- return uuid.uuid4()
-
-@pytest.fixture
-def test_sales_data():
- """Sample sales data for testing"""
- return {
- "date": datetime.now(),
- "product_name": "Pan Integral",
- "quantity_sold": 25,
- "revenue": 37.50,
- "location_id": "madrid_centro",
- "source": "test"
- }
-
-@pytest.fixture
-def mock_auth_token():
- """Mock authentication token"""
- return "Bearer test-token-123"
\ No newline at end of file
diff --git a/services/data/tests/pytest.ini b/services/data/tests/pytest.ini
new file mode 100644
index 00000000..4bd8c845
--- /dev/null
+++ b/services/data/tests/pytest.ini
@@ -0,0 +1,16 @@
+[pytest]
+# pytest.ini - Configuration for async testing
+asyncio_mode = auto
+addopts = -v --tb=short --capture=no
+testpaths = tests
+python_files = test_*.py
+python_classes = Test*
+python_functions = test_*
+markers =
+ asyncio: mark test as async
+ slow: mark test as slow
+ integration: mark test as integration test
+filterwarnings =
+ ignore::DeprecationWarning
+ ignore::PendingDeprecationWarning
+ ignore::pydantic.PydanticDeprecatedSince20
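+# Example invocation from the data service container (asyncio_mode = auto assumes
+# pytest-asyncio is installed):
+#   pytest tests/test_madrid_opendata.py -m "not slow"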
\ No newline at end of file
diff --git a/services/data/tests/test_madrid_opendata.py b/services/data/tests/test_madrid_opendata.py
new file mode 100644
index 00000000..38f017cd
--- /dev/null
+++ b/services/data/tests/test_madrid_opendata.py
@@ -0,0 +1,405 @@
+#!/usr/bin/env python3
+"""
+Madrid historical traffic tests for pytest, run inside the Docker service container
+Configured for June 2025 data availability (the last month with published historical data)
+"""
+
+import pytest
+import asyncio
+from datetime import datetime, timedelta
+from typing import List, Dict, Any
+
+# Import from the actual service
+from app.external.madrid_opendata import MadridOpenDataClient
+from app.core.config import settings
+import structlog
+
+# Configure pytest for async
+pytestmark = pytest.mark.asyncio
+
+# Use actual logger
+logger = structlog.get_logger()
+
+
+class TestMadridTrafficInside:
+ """Test class for Madrid traffic functionality inside Docker"""
+
+ @pytest.fixture
+ def client(self):
+ """Create Madrid client for testing"""
+ return MadridOpenDataClient()
+
+ @pytest.fixture
+ def madrid_coords(self):
+ """Madrid center coordinates"""
+ return 40.4168, -3.7038
+
+ @pytest.fixture
+ def june_2025_dates(self):
+ """Date ranges for June 2025 (last available historical data)"""
+ return {
+ "quick": {
+ "start": datetime(2025, 6, 1, 0, 0),
+ "end": datetime(2025, 6, 1, 6, 0) # 6 hours on June 1st
+ },
+ "one_day": {
+ "start": datetime(2025, 6, 15, 0, 0), # Mid-June
+ "end": datetime(2025, 6, 16, 0, 0) # One full day
+ },
+ "three_days": {
+ "start": datetime(2025, 6, 10, 0, 0),
+ "end": datetime(2025, 6, 13, 0, 0) # 3 days in June
+ },
+ "recent_synthetic": {
+ "start": datetime.now() - timedelta(hours=6),
+ "end": datetime.now() # Recent data (will be synthetic)
+ }
+ }
+
+ async def test_quick_historical_traffic_june2025(self, client, madrid_coords, june_2025_dates):
+ """Test quick historical traffic data from June 2025"""
+ lat, lon = madrid_coords
+ date_range = june_2025_dates["quick"]
+ start_time = date_range["start"]
+ end_time = date_range["end"]
+
+ print(f"\n=== Quick Test (June 2025 - 6 hours) ===")
+ print(f"Location: {lat}, {lon}")
+ print(f"Date range: {start_time.strftime('%Y-%m-%d %H:%M')} to {end_time.strftime('%Y-%m-%d %H:%M')}")
+ print(f"Note: Testing with June 2025 data (last available historical month)")
+
+ # Test the function
+ execution_start = datetime.now()
+ result = await client.get_historical_traffic(lat, lon, start_time, end_time)
+ execution_time = (datetime.now() - execution_start).total_seconds()
+
+ print(f"⏱️ Execution time: {execution_time:.2f} seconds")
+ print(f"📊 Records returned: {len(result)}")
+
+ # Assertions
+ assert isinstance(result, list), "Result should be a list"
+ assert len(result) > 0, "Should return at least some records"
+ assert execution_time < 30, "Should execute in reasonable time (allowing for ZIP download)"
+
+ # Check first record structure
+ if result:
+ sample = result[0]
+ print(f"📋 Sample record keys: {list(sample.keys())}")
+ print(f"📡 Data source: {sample.get('source', 'unknown')}")
+
+ # Required fields
+ required_fields = ['date', 'traffic_volume', 'congestion_level', 'average_speed', 'source']
+ for field in required_fields:
+ assert field in sample, f"Missing required field: {field}"
+
+ # Data validation
+ assert isinstance(sample['traffic_volume'], int), "Traffic volume should be int"
+ assert 0 <= sample['traffic_volume'] <= 1000, "Traffic volume should be reasonable"
+ assert sample['congestion_level'] in ['low', 'medium', 'high', 'blocked'], "Invalid congestion level"
+ assert 5 <= sample['average_speed'] <= 100, "Speed should be reasonable"
+ assert isinstance(sample['date'], datetime), "Date should be datetime object"
+
+ # Check if we got real Madrid data or synthetic
+ if sample['source'] == 'madrid_opendata_zip':
+ print(f"🎉 SUCCESS: Got real Madrid historical data from ZIP!")
+ else:
+ print(f"ℹ️ Got synthetic data (real data may not be available)")
+
+ print(f"✅ All validations passed")
+
+ async def test_one_day_june2025(self, client, madrid_coords, june_2025_dates):
+ """Test one day of June 2025 historical traffic data"""
+ lat, lon = madrid_coords
+ date_range = june_2025_dates["one_day"]
+ start_time = date_range["start"]
+ end_time = date_range["end"]
+
+ print(f"\n=== One Day Test (June 15, 2025) ===")
+ print(f"Date range: {start_time.strftime('%Y-%m-%d %H:%M')} to {end_time.strftime('%Y-%m-%d %H:%M')}")
+
+ result = await client.get_historical_traffic(lat, lon, start_time, end_time)
+
+ print(f"📊 Records returned: {len(result)}")
+
+ # Should have roughly 24 records (one per hour)
+ assert len(result) >= 20, "Should have at least 20 hourly records for one day"
+ assert len(result) <= 30, "Should not have more than 30 records for one day"
+
+ # Check data source
+ if result:
+ sources = set(r['source'] for r in result)
+ print(f"📡 Data sources: {', '.join(sources)}")
+
+ # If we got real data, check for realistic measurement point IDs
+ real_data_records = [r for r in result if r['source'] == 'madrid_opendata_zip']
+ if real_data_records:
+ point_ids = set(r['measurement_point_id'] for r in real_data_records)
+ print(f"🏷️ Real measurement points found: {len(point_ids)}")
+ print(f" Sample IDs: {list(point_ids)[:3]}")
+
+ # Check traffic patterns
+ if len(result) >= 24:
+ # Find rush hour records (7-9 AM, 6-8 PM)
+ rush_hour_records = [r for r in result if 7 <= r['date'].hour <= 9 or 18 <= r['date'].hour <= 20]
+ night_records = [r for r in result if r['date'].hour <= 6 or r['date'].hour >= 22]
+
+ if rush_hour_records and night_records:
+ avg_rush_traffic = sum(r['traffic_volume'] for r in rush_hour_records) / len(rush_hour_records)
+ avg_night_traffic = sum(r['traffic_volume'] for r in night_records) / len(night_records)
+
+ print(f"📈 Rush hour avg traffic: {avg_rush_traffic:.1f}")
+ print(f"🌙 Night avg traffic: {avg_night_traffic:.1f}")
+
+ # Rush hour should typically have more traffic than night
+ if avg_rush_traffic > avg_night_traffic:
+ print(f"✅ Traffic patterns look realistic")
+ else:
+ print(f"⚠️ Traffic patterns unusual (not necessarily wrong)")
+
+ async def test_three_days_june2025(self, client, madrid_coords, june_2025_dates):
+ """Test three days of June 2025 historical traffic data"""
+ lat, lon = madrid_coords
+ date_range = june_2025_dates["three_days"]
+ start_time = date_range["start"]
+ end_time = date_range["end"]
+
+ print(f"\n=== Three Days Test (June 10-13, 2025) ===")
+ print(f"Date range: {start_time.strftime('%Y-%m-%d')} to {end_time.strftime('%Y-%m-%d')}")
+
+ result = await client.get_historical_traffic(lat, lon, start_time, end_time)
+
+ print(f"📊 Records returned: {len(result)}")
+
+ # Should have roughly 72 records (24 hours * 3 days)
+ assert len(result) >= 60, "Should have at least 60 records for 3 days"
+ assert len(result) <= 90, "Should not have more than 90 records for 3 days"
+
+ # Check data sources
+ sources = set(r['source'] for r in result)
+ print(f"📡 Data sources: {', '.join(sources)}")
+
+ # Calculate statistics
+ traffic_volumes = [r['traffic_volume'] for r in result]
+ speeds = [r['average_speed'] for r in result]
+
+ avg_traffic = sum(traffic_volumes) / len(traffic_volumes)
+ max_traffic = max(traffic_volumes)
+ min_traffic = min(traffic_volumes)
+ avg_speed = sum(speeds) / len(speeds)
+
+ print(f"📈 Statistics:")
+ print(f" Average traffic: {avg_traffic:.1f}")
+ print(f" Max traffic: {max_traffic}")
+ print(f" Min traffic: {min_traffic}")
+ print(f" Average speed: {avg_speed:.1f} km/h")
+
+ # Analyze by data source
+ real_data_records = [r for r in result if r['source'] == 'madrid_opendata_zip']
+ synthetic_records = [r for r in result if r['source'] != 'madrid_opendata_zip']
+
+ print(f"🔍 Data breakdown:")
+ print(f" Real Madrid data: {len(real_data_records)} records")
+ print(f" Synthetic data: {len(synthetic_records)} records")
+
+ if real_data_records:
+ # Show measurement points from real data
+ real_points = set(r['measurement_point_id'] for r in real_data_records)
+ print(f" Real measurement points: {len(real_points)}")
+
+ # Sanity checks
+ assert 10 <= avg_traffic <= 500, "Average traffic should be reasonable"
+ assert 10 <= avg_speed <= 60, "Average speed should be reasonable"
+ assert max_traffic >= avg_traffic, "Max should be >= average"
+ assert min_traffic <= avg_traffic, "Min should be <= average"
+
+ async def test_recent_vs_historical_data(self, client, madrid_coords, june_2025_dates):
+ """Compare recent data (synthetic) vs June 2025 data (potentially real)"""
+ lat, lon = madrid_coords
+
+ print(f"\n=== Recent vs Historical Data Comparison ===")
+
+ # Test recent data (should be synthetic)
+ recent_range = june_2025_dates["recent_synthetic"]
+ recent_result = await client.get_historical_traffic(
+ lat, lon, recent_range["start"], recent_range["end"]
+ )
+
+ # Test June 2025 data (potentially real)
+ june_range = june_2025_dates["quick"]
+ june_result = await client.get_historical_traffic(
+ lat, lon, june_range["start"], june_range["end"]
+ )
+
+ print(f"📊 Recent data: {len(recent_result)} records")
+ print(f"📊 June 2025 data: {len(june_result)} records")
+
+ if recent_result:
+ recent_sources = set(r['source'] for r in recent_result)
+ print(f"📡 Recent sources: {', '.join(recent_sources)}")
+
+ if june_result:
+ june_sources = set(r['source'] for r in june_result)
+ print(f"📡 June sources: {', '.join(june_sources)}")
+
+ # Check if we successfully got real data from June
+ if 'madrid_opendata_zip' in june_sources:
+ print(f"🎉 SUCCESS: Real Madrid data successfully fetched from June 2025!")
+
+ # Show details of real data
+ real_records = [r for r in june_result if r['source'] == 'madrid_opendata_zip']
+ if real_records:
+ sample = real_records[0]
+ print(f"📋 Real data sample:")
+ print(f" Date: {sample['date']}")
+ print(f" Traffic volume: {sample['traffic_volume']}")
+ print(f" Measurement point: {sample['measurement_point_id']}")
+ print(f" Point name: {sample.get('measurement_point_name', 'N/A')}")
+ else:
+ print(f"ℹ️ June data is synthetic (real ZIP may not be accessible)")
+
+ async def test_madrid_zip_month_code(self, client):
+ """Test the month code calculation for Madrid ZIP files"""
+ print(f"\n=== Madrid ZIP Month Code Test ===")
+
+ # Test the month code calculation function
+ test_cases = [
+ (2025, 6, 145), # Known: June 2025 = 145
+ (2025, 5, 144), # Known: May 2025 = 144
+ (2025, 4, 143), # Known: April 2025 = 143
+ (2025, 7, 146), # Predicted: July 2025 = 146
+ ]
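+ # The known codes above suggest a simple linear mapping from (year, month) to the
+ # catalogue suffix. A hypothetical sketch, inferred from these cases only (not
+ # necessarily how the client implements it):
+ #
+ #     def _calculate_madrid_month_code(self, year: int, month: int) -> int:
+ #         # 145 corresponds to June 2025; each later month adds 1
+ #         return 145 + (year - 2025) * 12 + (month - 6)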
+
+ for year, month, expected_code in test_cases:
+ if hasattr(client, '_calculate_madrid_month_code'):
+ calculated_code = client._calculate_madrid_month_code(year, month)
+ status = "✅" if calculated_code == expected_code else "⚠️"
+ print(f"{status} {year}-{month:02d}: Expected {expected_code}, Got {calculated_code}")
+
+ # Generate ZIP URL
+ if calculated_code:
+ zip_url = f"https://datos.madrid.es/egob/catalogo/208627-{calculated_code}-transporte-ptomedida-historico.zip"
+ print(f" ZIP URL: {zip_url}")
+ else:
+ print(f"⚠️ Month code calculation function not available")
+
+ async def test_edge_case_large_date_range(self, client, madrid_coords):
+ """Test edge case: date range too large"""
+ lat, lon = madrid_coords
+ start_time = datetime(2025, 1, 1) # 6+ months range
+ end_time = datetime(2025, 7, 1)
+
+ print(f"\n=== Edge Case: Large Date Range ===")
+ print(f"Testing 6-month range: {start_time.date()} to {end_time.date()}")
+
+ result = await client.get_historical_traffic(lat, lon, start_time, end_time)
+
+ print(f"📊 Records for 6-month range: {len(result)}")
+
+ # Should return empty list for ranges > 90 days
+ assert len(result) == 0, "Should return empty list for date ranges > 90 days"
+ print(f"✅ Correctly handled large date range")
+
+ async def test_edge_case_invalid_coordinates(self, client):
+ """Test edge case: invalid coordinates"""
+ print(f"\n=== Edge Case: Invalid Coordinates ===")
+
+ start_time = datetime(2025, 6, 1)
+ end_time = datetime(2025, 6, 1, 6, 0)
+
+ # Test with invalid coordinates
+ result = await client.get_historical_traffic(999.0, 999.0, start_time, end_time)
+
+ print(f"📊 Records for invalid coords: {len(result)}")
+
+ # Should either return empty list or synthetic data
+ # The function should not crash
+ assert isinstance(result, list), "Should return list even with invalid coords"
+ print(f"✅ Handled invalid coordinates gracefully")
+
+ async def test_real_madrid_zip_access(self, client):
+ """Test if we can access the actual Madrid ZIP files"""
+ print(f"\n=== Real Madrid ZIP Access Test ===")
+
+ # Known Madrid historical traffic ZIP URLs (recent months)
+ test_urls = [
+ "https://datos.madrid.es/egob/catalogo/208627-145-transporte-ptomedida-historico.zip", # June 2025
+ "https://datos.madrid.es/egob/catalogo/208627-144-transporte-ptomedida-historico.zip", # May 2025
+ "https://datos.madrid.es/egob/catalogo/208627-143-transporte-ptomedida-historico.zip", # April 2025
+ ]
+
+ for i, url in enumerate(test_urls):
+ month_name = ["June 2025", "May 2025", "April 2025"][i]
+ print(f"\nTesting {month_name}: {url}")
+
+ try:
+ if hasattr(client, '_fetch_historical_zip'):
+ zip_data = await client._fetch_historical_zip(url)
+ if zip_data:
+ print(f"✅ Successfully fetched ZIP: {len(zip_data)} bytes")
+
+ # Try to inspect ZIP contents
+ try:
+ import zipfile
+ from io import BytesIO
+
+ with zipfile.ZipFile(BytesIO(zip_data), 'r') as zip_file:
+ files = zip_file.namelist()
+ csv_files = [f for f in files if f.endswith('.csv')]
+ print(f"📁 ZIP contains {len(files)} files, {len(csv_files)} CSV files")
+
+ if csv_files:
+ print(f" CSV files: {csv_files[:2]}{'...' if len(csv_files) > 2 else ''}")
+
+ except Exception as e:
+ print(f"⚠️ Could not inspect ZIP contents: {e}")
+ else:
+ print(f"❌ Failed to fetch ZIP")
+ else:
+ print(f"⚠️ ZIP fetch function not available")
+
+ except Exception as e:
+ print(f"❌ Error testing ZIP access: {e}")
+
+
+# Additional standalone test functions for manual running
+async def run_manual_test():
+ """Manual test function that can be run directly"""
+ print("="*60)
+ print("MADRID TRAFFIC TEST - JUNE 2025 DATA")
+ print("="*60)
+
+ client = MadridOpenDataClient()
+ madrid_lat, madrid_lon = 40.4168, -3.7038
+
+ # Test with June 2025 data (last available)
+ start_time = datetime(2025, 6, 15, 14, 0) # June 15, 2025 at 2 PM
+ end_time = datetime(2025, 6, 15, 18, 0) # Until 6 PM (4 hours)
+
+ print(f"\nTesting June 15, 2025 data (2 PM - 6 PM)...")
+ print(f"This should include afternoon traffic patterns")
+
+ result = await client.get_historical_traffic(madrid_lat, madrid_lon, start_time, end_time)
+
+ print(f"Result: {len(result)} records")
+
+ if result:
+ sources = set(r['source'] for r in result)
+ print(f"Data sources: {', '.join(sources)}")
+
+ if 'madrid_opendata_zip' in sources:
+ print(f"🎉 Successfully got real Madrid data!")
+
+ sample = result[0]
+ print(f"\nSample record:")
+ for key, value in sample.items():
+ if key == "date":
+ print(f" {key}: {value.strftime('%Y-%m-%d %H:%M:%S')}")
+ else:
+ print(f" {key}: {value}")
+
+ print(f"\n✅ Manual test completed!")
+
+
+if __name__ == "__main__":
+ # If run directly, execute manual test
+ asyncio.run(run_manual_test())
\ No newline at end of file