# ================================================================
# services/data/app/external/madrid_opendata.py - REFACTORED
# ================================================================

"""
Madrid Open Data API client with clean architecture and best practices

Features:
- Real-time traffic data from XML endpoints
- Historical traffic data from ZIP files
- Measurement points integration
- Robust error handling and fallbacks
- Comprehensive logging
"""

import math
import re
import xml.etree.ElementTree as ET
from dataclasses import dataclass
from datetime import datetime, timedelta
from enum import Enum
from typing import Any, Dict, List, Optional, Tuple

import pyproj
import structlog

from app.core.config import settings
from app.external.base_client import BaseAPIClient

logger = structlog.get_logger()

# ================================================================
# CONSTANTS AND ENUMS
# ================================================================


class TrafficServiceLevel(Enum):
    """Madrid traffic service levels"""
    FLUID = 0
    DENSE = 1
    CONGESTED = 2
    BLOCKED = 3


class CongestionLevel(Enum):
    """Standardized congestion levels"""
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    BLOCKED = "blocked"


class DataSource(Enum):
    """Data source types"""
    MADRID_REALTIME = "madrid_opendata"
    MADRID_HISTORICAL = "madrid_opendata_zip"
    SYNTHETIC = "synthetic"
    SYNTHETIC_HISTORICAL = "synthetic_historical"


# Madrid geographic bounds
MADRID_BOUNDS = {
    'lat_min': 40.31, 'lat_max': 40.56,
    'lon_min': -3.89, 'lon_max': -3.51
}

# Constants
MAX_HISTORICAL_DAYS = 365
MAX_CSV_PROCESSING_ROWS = 5000000
MEASUREMENT_POINTS_LIMIT = 20
UTM_ZONE = 30  # Madrid is in UTM Zone 30N


@dataclass
class MeasurementPoint:
    """Measurement point data structure"""
    id: str
    latitude: float
    longitude: float
    distance: float
    name: str
    type: str


@dataclass
class TrafficRecord:
    """Traffic record data structure"""
    date: datetime
    traffic_volume: int
    occupation_percentage: int
    load_percentage: int
    average_speed: int
    congestion_level: str
    pedestrian_count: int
    measurement_point_id: str
    measurement_point_name: str
    road_type: str
    source: str
    error_status: Optional[str] = None
    # Madrid-specific raw data
    intensidad_raw: Optional[int] = None
    ocupacion_raw: Optional[int] = None
    carga_raw: Optional[int] = None
    vmed_raw: Optional[int] = None

    def to_dict(self) -> Dict[str, Any]:
        """Convert TrafficRecord to dictionary"""
        result = {
            "date": self.date,
            "traffic_volume": self.traffic_volume,
            "occupation_percentage": self.occupation_percentage,
            "load_percentage": self.load_percentage,
            "average_speed": self.average_speed,
            "congestion_level": self.congestion_level,
            "pedestrian_count": self.pedestrian_count,
            "measurement_point_id": self.measurement_point_id,
            "measurement_point_name": self.measurement_point_name,
            "road_type": self.road_type,
            "source": self.source
        }

        # Add optional fields if present
        optional_fields = ['error_status', 'intensidad_raw', 'ocupacion_raw', 'carga_raw', 'vmed_raw']
        for field in optional_fields:
            value = getattr(self, field, None)
            if value is not None:
                result[field] = value

        return result
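

# Example (illustrative values, not real feed data): a TrafficRecord
# serializes to a plain dict, with the raw Madrid fields included only
# when they are not None.
#
#   rec = TrafficRecord(
#       date=datetime(2025, 6, 1, 8, 0),
#       traffic_volume=450, occupation_percentage=22, load_percentage=35,
#       average_speed=32, congestion_level=CongestionLevel.LOW.value,
#       pedestrian_count=300, measurement_point_id="3840",
#       measurement_point_name="Madrid Point 3840", road_type="URB",
#       source=DataSource.MADRID_HISTORICAL.value,
#   )
#   rec.to_dict()  # -> {"date": ..., "traffic_volume": 450, ...}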


# ================================================================
# MADRID OPEN DATA CLIENT
# ================================================================


class MadridOpenDataClient(BaseAPIClient):
    """
    Madrid Open Data API client with comprehensive traffic data support

    Provides both real-time and historical traffic data from Madrid's open data portal.
    Implements robust error handling, coordinate conversion, and synthetic data fallbacks.
    """

    def __init__(self):
        super().__init__(base_url="https://datos.madrid.es")
        self.traffic_endpoints = [
            "https://datos.madrid.es/egob/catalogo/202087-0-trafico-intensidad.xml"
        ]
        self.measurement_points_url = "https://datos.madrid.es/egob/catalogo/202468-260-intensidad-trafico.csv"
        self._conversion_log_count = 0  # Limits coordinate-conversion debug logging

        # Initialize coordinate converter
        self.utm_proj = pyproj.Proj(proj='utm', zone=UTM_ZONE, ellps='WGS84', preserve_units=False)

    # ================================================================
    # PUBLIC API METHODS
    # ================================================================

    async def get_current_traffic(self, latitude: float, longitude: float) -> Optional[Dict[str, Any]]:
        """
        Get current traffic data for a location from Madrid's real-time XML endpoints

        Args:
            latitude: Query location latitude
            longitude: Query location longitude

        Returns:
            Dict with current traffic data; falls back to synthetic data when
            no nearby measurement point is available
        """
        try:
            logger.debug("Fetching Madrid traffic data", lat=latitude, lon=longitude)

            # Try real-time endpoints
            for endpoint in self.traffic_endpoints:
                try:
                    traffic_data = await self._fetch_traffic_xml_data(endpoint)

                    if traffic_data:
                        logger.info("Successfully fetched Madrid traffic data",
                                    endpoint=endpoint, points=len(traffic_data))

                        # Find nearest measurement point
                        nearest_point = self._find_nearest_traffic_point(latitude, longitude, traffic_data)

                        if nearest_point:
                            parsed_data = self._parse_traffic_measurement(nearest_point)
                            logger.debug("Successfully parsed real Madrid traffic data",
                                         point_name=nearest_point.get('descripcion'),
                                         point_id=nearest_point.get('idelem'))
                            return parsed_data
                        else:
                            closest_distance = self._get_closest_distance(latitude, longitude, traffic_data)
                            logger.debug("No nearby traffic points found",
                                         lat=latitude, lon=longitude,
                                         closest_distance=closest_distance)

                except Exception as e:
                    logger.debug("Failed to fetch from endpoint", endpoint=endpoint, error=str(e))
                    continue

            # Fallback to synthetic data
            logger.info("No nearby Madrid traffic points found, using synthetic data")
            return await self._generate_synthetic_traffic(latitude, longitude)

        except Exception as e:
            logger.error("Failed to get current traffic", error=str(e))
            return await self._generate_synthetic_traffic(latitude, longitude)

    async def get_historical_traffic(self, latitude: float, longitude: float,
                                     start_date: datetime, end_date: datetime) -> List[Dict[str, Any]]:
        """
        Get historical traffic data from Madrid Open Data ZIP files

        Args:
            latitude: Query location latitude
            longitude: Query location longitude
            start_date: Start date for historical data
            end_date: End date for historical data

        Returns:
            List of historical traffic data dictionaries
        """
        try:
            logger.debug("Fetching Madrid historical traffic data",
                         lat=latitude, lon=longitude, start=start_date, end=end_date)

            # Validate date range
            if not self._validate_date_range(start_date, end_date):
                return []

            # Pre-generate synthetic data so any failure below can fall back to it
            synthetic_data = await self._generate_historical_traffic(latitude, longitude, start_date, end_date)
            logger.info("Generated synthetic historical traffic data", records=len(synthetic_data))

            # Try to fetch real data
            try:
                real_data = await self._fetch_real_historical_traffic(latitude, longitude, start_date, end_date)
                if real_data:
                    logger.info("Fetched real historical traffic data from ZIP files", records=len(real_data))
                    return real_data
                else:
                    logger.info("No real historical data available, using synthetic data")
                    return synthetic_data
            except Exception as e:
                logger.warning("Failed to fetch real historical data, using synthetic", error=str(e))
                return synthetic_data
        except Exception as e:
            logger.error("Error getting historical traffic data", error=str(e))
            return []

    async def get_events(self, latitude: float, longitude: float, radius_km: float = 5.0) -> List[Dict[str, Any]]:
        """Get traffic incidents and events (placeholder for future implementation)"""
        return []

    # ================================================================
    # REAL-TIME TRAFFIC METHODS
    # ================================================================

    async def _fetch_traffic_xml_data(self, endpoint: str) -> Optional[List[Dict[str, Any]]]:
        """Fetch and parse Madrid traffic XML data"""
        try:
            xml_content = await self._fetch_xml_content_robust(endpoint)

            if not xml_content:
                logger.debug("No XML content received", endpoint=endpoint)
                return None

            logger.debug("Madrid XML content preview",
                         length=len(xml_content),
                         first_500=xml_content[:500])

            traffic_points = self._parse_madrid_traffic_xml(xml_content)

            if traffic_points:
                logger.debug("Successfully parsed Madrid traffic XML", points=len(traffic_points))
                return traffic_points
            else:
                logger.warning("No traffic points found in XML", endpoint=endpoint)
                return None

        except Exception as e:
            logger.error("Error fetching traffic XML data", endpoint=endpoint, error=str(e))
            return None

    def _parse_madrid_traffic_xml(self, xml_content: str) -> List[Dict[str, Any]]:
        """Parse Madrid traffic XML (a <pms> root containing <pm> measurement elements)"""
        traffic_points = []

        try:
            cleaned_xml = self._clean_madrid_xml(xml_content)
            root = ET.fromstring(cleaned_xml)

            logger.debug("Madrid XML structure", root_tag=root.tag, children_count=len(list(root)))

            if root.tag == 'pms':
                pm_elements = root.findall('pm')
                logger.debug("Found PM elements", count=len(pm_elements))

                for pm in pm_elements:
                    try:
                        traffic_point = self._extract_madrid_pm_element(pm)

                        if self._is_valid_traffic_point(traffic_point):
                            traffic_points.append(traffic_point)

                            # Log first few points for debugging
                            if len(traffic_points) <= 3:
                                logger.debug("Sample traffic point",
                                             id=traffic_point['idelem'],
                                             lat=traffic_point['latitude'],
                                             lon=traffic_point['longitude'],
                                             intensity=traffic_point.get('intensidad'))

                    except Exception as e:
                        logger.debug("Error parsing PM element", error=str(e))
                        continue
            else:
                logger.warning("Unexpected XML root tag", root_tag=root.tag)

            logger.debug("Madrid traffic XML parsing completed", valid_points=len(traffic_points))
            return traffic_points

        except ET.ParseError as e:
            logger.warning("Failed to parse Madrid XML", error=str(e))
            return self._extract_traffic_data_regex(xml_content)
        except Exception as e:
            logger.error("Error in Madrid traffic XML parsing", error=str(e))
            return []

    # ================================================================
    # HISTORICAL TRAFFIC METHODS
    # ================================================================

    async def _fetch_real_historical_traffic(self, latitude: float, longitude: float,
                                             start_date: datetime, end_date: datetime) -> List[Dict[str, Any]]:
        """Fetch real historical traffic data from Madrid ZIP files"""
        try:
            historical_data = []
            current_date = start_date.replace(day=1)

            while current_date <= end_date:
                try:
                    month_code = self._calculate_madrid_month_code(current_date.year, current_date.month)

                    if month_code:
                        zip_url = f"https://datos.madrid.es/egob/catalogo/208627-{month_code}-transporte-ptomedida-historico.zip"
                        logger.debug("Trying ZIP URL", url=zip_url,
                                     year=current_date.year, month=current_date.month, code=month_code)

                        zip_data = await self._fetch_historical_zip(zip_url)
                        if zip_data:
                            month_data = await self._parse_historical_zip(zip_data, latitude, longitude, start_date, end_date)
                            historical_data.extend(month_data)
                            logger.info("Fetched historical data for month",
                                        year=current_date.year, month=current_date.month, records=len(month_data))
                        else:
                            logger.debug("No ZIP data found for month",
                                         year=current_date.year, month=current_date.month)
                    else:
                        logger.debug("Could not calculate month code",
                                     year=current_date.year, month=current_date.month)

                    current_date = self._get_next_month(current_date)

                except Exception as e:
                    logger.warning("Error fetching data for month",
                                   year=current_date.year, month=current_date.month, error=str(e))
                    current_date = self._get_next_month(current_date)

            return historical_data

        except Exception as e:
            logger.error("Error fetching real historical traffic data", error=str(e))
            return []

    async def _parse_historical_zip(self, zip_content: bytes, latitude: float, longitude: float,
                                    start_date: datetime, end_date: datetime) -> List[Dict[str, Any]]:
        """Parse a Madrid historical traffic ZIP file"""
        try:
            import zipfile
            from io import BytesIO

            historical_records = []

            with zipfile.ZipFile(BytesIO(zip_content), 'r') as zip_file:
                logger.debug("ZIP file contents", files=zip_file.namelist())

                csv_files = [f for f in zip_file.namelist()
                             if f.endswith('.csv') and not f.startswith('__MACOSX')]

                if not csv_files:
                    logger.warning("No CSV files found in ZIP")
                    return []

                for csv_filename in csv_files:
                    logger.debug("Processing CSV file", filename=csv_filename)

                    try:
                        csv_content = self._extract_csv_from_zip(zip_file, csv_filename)
                        if csv_content:
                            file_records = await self._parse_csv_content(
                                csv_content, latitude, longitude, start_date, end_date
                            )
                            historical_records.extend(file_records)

                            logger.debug("Processed CSV file",
                                         filename=csv_filename, records=len(file_records))

                    except Exception as e:
                        logger.warning("Error processing CSV file",
                                       filename=csv_filename, error=str(e))
                        continue

            return historical_records

        except Exception as e:
            logger.error("Error parsing historical ZIP", error=str(e))
            return []

    # ================================================================
    # DATA PARSING AND CONVERSION METHODS
    # ================================================================

    async def _parse_csv_content(self, csv_content: str, latitude: float, longitude: float,
                                 start_date: datetime, end_date: datetime) -> List[Dict[str, Any]]:
        """Parse CSV content from Madrid historical traffic data"""
        try:
            import csv
            from io import StringIO

            csv_reader = csv.DictReader(StringIO(csv_content), delimiter=';')

            if not self._validate_csv_structure(csv_reader.fieldnames):
                return []

            logger.debug("Madrid CSV structure detected", fields=csv_reader.fieldnames)

            # Get nearest measurement points
            measurement_points = await self._get_measurement_points_near_location(latitude, longitude)
            target_point_ids = [str(point.id) for point in measurement_points[:10]]

            logger.debug("Target measurement points", ids=target_point_ids[:3])

            # Process CSV rows
            historical_records = []
            processed_count = 0
            row_num = -1  # Stays -1 if the CSV has no data rows

            for row_num, row in enumerate(csv_reader):
                if processed_count >= MAX_CSV_PROCESSING_ROWS:
                    logger.info("Reached processing limit", limit=MAX_CSV_PROCESSING_ROWS)
                    break

                try:
                    traffic_record = self._parse_csv_row(row, target_point_ids, start_date, end_date)
                    if traffic_record:
                        historical_records.append(traffic_record.to_dict())
                        processed_count += 1

                        if processed_count % 1000 == 0:
                            logger.debug("Processing progress", processed=processed_count)

                except Exception as e:
                    if row_num % 5000 == 0:
                        logger.debug("Error parsing CSV row", row_num=row_num, error=str(e))
                    continue

            logger.info("Successfully parsed Madrid CSV",
                        total_rows=row_num + 1, processed=processed_count, records=len(historical_records))

            # Enrich with location data
            if historical_records and measurement_points:
                historical_records = self._enrich_with_location_data(historical_records, measurement_points)

            return historical_records

        except Exception as e:
            logger.error("Error parsing Madrid CSV content", error=str(e))
            return []

    def _parse_csv_row(self, row: Dict[str, str], target_point_ids: List[str],
                       start_date: datetime, end_date: datetime) -> Optional[TrafficRecord]:
        """Parse a single CSV row into a TrafficRecord"""
        try:
            # Extract and validate point ID
            point_id = str(row.get('id', '')).strip()
            if not point_id or (target_point_ids and point_id not in target_point_ids):
                return None

            # Parse date
            record_date = self._parse_madrid_date(row.get('fecha', '').strip().strip('"'))
            if not record_date or not (start_date <= record_date <= end_date):
                return None

            # Parse traffic data
            intensidad = self._safe_int(row.get('intensidad', '0'))
            ocupacion = self._safe_int(row.get('ocupacion', '0'))
            carga = self._safe_int(row.get('carga', '0'))
            vmed = self._safe_int(row.get('vmed', '0'))
            tipo_elem = row.get('tipo_elem', '').strip().strip('"')
            error = row.get('error', 'N').strip().strip('"')

            # Skip erroneous records ('S' marks an error in the feed)
            if error == 'S':
                return None

            # Calculate derived metrics
            avg_speed = self._calculate_average_speed(vmed, carga, ocupacion)
            congestion_level = self._determine_congestion_level(carga, avg_speed)
            pedestrian_count = self._calculate_pedestrian_count(tipo_elem, record_date.hour)

            return TrafficRecord(
                date=record_date,
                traffic_volume=intensidad,
                occupation_percentage=ocupacion,
                load_percentage=carga,
                average_speed=avg_speed,
                congestion_level=congestion_level,
                pedestrian_count=pedestrian_count,
                measurement_point_id=point_id,
                measurement_point_name=f"Madrid Point {point_id}",
                road_type=tipo_elem,
                source=DataSource.MADRID_HISTORICAL.value,
                error_status=error,
                intensidad_raw=intensidad,
                ocupacion_raw=ocupacion,
                carga_raw=carga,
                vmed_raw=vmed
            )

        except Exception as e:
            logger.debug("Error parsing CSV row", error=str(e))
            return None

    # ================================================================
    # MEASUREMENT POINTS METHODS
    # ================================================================

    async def _get_measurement_points_near_location(self, latitude: float, longitude: float) -> List[MeasurementPoint]:
        """Get measurement points near the specified location"""
        try:
            points_csv = await self._fetch_measurement_points_csv(self.measurement_points_url)

            if points_csv:
                return await self._parse_measurement_points_csv(points_csv, latitude, longitude)
            else:
                logger.info("Using fallback measurement points")
                return self._get_fallback_measurement_points(latitude, longitude)

        except Exception as e:
            logger.warning("Error getting measurement points", error=str(e))
            return self._get_fallback_measurement_points(latitude, longitude)

    async def _parse_measurement_points_csv(self, csv_content: str, query_lat: float, query_lon: float) -> List[MeasurementPoint]:
        """Parse measurement points CSV and find the nearest points"""
        try:
            import csv
            from io import StringIO

            points_with_distance = []
            csv_reader = csv.DictReader(StringIO(csv_content), delimiter=';')

            for row in csv_reader:
                try:
                    point_id = row.get('id', '').strip()
                    latitud = row.get('latitud', '').strip()
                    longitud = row.get('longitud', '').strip()
                    nombre = row.get('nombre', '').strip().strip('"')
                    tipo_elem = row.get('tipo_elem', '').strip().strip('"')

                    if not (point_id and latitud and longitud):
                        continue

                    lat, lon = float(latitud), float(longitud)
                    distance = self._calculate_distance(query_lat, query_lon, lat, lon)

                    point = MeasurementPoint(
                        id=point_id,
                        latitude=lat,
                        longitude=lon,
                        distance=distance,
                        name=nombre or f'Point {point_id}',
                        type=tipo_elem
                    )

                    points_with_distance.append(point)

                except Exception as e:
                    logger.debug("Error parsing measurement point row", error=str(e))
                    continue

            # Sort by distance and return the closest points
            points_with_distance.sort(key=lambda x: x.distance)
            closest_points = points_with_distance[:MEASUREMENT_POINTS_LIMIT]

            logger.info("Found measurement points",
                        total=len(points_with_distance), closest=len(closest_points))

            return closest_points

        except Exception as e:
            logger.error("Error parsing measurement points CSV", error=str(e))
            return []

    # ================================================================
    # COORDINATE CONVERSION METHODS
    # ================================================================

    def _extract_madrid_pm_element(self, pm_element) -> Dict[str, Any]:
        """Extract traffic data from a Madrid <pm> element with coordinate conversion"""
        try:
            point_data = {}
            utm_x = utm_y = None

            # Extract all child elements
            for child in pm_element:
                tag, text = child.tag, child.text.strip() if child.text else ''

                if tag == 'idelem':
                    point_data['idelem'] = text
                elif tag == 'descripcion':
                    point_data['descripcion'] = text
                elif tag == 'intensidad':
                    point_data['intensidad'] = self._safe_int(text)
                elif tag == 'ocupacion':
                    point_data['ocupacion'] = self._safe_float(text)
                elif tag == 'carga':
                    point_data['carga'] = self._safe_int(text)
                elif tag == 'nivelServicio':
                    point_data['nivelServicio'] = self._safe_int(text)
                elif tag == 'st_x':
                    utm_x = text
                    point_data['utm_x'] = text
                elif tag == 'st_y':
                    utm_y = text
                    point_data['utm_y'] = text
                elif tag == 'error':
                    point_data['error'] = text
                elif tag in ['subarea', 'accesoAsociado', 'intensidadSat']:
                    point_data[tag] = text

            # Convert coordinates
            if utm_x and utm_y:
                latitude, longitude = self._convert_utm_to_latlon(utm_x, utm_y)

                if latitude and longitude and self._validate_madrid_coordinates(latitude, longitude):
                    point_data.update({'latitude': latitude, 'longitude': longitude})

                    # Log successful conversions (limited)
                    self._log_coordinate_conversion(point_data, utm_x, utm_y, latitude, longitude)
                    return point_data
                else:
                    logger.debug("Invalid coordinates after conversion",
                                 idelem=point_data.get('idelem'), utm_x=utm_x, utm_y=utm_y)
                    return {}
            else:
                logger.debug("Missing UTM coordinates", idelem=point_data.get('idelem'))
                return {}

        except Exception as e:
            logger.debug("Error extracting Madrid PM element", error=str(e))
            return {}

    def _convert_utm_to_latlon(self, utm_x_str: str, utm_y_str: str) -> Tuple[Optional[float], Optional[float]]:
        """Convert UTM coordinates to lat/lon using pyproj"""
        try:
            utm_x = float(utm_x_str.replace(',', '.'))
            utm_y = float(utm_y_str.replace(',', '.'))

            longitude, latitude = self.utm_proj(utm_x, utm_y, inverse=True)
            return round(latitude, 6), round(longitude, 6)
        except Exception:
            # Covers ValueError/TypeError from parsing and any pyproj failure
            return None, None
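
    # Worked example (values approximate, assuming the WGS84/UTM-30N projection
    # configured in __init__): an easting/northing pair near central Madrid such
    # as ("440000", "4474000") converts to roughly (40.42, -3.71). Madrid feeds
    # use comma decimals (e.g. "440000,50"), hence the ',' -> '.' replacement.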

    # ================================================================
    # UTILITY AND HELPER METHODS
    # ================================================================

    def _validate_date_range(self, start_date: datetime, end_date: datetime) -> bool:
        """Validate date range for historical data requests"""
        days_diff = (end_date - start_date).days

        # Allow same-day ranges (days_diff == 0) and ranges within the same day
        if days_diff < 0:
            logger.warning("End date before start date", start=start_date, end=end_date)
            return False

        if days_diff > MAX_HISTORICAL_DAYS:
            logger.warning("Date range too large for historical traffic data", days=days_diff)
            return False

        return True

    def _calculate_madrid_month_code(self, year: int, month: int) -> Optional[int]:
        """Calculate Madrid's month code for ZIP files (anchor: June 2025 = 145)"""
        try:
            reference_year, reference_month, reference_code = 2025, 6, 145
            months_diff = (year - reference_year) * 12 + (month - reference_month)
            estimated_code = reference_code + months_diff

            if 100 <= estimated_code <= 300:
                return estimated_code
            else:
                logger.warning("Month code out of range", year=year, month=month, code=estimated_code)
                return None

        except Exception as e:
            logger.error("Error calculating month code", year=year, month=month, error=str(e))
            return None
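
    # Worked examples of the month-code arithmetic above (anchor: June 2025 = 145):
    #   July 2025     -> months_diff = +1  -> code 146
    #   December 2024 -> months_diff = -6  -> code 139
    #   June 2024     -> months_diff = -12 -> code 133
    # Codes outside the 100-300 sanity window are rejected.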

    def _calculate_average_speed(self, vmed: int, carga: int, ocupacion: int) -> int:
        """Calculate average speed based on available data"""
        if vmed > 0:  # M30 points have speed data
            return vmed
        else:  # Urban points - estimate from carga and ocupacion
            if carga >= 80:
                speed = 15
            elif carga >= 50:
                speed = 25
            elif carga >= 20:
                speed = 35
            else:
                speed = 45

            # Adjust based on occupation
            if ocupacion >= 30:
                speed = max(10, speed - 10)
            elif ocupacion <= 5:
                speed = min(50, speed + 5)

            return speed

    def _determine_congestion_level(self, carga: int, avg_speed: int) -> str:
        """Determine congestion level from carga and speed"""
        if carga >= 90 and avg_speed <= 10:
            return CongestionLevel.BLOCKED.value
        elif carga >= 75:
            return CongestionLevel.HIGH.value
        elif carga >= 40:
            return CongestionLevel.MEDIUM.value
        else:
            return CongestionLevel.LOW.value
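
    # Examples: carga=92 with avg_speed=8 -> "blocked"; carga=92 with
    # avg_speed=20 -> "high" (the "blocked" rule requires BOTH conditions);
    # carga=50 -> "medium"; carga=10 -> "low".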

    def _calculate_pedestrian_count(self, tipo_elem: str, hour: int) -> int:
        """Calculate pedestrian estimate based on area type and time"""
        if tipo_elem == 'URB':
            base = 200
            if 12 <= hour <= 14:  # Lunch time
                multiplier = 2.0
            elif 8 <= hour <= 9 or 18 <= hour <= 20:  # Rush hours
                multiplier = 1.5
            else:
                multiplier = 1.0
        else:  # M30, C30
            base = 50
            multiplier = 0.5

        return int(base * multiplier)
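
    # Examples: ('URB', 13) -> 200 * 2.0 = 400 (lunch peak);
    # ('URB', 8) -> 200 * 1.5 = 300 (rush hour); ('M30', 3) -> 50 * 0.5 = 25.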

    def _parse_madrid_date(self, fecha_str: str) -> Optional[datetime]:
        """Parse Madrid date formats ('YYYY-MM-DD HH:MM:SS' or 'DD/MM/YYYY HH:MM:SS')"""
        if not fecha_str:
            return None

        try:
            return datetime.strptime(fecha_str, '%Y-%m-%d %H:%M:%S')
        except ValueError:
            try:
                return datetime.strptime(fecha_str, '%d/%m/%Y %H:%M:%S')
            except ValueError:
                return None

    def _validate_csv_structure(self, fieldnames: Optional[List[str]]) -> bool:
        """Validate that the CSV has the expected structure"""
        if not fieldnames:
            logger.warning("No CSV fieldnames found")
            return False

        expected_fields = ['id', 'fecha', 'tipo_elem', 'intensidad', 'ocupacion', 'carga']
        missing_fields = [field for field in expected_fields if field not in fieldnames]

        if missing_fields:
            logger.warning("Missing expected fields in CSV", missing=missing_fields, available=fieldnames)

        return True  # Continue processing even with some missing fields

    def _is_valid_traffic_point(self, traffic_point: Dict[str, Any]) -> bool:
        """Check if traffic point has valid essential data"""
        return bool(traffic_point.get('latitude') and
                    traffic_point.get('longitude') and
                    traffic_point.get('idelem'))

    def _validate_madrid_coordinates(self, latitude: float, longitude: float) -> bool:
        """Validate coordinates are in the Madrid area"""
        return (MADRID_BOUNDS['lat_min'] <= latitude <= MADRID_BOUNDS['lat_max'] and
                MADRID_BOUNDS['lon_min'] <= longitude <= MADRID_BOUNDS['lon_max'])

    def _get_next_month(self, current_date: datetime) -> datetime:
        """Advance a date by one month (callers pass first-of-month dates)"""
        if current_date.month == 12:
            return current_date.replace(year=current_date.year + 1, month=1)
        else:
            return current_date.replace(month=current_date.month + 1)

    def _log_coordinate_conversion(self, point_data: Dict, utm_x: str, utm_y: str,
                                   latitude: float, longitude: float) -> None:
        """Log coordinate conversion (limited to the first few for debugging)"""
        if self._conversion_log_count < 3:
            self._conversion_log_count += 1
            logger.debug("Successful UTM conversion",
                         idelem=point_data.get('idelem'),
                         utm_x=utm_x, utm_y=utm_y,
                         latitude=latitude, longitude=longitude,
                         descripcion=point_data.get('descripcion'))

    def _enrich_with_location_data(self, records: List[Dict[str, Any]],
                                   measurement_points: List[MeasurementPoint]) -> List[Dict[str, Any]]:
        """Enrich traffic records with location data from measurement points"""
        try:
            points_lookup = {point.id: point for point in measurement_points}

            for record in records:
                point_id = record.get('measurement_point_id')
                if point_id in points_lookup:
                    point = points_lookup[point_id]
                    record.update({
                        'measurement_point_name': point.name,
                        'measurement_point_latitude': point.latitude,
                        'measurement_point_longitude': point.longitude,
                        'distance_to_query': point.distance
                    })

            return records

        except Exception as e:
            logger.warning("Error enriching with location data", error=str(e))
            return records

    # ================================================================
    # HTTP CLIENT METHODS
    # ================================================================

    async def _fetch_xml_content_robust(self, url: str) -> Optional[str]:
        """Fetch XML content with robust headers for Madrid endpoints"""
        try:
            import httpx

            headers = {
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
                'Accept': 'application/xml,text/xml,*/*',
                'Accept-Language': 'es-ES,es;q=0.9,en;q=0.8',
                'Accept-Encoding': 'gzip, deflate, br',
                'Cache-Control': 'no-cache',
                'Referer': 'https://datos.madrid.es/'
            }

            async with httpx.AsyncClient(timeout=30.0, follow_redirects=True, headers=headers) as client:
                logger.debug("Fetching XML from Madrid endpoint", url=url)
                response = await client.get(url)

                logger.debug("Madrid API response",
                             status=response.status_code,
                             content_type=response.headers.get('content-type'),
                             content_length=len(response.content))

                if response.status_code == 200:
                    content = self._decode_response_content(response)
                    if content and len(content) > 100:
                        return content

                return None

        except Exception as e:
            logger.warning("Failed to fetch Madrid XML content", url=url, error=str(e))
            return None

    async def _fetch_historical_zip(self, url: str) -> Optional[bytes]:
        """Fetch historical ZIP data from Madrid Open Data"""
        try:
            import httpx

            headers = {
                'User-Agent': 'Mozilla/5.0 (compatible; Madrid-Traffic-Client/1.0)',
                'Accept': 'application/zip,application/octet-stream,*/*',
                'Accept-Language': 'es-ES,es;q=0.9,en;q=0.8',
            }

            async with httpx.AsyncClient(timeout=120.0, headers=headers, follow_redirects=True) as client:
                logger.debug("Fetching historical ZIP", url=url)
                response = await client.get(url)

                if response.status_code == 200:
                    content = response.content
                    if content and len(content) > 1000:
                        logger.debug("Successfully fetched ZIP", url=url, size=len(content))
                        return content
                    else:
                        logger.debug("ZIP file too small", url=url, size=len(content) if content else 0)
                else:
                    logger.debug("ZIP not found", url=url, status=response.status_code)

        except Exception as e:
            logger.debug("Error fetching ZIP", url=url, error=str(e))

        return None

    async def _fetch_measurement_points_csv(self, url: str) -> Optional[str]:
        """Fetch the measurement points CSV file"""
        try:
            import httpx

            headers = {
                'User-Agent': 'Mozilla/5.0 (compatible; Madrid-Traffic-Client/1.0)',
                'Accept': 'text/csv,application/csv,text/plain,*/*',
                'Accept-Language': 'es-ES,es;q=0.9,en;q=0.8',
            }

            async with httpx.AsyncClient(timeout=30.0, headers=headers, follow_redirects=True) as client:
                logger.debug("Fetching measurement points CSV", url=url)
                response = await client.get(url)

                if response.status_code == 200:
                    content = response.text
                    if content and len(content) > 1000:
                        logger.debug("Successfully fetched measurement points CSV",
                                     url=url, size=len(content))
                        return content
                    else:
                        logger.debug("Measurement points CSV too small", size=len(content))
                else:
                    logger.debug("Measurement points CSV not found",
                                 url=url, status=response.status_code)

        except Exception as e:
            logger.debug("Error fetching measurement points CSV", url=url, error=str(e))

        return None

    def _decode_response_content(self, response) -> Optional[str]:
        """Decode response content with multiple encoding attempts"""
        try:
            return response.text
        except UnicodeDecodeError:
            # Try manual decoding for Spanish content
            for encoding in ['utf-8', 'latin-1', 'windows-1252', 'iso-8859-1']:
                try:
                    content = response.content.decode(encoding)
                    if content and len(content) > 100:
                        logger.debug("Successfully decoded with encoding", encoding=encoding)
                        return content
                except UnicodeDecodeError:
                    continue
            return None

    def _extract_csv_from_zip(self, zip_file, csv_filename: str) -> Optional[str]:
        """Extract and decode CSV content from a ZIP file"""
        try:
            csv_bytes = zip_file.read(csv_filename)

            # Try different encodings for Spanish content
            for encoding in ['utf-8', 'latin-1', 'windows-1252', 'iso-8859-1']:
                try:
                    csv_content = csv_bytes.decode(encoding)
                    logger.debug("Successfully decoded CSV", filename=csv_filename, encoding=encoding)
                    return csv_content
                except UnicodeDecodeError:
                    continue

            logger.warning("Could not decode CSV file", filename=csv_filename)
            return None

        except Exception as e:
            logger.warning("Error extracting CSV from ZIP", filename=csv_filename, error=str(e))
            return None

    # ================================================================
    # XML PROCESSING METHODS
    # ================================================================

    def _clean_madrid_xml(self, xml_content: str) -> str:
        """Clean Madrid XML to handle undefined entities and encoding issues"""
        try:
            # Remove BOM if present
            xml_content = xml_content.lstrip('\ufeff')

            # Replace HTML entities that are undefined in plain XML
            entity_replacements = {
                '&nbsp;': ' ', '&copy;': '©', '&reg;': '®', '&trade;': '™'
            }

            for entity, replacement in entity_replacements.items():
                xml_content = xml_content.replace(entity, replacement)

            # Escape bare ampersands that are not part of an entity
            xml_content = re.sub(r'&(?![a-zA-Z0-9#]{1,10};)', '&amp;', xml_content)

            # Remove invalid control characters
            xml_content = re.sub(r'[\x00-\x08\x0B\x0C\x0E-\x1F\x7F]', '', xml_content)

            # Replace accented Spanish characters with ASCII equivalents
            spanish_chars = {
                'ñ': 'n', 'Ñ': 'N', 'á': 'a', 'é': 'e', 'í': 'i', 'ó': 'o', 'ú': 'u',
                'Á': 'A', 'É': 'E', 'Í': 'I', 'Ó': 'O', 'Ú': 'U', 'ü': 'u', 'Ü': 'U'
            }

            for spanish_char, replacement in spanish_chars.items():
                xml_content = xml_content.replace(spanish_char, replacement)

            return xml_content

        except Exception as e:
            logger.warning("Error cleaning Madrid XML", error=str(e))
            return xml_content
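
    # Example of the ampersand rule: in "Pza. Canovas & Castillo" the bare "&"
    # becomes "&amp;", while an already-escaped "&amp;" or a numeric entity
    # like "&#225;" is left untouched by the negative lookahead.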

    def _extract_traffic_data_regex(self, xml_content: str) -> List[Dict[str, Any]]:
        """Extract traffic data using regex when XML parsing fails"""
        traffic_points = []

        try:
            pm_pattern = r'<pm>(.*?)</pm>'
            pm_matches = re.findall(pm_pattern, xml_content, re.DOTALL)

            for pm_content in pm_matches:
                try:
                    extracted_data = self._extract_pm_data_regex(pm_content)
                    if extracted_data and self._is_valid_traffic_point(extracted_data):
                        traffic_points.append(extracted_data)

                except Exception as e:
                    logger.debug("Error parsing regex PM match", error=str(e))
                    continue

            logger.debug("Regex extraction results", count=len(traffic_points))
            return traffic_points

        except Exception as e:
            logger.error("Error in regex extraction", error=str(e))
            return []

    def _extract_pm_data_regex(self, pm_content: str) -> Dict[str, Any]:
        """Extract individual PM data using regex"""
        patterns = {
            'idelem': r'<idelem>(.*?)</idelem>',
            'intensidad': r'<intensidad>(.*?)</intensidad>',
            'st_x': r'<st_x>(.*?)</st_x>',
            'st_y': r'<st_y>(.*?)</st_y>',
            'descripcion': r'<descripcion>(.*?)</descripcion>'
        }

        extracted = {}
        for field, pattern in patterns.items():
            match = re.search(pattern, pm_content)
            extracted[field] = match.group(1) if match else ''

        if extracted['idelem'] and extracted['st_x'] and extracted['st_y']:
            # Convert coordinates
            latitude, longitude = self._convert_utm_to_latlon(extracted['st_x'], extracted['st_y'])

            if latitude and longitude:
                return {
                    'idelem': extracted['idelem'],
                    'descripcion': extracted['descripcion'] or f"Point {extracted['idelem']}",
                    'intensidad': self._safe_int(extracted['intensidad']),
                    'latitude': latitude,
                    'longitude': longitude,
                    'ocupacion': 0,
                    'carga': 0,
                    'nivelServicio': 0,
                    'error': 'N'
                }

        return {}

    # ================================================================
    # TRAFFIC ANALYSIS METHODS
    # ================================================================

    def _find_nearest_traffic_point(self, latitude: float, longitude: float,
                                    traffic_data: List[Dict]) -> Optional[Dict]:
        """Find the nearest traffic measurement point to the given coordinates"""
        if not traffic_data:
            return None

        min_distance = float('inf')
        nearest_point = None

        for point in traffic_data:
            if point.get('latitude') and point.get('longitude'):
                distance = self._calculate_distance(
                    latitude, longitude,
                    point['latitude'], point['longitude']
                )

                if distance < min_distance:
                    min_distance = distance
                    nearest_point = point

        # Madrid area search radius (15 km)
        if nearest_point and min_distance <= 15.0:
            logger.debug("Found nearest Madrid traffic point",
                         distance_km=min_distance,
                         point_name=nearest_point.get('descripcion'),
                         point_id=nearest_point.get('idelem'))
            return nearest_point

        logger.debug("No nearby Madrid traffic points found",
                     min_distance=min_distance, total_points=len(traffic_data))
        return None

    def _get_closest_distance(self, latitude: float, longitude: float, traffic_data: List[Dict]) -> float:
        """Get the distance to the closest traffic point (for debugging)"""
        if not traffic_data:
            return float('inf')

        min_distance = float('inf')
        for point in traffic_data:
            if point.get('latitude') and point.get('longitude'):
                distance = self._calculate_distance(
                    latitude, longitude,
                    point['latitude'], point['longitude']
                )
                min_distance = min(min_distance, distance)

        return min_distance

    def _parse_traffic_measurement(self, traffic_point: Dict) -> Dict[str, Any]:
        """Parse Madrid traffic measurement into standardized format"""
        try:
            service_level = traffic_point.get('nivelServicio', 0)
            congestion_mapping = {
                TrafficServiceLevel.FLUID.value: CongestionLevel.LOW.value,
                TrafficServiceLevel.DENSE.value: CongestionLevel.MEDIUM.value,
                TrafficServiceLevel.CONGESTED.value: CongestionLevel.HIGH.value,
                TrafficServiceLevel.BLOCKED.value: CongestionLevel.BLOCKED.value
            }

            # Speed estimation based on service level
            speed_mapping = {
                TrafficServiceLevel.FLUID.value: 45,
                TrafficServiceLevel.DENSE.value: 25,
                TrafficServiceLevel.CONGESTED.value: 15,
                TrafficServiceLevel.BLOCKED.value: 5
            }

            congestion_level = congestion_mapping.get(service_level, CongestionLevel.MEDIUM.value)
            average_speed = speed_mapping.get(service_level, 25)

            # Calculate pedestrian estimate
            hour = datetime.now().hour
            pedestrian_multiplier = self._get_pedestrian_multiplier(hour)
            pedestrian_count = int(100 * pedestrian_multiplier)

            return {
                "date": datetime.now(),
                "traffic_volume": traffic_point.get('intensidad', 0),
                "pedestrian_count": pedestrian_count,
                "congestion_level": congestion_level,
                "average_speed": average_speed,
                "occupation_percentage": traffic_point.get('ocupacion', 0),
                "load_percentage": traffic_point.get('carga', 0),
                "measurement_point_id": traffic_point.get('idelem'),
                "measurement_point_name": traffic_point.get('descripcion'),
                "road_type": "URB",
                "source": DataSource.MADRID_REALTIME.value
            }

        except Exception as e:
            logger.error("Error parsing traffic measurement", error=str(e))
            return self._get_default_traffic_data()

    def _get_pedestrian_multiplier(self, hour: int) -> float:
        """Get pedestrian multiplier based on time of day"""
        if 13 <= hour <= 15:  # Lunch time
            return 2.5
        elif 8 <= hour <= 9 or 18 <= hour <= 20:  # Rush hours
            return 2.0
        else:
            return 1.0

    # ================================================================
    # SYNTHETIC DATA GENERATION METHODS
    # ================================================================

    async def _generate_synthetic_traffic(self, latitude: float, longitude: float) -> Dict[str, Any]:
        """Generate realistic Madrid traffic data as fallback"""
        now = datetime.now()
        hour = now.hour
        is_weekend = now.weekday() >= 5

        traffic_params = self._calculate_traffic_parameters(hour, is_weekend)

        return {
            "date": now,
            "traffic_volume": traffic_params['volume'],
            "pedestrian_count": traffic_params['pedestrians'],
            "congestion_level": traffic_params['congestion'],
            "average_speed": traffic_params['speed'],
            "occupation_percentage": min(100, traffic_params['volume'] // 2),
            "load_percentage": min(100, traffic_params['volume'] // 3),
            "measurement_point_id": "madrid_synthetic",
            "measurement_point_name": "Madrid Centro (Synthetic)",
            "road_type": "URB",
            "source": DataSource.SYNTHETIC.value
        }

    async def _generate_historical_traffic(self, latitude: float, longitude: float,
                                           start_date: datetime, end_date: datetime) -> List[Dict[str, Any]]:
        """Generate synthetic historical traffic data with realistic patterns"""
        try:
            import random

            historical_data = []
            current_date = start_date

            # Seed random for consistent data per location
            random.seed(hash(f"{latitude}{longitude}"))

            while current_date < end_date:
                # Calculate how many hours to generate for this day
                if current_date.date() == end_date.date():
                    # Same day as end_date: only generate up to end_date's hour
                    end_hour = end_date.hour
                else:
                    # Full day
                    end_hour = 24

                # Generate hourly records for this day
                for hour in range(current_date.hour if current_date == start_date else 0, end_hour):
                    record_time = current_date.replace(hour=hour, minute=0, second=0, microsecond=0)

                    # Skip if record time is at or beyond end_date
                    if record_time >= end_date:
                        break

                    traffic_params = self._generate_synthetic_traffic_params(record_time, random)

                    traffic_record = {
                        "date": record_time,
                        "traffic_volume": traffic_params['volume'],
                        "pedestrian_count": traffic_params['pedestrians'],
                        "congestion_level": traffic_params['congestion'],
                        "average_speed": traffic_params['speed'],
                        "occupation_percentage": min(100, traffic_params['volume'] // 2),
                        "load_percentage": min(100, traffic_params['volume'] // 3),
                        "measurement_point_id": f"madrid_historical_{hash(f'{latitude}{longitude}') % 1000}",
                        "measurement_point_name": f"Madrid Historical Point ({latitude:.4f}, {longitude:.4f})",
                        "road_type": "URB",
                        "source": DataSource.SYNTHETIC_HISTORICAL.value
                    }

                    historical_data.append(traffic_record)

                # Move to next day
                if current_date.date() == end_date.date():
                    # We've processed the end date, stop
                    break
                else:
                    # Move to start of next day
                    current_date = (current_date + timedelta(days=1)).replace(hour=0, minute=0, second=0, microsecond=0)

            logger.info("Generated historical traffic data",
                        records=len(historical_data), start=start_date, end=end_date)

            return historical_data

        except Exception as e:
            logger.error("Error generating historical traffic data", error=str(e))
            return []

    def _calculate_traffic_parameters(self, hour: int, is_weekend: bool) -> Dict[str, Any]:
        """Calculate traffic parameters based on time and day type"""
        base_traffic = 100

        if not is_weekend:
            if 7 <= hour <= 9:
                multiplier, congestion, speed = 2.2, "high", 15
            elif 18 <= hour <= 20:
                multiplier, congestion, speed = 2.5, "high", 12
            elif 12 <= hour <= 14:
                multiplier, congestion, speed = 1.6, "medium", 25
            else:
                multiplier, congestion, speed = 1.0, "low", 40
        else:
            if 11 <= hour <= 14:
                multiplier, congestion, speed = 1.4, "medium", 30
            else:
                multiplier, congestion, speed = 0.8, "low", 45

        volume = int(base_traffic * multiplier)
        pedestrians = int(150 * self._get_pedestrian_multiplier(hour))

        return {
            'volume': volume,
            'congestion': congestion,
            'speed': max(10, speed),
            'pedestrians': pedestrians
        }
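
    # Examples: a weekday at 19h yields volume 100 * 2.5 = 250 with "high"
    # congestion at ~12 km/h; a weekend at 10h yields 100 * 0.8 = 80 with
    # "low" congestion at ~45 km/h.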

    def _generate_synthetic_traffic_params(self, record_time: datetime, random_gen) -> Dict[str, Any]:
        """Generate synthetic traffic parameters with random variations"""
        hour = record_time.hour
        day_of_week = record_time.weekday()
        month = record_time.month

        base_params = self._calculate_traffic_parameters(hour, day_of_week >= 5)

        # Add random variations
        volume_variation = random_gen.uniform(-0.3, 0.3)
        speed_variation = random_gen.randint(-5, 5)

        # Apply seasonal adjustments
        seasonal_multiplier = 0.8 if month in [7, 8] else (1.1 if month in [11, 12] else 1.0)

        # Weekend-specific adjustments
        if day_of_week >= 5 and hour in [11, 12, 13, 14, 15]:
            base_params['volume'] = int(base_params['volume'] * 1.4)
            base_params['congestion'] = "medium"

        return {
            'volume': max(10, int(base_params['volume'] * (1 + volume_variation) * seasonal_multiplier)),
            'congestion': base_params['congestion'],
            'speed': max(10, min(60, base_params['speed'] + speed_variation)),
            'pedestrians': int(base_params['pedestrians'] * random_gen.uniform(0.8, 1.2))
        }

    def _get_fallback_measurement_points(self, latitude: float, longitude: float) -> List[MeasurementPoint]:
        """Generate fallback measurement points when the CSV is not available"""
        madrid_points = [
            (40.4168, -3.7038, "Madrid Centro"),
            (40.4200, -3.7060, "Gran Vía"),
            (40.4155, -3.7074, "Plaza Mayor"),
            (40.4152, -3.6844, "Retiro"),
            (40.4063, -3.6932, "Atocha"),
        ]

        fallback_points = []
        for i, (lat, lon, name) in enumerate(madrid_points):
            distance = self._calculate_distance(latitude, longitude, lat, lon)
            point = MeasurementPoint(
                id=f'fallback_{i+1000}',
                latitude=lat,
                longitude=lon,
                distance=distance,
                name=name,
                type='URB'
            )
            fallback_points.append(point)

        fallback_points.sort(key=lambda x: x.distance)
        return fallback_points[:5]

    def _get_default_traffic_data(self) -> Dict[str, Any]:
        """Get default traffic data when parsing fails"""
        return {
            "date": datetime.now(),
            "traffic_volume": 100,
            "pedestrian_count": 150,
            "congestion_level": CongestionLevel.MEDIUM.value,
            "average_speed": 25,
            "occupation_percentage": 30,
            "load_percentage": 40,
            "measurement_point_id": "unknown",
            "measurement_point_name": "Unknown location",
            "road_type": "URB",
            "source": DataSource.SYNTHETIC.value
        }

    # ================================================================
    # CORE UTILITY METHODS
    # ================================================================

    def _calculate_distance(self, lat1: float, lon1: float, lat2: float, lon2: float) -> float:
        """Calculate distance in km between two coordinates using the Haversine formula"""
        R = 6371  # Earth's radius in km

        dlat = math.radians(lat2 - lat1)
        dlon = math.radians(lon2 - lon1)

        a = (math.sin(dlat/2) * math.sin(dlat/2) +
             math.cos(math.radians(lat1)) * math.cos(math.radians(lat2)) *
             math.sin(dlon/2) * math.sin(dlon/2))

        c = 2 * math.atan2(math.sqrt(a), math.sqrt(1-a))
        return R * c
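
    # Worked example: Madrid Centro (40.4168, -3.7038) to Atocha
    # (40.4063, -3.6932) comes out at roughly 1.5 km, consistent with the
    # fallback measurement points defined above.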

    def _safe_int(self, value_str: str) -> int:
        """Safely convert a string to int (handles comma decimals and bad input)"""
        try:
            return int(float(value_str.replace(',', '.')))
        except (ValueError, TypeError, AttributeError):
            # AttributeError covers None values (e.g. from csv.DictReader restval)
            return 0

    def _safe_float(self, value_str: str) -> float:
        """Safely convert a string to float (handles comma decimals and bad input)"""
        try:
            return float(value_str.replace(',', '.'))
        except (ValueError, TypeError, AttributeError):
            return 0.0
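

# ----------------------------------------------------------------
# Minimal usage sketch (illustrative only). It assumes the app
# package is importable and that BaseAPIClient needs no setup beyond
# the base_url passed in __init__; adjust before running for real.
# ----------------------------------------------------------------
if __name__ == "__main__":
    import asyncio

    async def _demo() -> None:
        client = MadridOpenDataClient()

        # Puerta del Sol, central Madrid
        current = await client.get_current_traffic(40.4168, -3.7038)
        print("current:", current)

        # Last 48 hours; falls back to synthetic data if no ZIP is available
        end = datetime.now()
        history = await client.get_historical_traffic(
            40.4168, -3.7038, end - timedelta(days=2), end
        )
        print("historical records:", len(history))

    asyncio.run(_demo())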