REFACTOR data service

Urtzi Alfaro
2025-08-12 18:17:30 +02:00
parent 7c237c0acc
commit fbe7470ad9
149 changed files with 8528 additions and 7393 deletions

services/external/app/external/aemet.py
@@ -0,0 +1,704 @@
# ================================================================
# services/data/app/external/aemet.py - REFACTORED VERSION
# ================================================================
"""AEMET (Spanish Weather Service) API client with improved modularity"""
import math
from typing import List, Dict, Any, Optional
from datetime import datetime, timedelta, date
from dataclasses import dataclass
from enum import Enum
import structlog
from app.external.base_client import BaseAPIClient
from app.core.config import settings
logger = structlog.get_logger()
class WeatherSource(Enum):
"""Weather data source types"""
AEMET = "aemet"
SYNTHETIC = "synthetic"
DEFAULT = "default"
@dataclass
class WeatherStation:
"""Weather station data"""
id: str
name: str
latitude: float
longitude: float
@dataclass
class GeographicBounds:
"""Geographic boundary definition"""
min_lat: float
max_lat: float
min_lon: float
max_lon: float
def contains(self, latitude: float, longitude: float) -> bool:
"""Check if coordinates are within bounds"""
return (self.min_lat <= latitude <= self.max_lat and
self.min_lon <= longitude <= self.max_lon)
class AEMETConstants:
"""AEMET API constants and configuration"""
# API Configuration
MAX_DAYS_PER_REQUEST = 30
MADRID_MUNICIPALITY_CODE = "28079"
# Madrid geographic bounds
MADRID_BOUNDS = GeographicBounds(
min_lat=40.3, max_lat=40.6,
min_lon=-3.9, max_lon=-3.5
)
# Weather stations in Madrid area
MADRID_STATIONS = [
WeatherStation("3195", "Madrid Centro", 40.4117, -3.6780),
WeatherStation("3129", "Madrid Norte", 40.4677, -3.5552),
WeatherStation("3197", "Madrid Sur", 40.2987, -3.7216),
]
# Climate simulation parameters
BASE_TEMPERATURE_SEASONAL = 5.0
TEMPERATURE_SEASONAL_MULTIPLIER = 2.5
DAILY_TEMPERATURE_AMPLITUDE = 8.0
EARTH_RADIUS_KM = 6371.0
class WeatherDataParser:
"""Handles parsing of different weather data formats"""
@staticmethod
def safe_float(value: Any, default: Optional[float] = None) -> Optional[float]:
"""Safely convert value to float with fallback"""
try:
if value is None:
return default
return float(value)
except (ValueError, TypeError):
return default
@staticmethod
def extract_temperature_value(temp_data: Any) -> Optional[float]:
"""Extract temperature value from AEMET complex temperature structure"""
if temp_data is None:
return None
if isinstance(temp_data, (int, float)):
return float(temp_data)
if isinstance(temp_data, str):
try:
return float(temp_data)
except ValueError:
return None
if isinstance(temp_data, dict) and 'valor' in temp_data:
return WeatherDataParser.safe_float(temp_data['valor'])
if isinstance(temp_data, list) and len(temp_data) > 0:
first_item = temp_data[0]
if isinstance(first_item, dict) and 'valor' in first_item:
return WeatherDataParser.safe_float(first_item['valor'])
return None
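# Illustrative inputs only (shapes assumed from the branches above): AEMET may
# deliver a bare number, a numeric string, a {'valor': ...} dict, or a list of
# such dicts, and all of them normalize to the same float:
#   WeatherDataParser.extract_temperature_value(21.5)                   -> 21.5
#   WeatherDataParser.extract_temperature_value("21.5")                 -> 21.5
#   WeatherDataParser.extract_temperature_value({'valor': '21.5'})      -> 21.5
#   WeatherDataParser.extract_temperature_value([{'valor': 21.5}])      -> 21.5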
@staticmethod
def generate_weather_description(temperature: Optional[float],
precipitation: Optional[float],
humidity: Optional[float]) -> str:
"""Generate weather description based on conditions"""
if precipitation is not None and precipitation > 5.0:
return "Lluvioso"
elif precipitation is not None and precipitation > 0.1:
return "Nuboso con lluvia"
elif humidity is not None and humidity > 80:
return "Nuboso"
elif temperature is not None and temperature > 25:
return "Soleado y cálido"
elif temperature is not None and temperature < 5:
return "Frío"
else:
return "Variable"
def parse_current_weather(self, data: Dict[str, Any]) -> Dict[str, Any]:
"""Parse AEMET current weather data format"""
if not isinstance(data, dict):
logger.warning("Weather data is not a dictionary", data_type=type(data))
return self._get_default_weather_data()
try:
return {
"date": datetime.now(),
"temperature": self.safe_float(data.get("ta"), 15.0),
"precipitation": self.safe_float(data.get("prec"), 0.0),
"humidity": self.safe_float(data.get("hr"), 50.0),
"wind_speed": self.safe_float(data.get("vv"), 10.0),
"pressure": self.safe_float(data.get("pres"), 1013.0),
"description": str(data.get("descripcion", "Partly cloudy")),
"source": WeatherSource.AEMET.value
}
except Exception as e:
logger.error("Error parsing weather data", error=str(e), data=data)
return self._get_default_weather_data()
def parse_historical_data(self, data: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""Parse AEMET historical weather data"""
parsed_data = []
try:
for record in data:
if not isinstance(record, dict):
continue
parsed_record = self._parse_single_historical_record(record)
if parsed_record:
parsed_data.append(parsed_record)
except Exception as e:
logger.error("Error parsing historical weather data", error=str(e))
return parsed_data
def parse_forecast_data(self, data: List[Dict[str, Any]], days: int) -> List[Dict[str, Any]]:
"""Parse AEMET forecast data"""
forecast = []
base_date = datetime.now().date()
if not isinstance(data, list):
logger.warning("Forecast data is not a list", data_type=type(data))
return []
try:
if len(data) > 0 and isinstance(data[0], dict):
aemet_data = data[0]
dias = aemet_data.get('prediccion', {}).get('dia', [])
if isinstance(dias, list) and len(dias) > 0:
forecast = self._parse_forecast_days(dias, days, base_date)
# Fill remaining days with synthetic data if needed
forecast = self._ensure_forecast_completeness(forecast, days)
except Exception as e:
logger.error("Error parsing AEMET forecast data", error=str(e))
forecast = []
return forecast
def _parse_single_historical_record(self, record: Dict[str, Any]) -> Optional[Dict[str, Any]]:
"""Parse a single historical weather record"""
fecha_str = record.get('fecha')
if not fecha_str:
return None
try:
record_date = datetime.strptime(fecha_str, '%Y-%m-%d')
except ValueError:
logger.warning("Invalid date format in historical data", fecha=fecha_str)
return None
# Extract and calculate temperature
temp_max = self.safe_float(record.get('tmax'))
temp_min = self.safe_float(record.get('tmin'))
temperature = self._calculate_average_temperature(temp_max, temp_min)
# Extract other weather parameters
precipitation = self.safe_float(record.get('prec'), 0.0)
humidity = self.safe_float(record.get('hr'))
wind_speed = self.safe_float(record.get('velmedia'))
pressure = self._extract_pressure(record)
return {
"date": record_date,
"temperature": temperature,
"precipitation": precipitation,
"humidity": humidity,
"wind_speed": wind_speed,
"pressure": pressure,
"description": self.generate_weather_description(temperature, precipitation, humidity),
"source": WeatherSource.AEMET.value
}
def _calculate_average_temperature(self, temp_max: Optional[float], temp_min: Optional[float]) -> Optional[float]:
"""Calculate average temperature from max and min values"""
if temp_max is not None and temp_min is not None:
return (temp_max + temp_min) / 2
elif temp_max is not None:
return temp_max - 5  # Estimate average from max
elif temp_min is not None:
return temp_min + 5  # Estimate average from min
return None
def _extract_pressure(self, record: Dict[str, Any]) -> Optional[float]:
"""Extract pressure from historical record"""
pressure = self.safe_float(record.get('presMax'))
if pressure is None:
pressure = self.safe_float(record.get('presMin'))
return pressure
def _parse_forecast_days(self, dias: List[Dict[str, Any]], days: int, base_date: date) -> List[Dict[str, Any]]:
"""Parse forecast days from AEMET data"""
forecast = []
for i, dia in enumerate(dias[:days]):
if not isinstance(dia, dict):
continue
forecast_date = base_date + timedelta(days=i)
forecast_day = self._parse_single_forecast_day(dia, forecast_date, i)
forecast.append(forecast_day)
return forecast
def _parse_single_forecast_day(self, dia: Dict[str, Any], forecast_date: date, day_index: int) -> Dict[str, Any]:
"""Parse a single forecast day"""
# Extract temperature
temp_data = dia.get('temperatura', {})
avg_temp = self._extract_forecast_temperature(temp_data)
# Extract precipitation probability
precip_prob = self._extract_precipitation_probability(dia.get('probPrecipitacion', []))
# Extract wind speed
wind_speed = self._extract_wind_speed(dia.get('viento', []))
# Generate description
description = self._generate_forecast_description(precip_prob)
return {
"forecast_date": datetime.combine(forecast_date, datetime.min.time()),
"generated_at": datetime.now(),
"temperature": round(avg_temp, 1),
"precipitation": precip_prob / 10, # Convert percentage to mm estimate
"humidity": 50.0 + (day_index % 20), # Estimate
"wind_speed": round(wind_speed, 1),
"description": description,
"source": WeatherSource.AEMET.value
}
def _extract_forecast_temperature(self, temp_data: Dict[str, Any]) -> float:
"""Extract temperature from forecast temperature data"""
if isinstance(temp_data, dict):
temp_max = self.extract_temperature_value(temp_data.get('maxima'))
temp_min = self.extract_temperature_value(temp_data.get('minima'))
if temp_max is not None and temp_min is not None:
return (temp_max + temp_min) / 2
return 15.0
def _extract_precipitation_probability(self, precip_data: List[Dict[str, Any]]) -> float:
"""Extract precipitation probability from forecast data"""
precip_prob = 0.0
if isinstance(precip_data, list):
for precip_item in precip_data:
if isinstance(precip_item, dict) and 'value' in precip_item:
precip_prob = max(precip_prob, self.safe_float(precip_item.get('value'), 0.0))
return precip_prob
def _extract_wind_speed(self, viento_data: List[Dict[str, Any]]) -> float:
"""Extract wind speed from forecast data"""
wind_speed = 10.0
if isinstance(viento_data, list):
for viento_item in viento_data:
if isinstance(viento_item, dict) and 'velocidad' in viento_item:
speed_values = viento_item.get('velocidad', [])
if isinstance(speed_values, list) and len(speed_values) > 0:
wind_speed = self.safe_float(speed_values[0], 10.0)
break
return wind_speed
def _generate_forecast_description(self, precip_prob: float) -> str:
"""Generate description based on precipitation probability"""
if precip_prob > 70:
return "Lluvioso"
elif precip_prob > 30:
return "Parcialmente nublado"
else:
return "Soleado"
def _ensure_forecast_completeness(self, forecast: List[Dict[str, Any]], days: int) -> List[Dict[str, Any]]:
"""Ensure forecast has the requested number of days"""
if len(forecast) < days:
remaining_days = days - len(forecast)
synthetic_generator = SyntheticWeatherGenerator()
synthetic_forecast = synthetic_generator.generate_forecast_sync(remaining_days, len(forecast))
forecast.extend(synthetic_forecast)
return forecast[:days]
def _get_default_weather_data(self) -> Dict[str, Any]:
"""Get default weather data structure"""
return {
"date": datetime.now(),
"temperature": 15.0,
"precipitation": 0.0,
"humidity": 50.0,
"wind_speed": 10.0,
"pressure": 1013.0,
"description": "Data not available",
"source": WeatherSource.DEFAULT.value
}
class SyntheticWeatherGenerator:
"""Generates realistic synthetic weather data for Madrid"""
def generate_current_weather(self) -> Dict[str, Any]:
"""Generate realistic synthetic current weather for Madrid"""
now = datetime.now()
month = now.month
hour = now.hour
# Madrid climate simulation
temperature = self._calculate_current_temperature(month, hour)
precipitation = self._calculate_current_precipitation(now, month)
return {
"date": now,
"temperature": round(temperature, 1),
"precipitation": precipitation,
"humidity": 45 + (month % 6) * 5,
"wind_speed": 8 + (hour % 12),
"pressure": 1013 + math.sin(now.day * 0.2) * 15,
"description": "Lluvioso" if precipitation > 0 else "Soleado",
"source": WeatherSource.SYNTHETIC.value
}
def generate_forecast_sync(self, days: int, start_offset: int = 0) -> List[Dict[str, Any]]:
"""Generate synthetic forecast data synchronously"""
forecast = []
base_date = datetime.now().date()
for i in range(days):
forecast_date = base_date + timedelta(days=start_offset + i)
forecast_day = self._generate_forecast_day(forecast_date, start_offset + i)
forecast.append(forecast_day)
return forecast
async def generate_forecast(self, days: int) -> List[Dict[str, Any]]:
"""Generate synthetic forecast data (async version for compatibility)"""
return self.generate_forecast_sync(days, 0)
def generate_historical_data(self, start_date: datetime, end_date: datetime) -> List[Dict[str, Any]]:
"""Generate synthetic historical weather data"""
historical_data = []
current_date = start_date
while current_date <= end_date:
historical_day = self._generate_historical_day(current_date)
historical_data.append(historical_day)
current_date += timedelta(days=1)
return historical_data
def _calculate_current_temperature(self, month: int, hour: int) -> float:
"""Calculate current temperature based on seasonal and daily patterns"""
base_temp = AEMETConstants.BASE_TEMPERATURE_SEASONAL + (month - 1) * AEMETConstants.TEMPERATURE_SEASONAL_MULTIPLIER
temp_variation = math.sin((hour - 6) * math.pi / 12) * AEMETConstants.DAILY_TEMPERATURE_AMPLITUDE
return base_temp + temp_variation
def _calculate_current_precipitation(self, now: datetime, month: int) -> float:
"""Calculate current precipitation based on seasonal patterns"""
rain_prob = 0.3 if month in [11, 12, 1, 2, 3] else 0.1
# Note: hash() of a date is salted per process (PYTHONHASHSEED), so this
# synthetic rain flag is stable within a run but varies across runs.
return 2.5 if hash(now.date()) % 100 < rain_prob * 100 else 0.0
def _generate_forecast_day(self, forecast_date: date, day_offset: int) -> Dict[str, Any]:
"""Generate a single forecast day"""
month = forecast_date.month
base_temp = AEMETConstants.BASE_TEMPERATURE_SEASONAL + (month - 1) * AEMETConstants.TEMPERATURE_SEASONAL_MULTIPLIER
temp_variation = ((day_offset) % 7 - 3) * 2 # Weekly variation
return {
"forecast_date": datetime.combine(forecast_date, datetime.min.time()),
"generated_at": datetime.now(),
"temperature": round(base_temp + temp_variation, 1),
"precipitation": 2.0 if day_offset % 5 == 0 else 0.0,
"humidity": 50 + (day_offset % 30),
"wind_speed": 10 + (day_offset % 15),
"description": "Lluvioso" if day_offset % 5 == 0 else "Soleado",
"source": WeatherSource.SYNTHETIC.value
}
def _generate_historical_day(self, date: datetime) -> Dict[str, Any]:
"""Generate a single historical day"""
month = date.month
base_temp = AEMETConstants.BASE_TEMPERATURE_SEASONAL + (month - 1) * AEMETConstants.TEMPERATURE_SEASONAL_MULTIPLIER
temp_variation = math.sin(date.day * 0.3) * 5
return {
"date": date,
"temperature": round(base_temp + temp_variation, 1),
"precipitation": 1.5 if date.day % 7 == 0 else 0.0,
"humidity": 45 + (date.day % 40),
"wind_speed": 8 + (date.day % 20),
"pressure": 1013 + math.sin(date.day * 0.2) * 20,
"description": "Variable",
"source": WeatherSource.SYNTHETIC.value
}
class LocationService:
"""Handles location-related operations"""
@staticmethod
def find_nearest_station(latitude: float, longitude: float) -> Optional[str]:
"""Find nearest weather station to given coordinates"""
try:
# Check if coordinates are reasonable (not extreme values)
if not (-90 <= latitude <= 90 and -180 <= longitude <= 180):
logger.warning("Invalid coordinate range", lat=latitude, lon=longitude)
return None
# Check if coordinates are too far from Madrid area (more than 1000km away)
madrid_center = (40.4168, -3.7038)
distance_to_madrid = LocationService.calculate_distance(
latitude, longitude, madrid_center[0], madrid_center[1]
)
if distance_to_madrid > 1000: # More than 1000km from Madrid
logger.warning("Coordinates too far from Madrid",
lat=latitude, lon=longitude, distance_km=distance_to_madrid)
return None
closest_station = None
min_distance = float('inf')
for station in AEMETConstants.MADRID_STATIONS:
distance = LocationService.calculate_distance(
latitude, longitude, station.latitude, station.longitude
)
if distance < min_distance:
min_distance = distance
closest_station = station.id
return closest_station
except Exception as e:
logger.error("Failed to find nearest station", error=str(e))
return None
@staticmethod
def get_municipality_code(latitude: float, longitude: float) -> Optional[str]:
"""Get municipality code for coordinates"""
if AEMETConstants.MADRID_BOUNDS.contains(latitude, longitude):
return AEMETConstants.MADRID_MUNICIPALITY_CODE
return None
@staticmethod
def calculate_distance(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
"""Calculate distance between two coordinates using Haversine formula"""
dlat = math.radians(lat2 - lat1)
dlon = math.radians(lon2 - lon1)
a = (math.sin(dlat/2) * math.sin(dlat/2) +
math.cos(math.radians(lat1)) * math.cos(math.radians(lat2)) *
math.sin(dlon/2) * math.sin(dlon/2))
c = 2 * math.atan2(math.sqrt(a), math.sqrt(1-a))
return AEMETConstants.EARTH_RADIUS_KM * c
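# Quick sanity check (station coordinates taken from MADRID_STATIONS above):
# the distance from Puerta del Sol (40.4168, -3.7038) to station 3195
# "Madrid Centro" (40.4117, -3.6780) comes out to roughly 2.3 km:
#   LocationService.calculate_distance(40.4168, -3.7038, 40.4117, -3.6780)  # ~2.26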
class AEMETClient(BaseAPIClient):
"""AEMET (Spanish Weather Service) API client with improved modularity"""
def __init__(self):
super().__init__(
base_url="https://opendata.aemet.es/opendata/api",
api_key=settings.AEMET_API_KEY
)
self.parser = WeatherDataParser()
self.synthetic_generator = SyntheticWeatherGenerator()
self.location_service = LocationService()
async def get_current_weather(self, latitude: float, longitude: float) -> Optional[Dict[str, Any]]:
"""Get current weather for coordinates"""
try:
station_id = self.location_service.find_nearest_station(latitude, longitude)
if not station_id:
logger.warning("No weather station found", lat=latitude, lon=longitude)
return await self._get_synthetic_current_weather()
weather_data = await self._fetch_current_weather_data(station_id)
if weather_data:
return self.parser.parse_current_weather(weather_data)
logger.info("Falling back to synthetic weather data", reason="invalid_weather_data")
return await self._get_synthetic_current_weather()
except Exception as e:
logger.error("Failed to get current weather", error=str(e))
return await self._get_synthetic_current_weather()
async def get_forecast(self, latitude: float, longitude: float, days: int = 7) -> List[Dict[str, Any]]:
"""Get weather forecast for coordinates"""
try:
municipality_code = self.location_service.get_municipality_code(latitude, longitude)
if not municipality_code:
logger.info("No municipality code found, using synthetic data")
return await self.synthetic_generator.generate_forecast(days)
forecast_data = await self._fetch_forecast_data(municipality_code)
if forecast_data:
parsed_forecast = self.parser.parse_forecast_data(forecast_data, days)
if parsed_forecast:
return parsed_forecast
logger.info("Falling back to synthetic forecast data", reason="invalid_forecast_data")
return await self.synthetic_generator.generate_forecast(days)
except Exception as e:
logger.error("Failed to get weather forecast", error=str(e))
return await self.synthetic_generator.generate_forecast(days)
async def get_historical_weather(self,
latitude: float,
longitude: float,
start_date: datetime,
end_date: datetime) -> List[Dict[str, Any]]:
"""Get historical weather data"""
try:
logger.debug("Getting historical weather from AEMET API",
lat=latitude, lon=longitude,
start=start_date, end=end_date)
station_id = self.location_service.find_nearest_station(latitude, longitude)
if not station_id:
logger.warning("No weather station found for historical data",
lat=latitude, lon=longitude)
return self.synthetic_generator.generate_historical_data(start_date, end_date)
historical_data = await self._fetch_historical_data_in_chunks(
station_id, start_date, end_date
)
if historical_data:
logger.debug("Successfully fetched historical weather data",
total_count=len(historical_data))
return historical_data
else:
logger.info("No real historical data available, using synthetic data")
return self.synthetic_generator.generate_historical_data(start_date, end_date)
except Exception as e:
logger.error("Failed to get historical weather from AEMET API", error=str(e))
return self.synthetic_generator.generate_historical_data(start_date, end_date)
async def _fetch_current_weather_data(self, station_id: str) -> Optional[Dict[str, Any]]:
"""Fetch current weather data from AEMET API"""
endpoint = f"/observacion/convencional/datos/estacion/{station_id}"
initial_response = await self._get(endpoint)
if not self._is_valid_initial_response(initial_response):
return None
datos_url = initial_response.get("datos")
actual_weather_data = await self._fetch_from_url(datos_url)
if (actual_weather_data and isinstance(actual_weather_data, list)
and len(actual_weather_data) > 0):
return actual_weather_data[0]
return None
async def _fetch_forecast_data(self, municipality_code: str) -> Optional[List[Dict[str, Any]]]:
"""Fetch forecast data from AEMET API"""
endpoint = f"/prediccion/especifica/municipio/diaria/{municipality_code}"
initial_response = await self._get(endpoint)
if not self._is_valid_initial_response(initial_response):
return None
datos_url = initial_response.get("datos")
return await self._fetch_from_url(datos_url)
async def _fetch_historical_data_in_chunks(self,
station_id: str,
start_date: datetime,
end_date: datetime) -> List[Dict[str, Any]]:
"""Fetch historical data in chunks due to AEMET API limitations"""
historical_data = []
current_date = start_date
while current_date <= end_date:
chunk_end_date = min(
current_date + timedelta(days=AEMETConstants.MAX_DAYS_PER_REQUEST),
end_date
)
chunk_data = await self._fetch_historical_chunk(
station_id, current_date, chunk_end_date
)
if chunk_data:
historical_data.extend(chunk_data)
current_date = chunk_end_date + timedelta(days=1)
return historical_data
async def _fetch_historical_chunk(self,
station_id: str,
start_date: datetime,
end_date: datetime) -> List[Dict[str, Any]]:
"""Fetch a single chunk of historical data"""
start_str = start_date.strftime("%Y-%m-%dT00:00:00UTC")
end_str = end_date.strftime("%Y-%m-%dT23:59:59UTC")
endpoint = f"/valores/climatologicos/diarios/datos/fechaini/{start_str}/fechafin/{end_str}/estacion/{station_id}"
initial_response = await self._get(endpoint)
if not self._is_valid_initial_response(initial_response):
logger.warning("Invalid initial response from AEMET historical API",
start=start_str, end=end_str)
return []
datos_url = initial_response.get("datos")
if not datos_url:
logger.warning("No datos URL in AEMET historical response",
start=start_str, end=end_str)
return []
actual_historical_data = await self._fetch_from_url(datos_url)
if actual_historical_data and isinstance(actual_historical_data, list):
chunk_data = self.parser.parse_historical_data(actual_historical_data)
logger.debug("Fetched historical data chunk",
count=len(chunk_data), start=start_str, end=end_str)
return chunk_data
else:
logger.warning("No valid historical data received for chunk",
start=start_str, end=end_str)
return []
async def _fetch_from_url(self, url: str) -> Optional[List[Dict[str, Any]]]:
"""Fetch data from AEMET datos URL"""
try:
data = await self._fetch_url_directly(url)
if data and isinstance(data, list):
return data
else:
logger.warning("Expected list from datos URL", data_type=type(data))
return None
except Exception as e:
logger.error("Failed to fetch from datos URL", url=url, error=str(e))
return None
def _is_valid_initial_response(self, response: Any) -> bool:
"""Check if initial AEMET API response is valid"""
return (response and isinstance(response, dict) and
response.get("datos") and isinstance(response.get("datos"), str))
async def _get_synthetic_current_weather(self) -> Dict[str, Any]:
"""Get synthetic current weather data"""
return self.synthetic_generator.generate_current_weather()
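# Usage sketch (illustrative, not exercised by the services): fetch current
# conditions and a 5-day forecast for central Madrid. Assumes AEMET_API_KEY is
# configured; on any API failure the client silently falls back to synthetic data.
if __name__ == "__main__":
    import asyncio

    async def _demo() -> None:
        client = AEMETClient()
        current = await client.get_current_weather(40.4168, -3.7038)
        forecast = await client.get_forecast(40.4168, -3.7038, days=5)
        print(current["source"], current["temperature"])
        print(len(forecast), "forecast days")  # padded by _ensure_forecast_completeness

    asyncio.run(_demo())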

services/data/app/external/apis/__init__.py

@@ -0,0 +1,10 @@
# ================================================================
# services/data/app/external/apis/__init__.py
# ================================================================
"""
External API clients module - Scalable architecture for multiple cities
"""
from .traffic import TrafficAPIClientFactory
__all__ = ["TrafficAPIClientFactory"]

services/data/app/external/apis/madrid_traffic_client.py

@@ -0,0 +1,350 @@
# ================================================================
# services/data/app/external/apis/madrid_traffic_client.py
# ================================================================
"""
Madrid traffic client - Orchestration layer only
Coordinates between HTTP client, data processor, and business logic components
"""
from datetime import datetime, timedelta, timezone
from typing import Dict, List, Any, Optional, Tuple
import structlog
from .traffic import BaseTrafficClient, SupportedCity
from ..base_client import BaseAPIClient
from ..clients.madrid_client import MadridTrafficAPIClient
from ..processors.madrid_processor import MadridTrafficDataProcessor
from ..processors.madrid_business_logic import MadridTrafficAnalyzer
from ..models.madrid_models import TrafficRecord, CongestionLevel
class MadridTrafficClient(BaseTrafficClient, BaseAPIClient):
"""
Enhanced Madrid traffic client - Orchestration layer
Coordinates HTTP, processing, and business logic components
"""
# Madrid geographic bounds
MADRID_BOUNDS = {
'lat_min': 40.31, 'lat_max': 40.56,
'lon_min': -3.89, 'lon_max': -3.51
}
# Configuration constants
MAX_HISTORICAL_DAYS = 1095 # 3 years
MAX_CSV_PROCESSING_ROWS = 5000000
MEASUREMENT_POINTS_LIMIT = 20
def __init__(self):
BaseTrafficClient.__init__(self, SupportedCity.MADRID)
BaseAPIClient.__init__(self, base_url="https://datos.madrid.es")
# Initialize components
self.api_client = MadridTrafficAPIClient()
self.processor = MadridTrafficDataProcessor()
self.analyzer = MadridTrafficAnalyzer()
self.logger = structlog.get_logger()
def supports_location(self, latitude: float, longitude: float) -> bool:
"""Check if location is within Madrid bounds"""
return (self.MADRID_BOUNDS['lat_min'] <= latitude <= self.MADRID_BOUNDS['lat_max'] and
self.MADRID_BOUNDS['lon_min'] <= longitude <= self.MADRID_BOUNDS['lon_max'])
async def get_current_traffic(self, latitude: float, longitude: float) -> Optional[Dict[str, Any]]:
"""Get current traffic data with enhanced pedestrian inference"""
try:
if not self.supports_location(latitude, longitude):
self.logger.warning("Location outside Madrid bounds", lat=latitude, lon=longitude)
return None
# Fetch XML data
xml_content = await self.api_client.fetch_current_traffic_xml()
if not xml_content:
self.logger.warning("No XML content received")
return None
# Parse XML data
traffic_points = self.processor.parse_traffic_xml(xml_content)
if not traffic_points:
self.logger.warning("No traffic points found in XML")
return None
# Find nearest traffic point
nearest_point = self.analyzer.find_nearest_traffic_point(traffic_points, latitude, longitude)
if not nearest_point:
self.logger.warning("No nearby traffic points found")
return None
# Enhance with business logic
enhanced_data = await self._enhance_traffic_data(nearest_point, latitude, longitude)
self.logger.info("Current traffic data retrieved",
point_id=nearest_point.get('measurement_point_id'),
distance=enhanced_data.get('distance_km', 0))
return enhanced_data
except Exception as e:
self.logger.error("Error getting current traffic", error=str(e))
return None
async def get_historical_traffic(self, latitude: float, longitude: float,
start_date: datetime, end_date: datetime) -> List[Dict[str, Any]]:
"""Get historical traffic data with pedestrian enhancement"""
try:
if not self.supports_location(latitude, longitude):
self.logger.warning("Location outside Madrid bounds", lat=latitude, lon=longitude)
return []
# Validate date range
if (end_date - start_date).days > self.MAX_HISTORICAL_DAYS:
self.logger.warning("Date range too large, truncating",
requested_days=(end_date - start_date).days,
max_days=self.MAX_HISTORICAL_DAYS)
start_date = end_date - timedelta(days=self.MAX_HISTORICAL_DAYS)
# Fetch measurement points registry
csv_content = await self.api_client.fetch_measurement_points_csv()
if not csv_content:
self.logger.error("Failed to fetch measurement points registry")
return []
# Parse measurement points
measurement_points = self.processor.parse_measurement_points_csv(csv_content)
if not measurement_points:
self.logger.error("No measurement points found")
return []
# Find nearest measurement points
nearest_points = self.analyzer.find_nearest_measurement_points(
measurement_points, latitude, longitude, num_points=3
)
if not nearest_points:
self.logger.warning("No nearby measurement points found")
return []
# Process historical data
historical_records = await self._fetch_historical_data_enhanced(
latitude, longitude, start_date, end_date, nearest_points
)
self.logger.info("Historical traffic data retrieved",
records_count=len(historical_records),
date_range=f"{start_date.date()} to {end_date.date()}")
return historical_records
except Exception as e:
self.logger.error("Error getting historical traffic", error=str(e))
return []
async def get_events(self, latitude: float, longitude: float,
radius_km: float = 5.0) -> List[Dict[str, Any]]:
"""Get traffic events (incidents, construction, etc.)"""
# Madrid doesn't provide separate events endpoint
# Return enhanced current traffic data as events
current_data = await self.get_current_traffic(latitude, longitude)
if current_data and current_data.get('congestion_level') in ['high', 'blocked']:
return [{
'type': 'congestion',
'severity': current_data.get('congestion_level'),
'description': f"High traffic congestion at {current_data.get('measurement_point_name', 'measurement point')}",
'location': {
'latitude': current_data.get('latitude'),
'longitude': current_data.get('longitude')
},
'timestamp': current_data.get('timestamp')
}]
return []
async def _enhance_traffic_data(self, traffic_point: Dict[str, Any],
query_lat: float, query_lon: float) -> Dict[str, Any]:
"""Enhance traffic data with business logic and pedestrian inference"""
# Calculate distance
distance_km = self.analyzer.calculate_distance(
query_lat, query_lon,
traffic_point.get('latitude', 0),
traffic_point.get('longitude', 0)
)
# Classify road type
road_type = self.analyzer.classify_road_type(
traffic_point.get('measurement_point_name', '')
)
# Get congestion level
congestion_level = self.analyzer.get_congestion_level(
traffic_point.get('ocupacion', 0)
)
# Create traffic record for pedestrian inference
traffic_record = TrafficRecord(
date=datetime.now(timezone.utc),
traffic_volume=traffic_point.get('intensidad', 0),
occupation_percentage=int(traffic_point.get('ocupacion', 0)),
load_percentage=traffic_point.get('carga', 0),
average_speed=30, # Default speed
congestion_level=congestion_level,
pedestrian_count=0, # Will be calculated
measurement_point_id=traffic_point.get('measurement_point_id', ''),
measurement_point_name=traffic_point.get('measurement_point_name', ''),
road_type=road_type,
source='madrid_current_xml'
)
# Calculate pedestrian count
location_context = {
'latitude': traffic_point.get('latitude'),
'longitude': traffic_point.get('longitude'),
'measurement_point_name': traffic_point.get('measurement_point_name')
}
pedestrian_count, inference_metadata = self.analyzer.calculate_pedestrian_flow(
traffic_record, location_context
)
# Build enhanced response
enhanced_data = {
'timestamp': datetime.now(timezone.utc),
'latitude': traffic_point.get('latitude'),
'longitude': traffic_point.get('longitude'),
'measurement_point_id': traffic_point.get('measurement_point_id'),
'measurement_point_name': traffic_point.get('measurement_point_name'),
'traffic_volume': traffic_point.get('intensidad', 0),
'occupation_percentage': int(traffic_point.get('ocupacion', 0)),
'load_percentage': traffic_point.get('carga', 0),
'congestion_level': congestion_level,
'pedestrian_count': pedestrian_count,
'road_type': road_type,
'distance_km': distance_km,
'source': 'madrid_current_xml',
'city': 'madrid',
'inference_metadata': inference_metadata,
'raw_data': traffic_point
}
return enhanced_data
async def _fetch_historical_data_enhanced(self, latitude: float, longitude: float,
start_date: datetime, end_date: datetime,
nearest_points: List[Tuple[str, Dict[str, Any], float]]) -> List[Dict[str, Any]]:
"""Fetch and process historical traffic data"""
historical_records = []
try:
# Process by year and month to avoid memory issues
current_date = start_date.replace(day=1) # Start from beginning of month
while current_date <= end_date:
year = current_date.year
month = current_date.month
# Build historical URL
zip_url = self.api_client._build_historical_url(year, month)
self.logger.info("Processing historical ZIP file",
year=year, month=month, zip_url=zip_url)
# Fetch ZIP content
zip_content = await self.api_client.fetch_historical_zip(zip_url)
if not zip_content:
self.logger.warning("Failed to fetch historical ZIP", url=zip_url)
current_date = current_date.replace(month=current_date.month + 1) if current_date.month < 12 else current_date.replace(year=current_date.year + 1, month=1)
continue
# Process ZIP content with enhanced parsing
month_records = await self._process_historical_zip_enhanced(
zip_content, zip_url, latitude, longitude, nearest_points
)
# Filter by date range - ensure timezone consistency
# Make sure start_date and end_date have timezone info for comparison
start_tz = start_date if start_date.tzinfo else start_date.replace(tzinfo=timezone.utc)
end_tz = end_date if end_date.tzinfo else end_date.replace(tzinfo=timezone.utc)
filtered_records = []
for record in month_records:
record_date = record.get('date')
if not record_date:
continue
# Ensure record date has timezone info
if not record_date.tzinfo:
record_date = record_date.replace(tzinfo=timezone.utc)
# Now compare with consistent timezone info
if start_tz <= record_date <= end_tz:
filtered_records.append(record)
historical_records.extend(filtered_records)
self.logger.info("Month processing completed",
year=year, month=month,
month_records=len(month_records),
filtered_records=len(filtered_records),
total_records=len(historical_records))
# Move to next month
if current_date.month == 12:
current_date = current_date.replace(year=current_date.year + 1, month=1)
else:
current_date = current_date.replace(month=current_date.month + 1)
return historical_records
except Exception as e:
self.logger.error("Error fetching historical data", error=str(e))
return historical_records # Return partial results
async def _process_historical_zip_enhanced(self, zip_content: bytes, zip_url: str,
latitude: float, longitude: float,
nearest_points: List[Tuple[str, Dict[str, Any], float]]) -> List[Dict[str, Any]]:
"""Process historical ZIP file with enhanced parsing"""
try:
import zipfile
import io
import csv
import gc
historical_records = []
nearest_ids = {p[0] for p in nearest_points}
with zipfile.ZipFile(io.BytesIO(zip_content)) as zip_file:
csv_files = [f for f in zip_file.namelist() if f.lower().endswith('.csv')]
for csv_filename in csv_files:
try:
# Read CSV content
with zip_file.open(csv_filename) as csv_file:
text_content = csv_file.read().decode('utf-8', errors='ignore')
# Process CSV in chunks using processor
csv_records = await self.processor.process_csv_content_chunked(
text_content, csv_filename, nearest_ids, nearest_points
)
historical_records.extend(csv_records)
# Force garbage collection
gc.collect()
except Exception as csv_error:
self.logger.warning("Error processing CSV file",
filename=csv_filename,
error=str(csv_error))
continue
self.logger.info("Historical ZIP processing completed",
zip_url=zip_url,
total_records=len(historical_records))
return historical_records
except Exception as e:
self.logger.error("Error processing historical ZIP file",
zip_url=zip_url, error=str(e))
return []
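# Usage sketch (illustrative only; assumes the Madrid client, processor, and
# analyzer component modules are importable in this package layout):
if __name__ == "__main__":
    import asyncio

    async def _demo() -> None:
        client = MadridTrafficClient()
        if client.supports_location(40.4168, -3.7038):
            data = await client.get_current_traffic(40.4168, -3.7038)
            if data:  # None when the XML feed or nearest-point lookup fails
                print(data["congestion_level"], data["pedestrian_count"],
                      f"{data['distance_km']:.2f} km")

    asyncio.run(_demo())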

services/data/app/external/apis/traffic.py

@@ -0,0 +1,257 @@
# ================================================================
# services/data/app/external/apis/traffic.py
# ================================================================
"""
Traffic API abstraction layer for multiple cities
"""
from abc import ABC, abstractmethod
from datetime import datetime
from enum import Enum
from typing import Dict, List, Any, Optional, Tuple
import structlog
logger = structlog.get_logger()
class SupportedCity(Enum):
"""Supported cities for traffic data collection"""
MADRID = "madrid"
BARCELONA = "barcelona"
VALENCIA = "valencia"
class BaseTrafficClient(ABC):
"""
Abstract base class for city-specific traffic clients
Defines the contract that all traffic clients must implement
"""
def __init__(self, city: SupportedCity):
self.city = city
self.logger = structlog.get_logger().bind(city=city.value)
@abstractmethod
async def get_current_traffic(self, latitude: float, longitude: float) -> Optional[Dict[str, Any]]:
"""Get current traffic data for location"""
pass
@abstractmethod
async def get_historical_traffic(self, latitude: float, longitude: float,
start_date: datetime, end_date: datetime) -> List[Dict[str, Any]]:
"""Get historical traffic data"""
pass
@abstractmethod
async def get_events(self, latitude: float, longitude: float, radius_km: float = 5.0) -> List[Dict[str, Any]]:
"""Get traffic incidents and events"""
pass
@abstractmethod
def supports_location(self, latitude: float, longitude: float) -> bool:
"""Check if this client supports the given location"""
pass
class TrafficAPIClientFactory:
"""
Factory class to create appropriate traffic clients based on location
"""
# City geographical bounds
CITY_BOUNDS = {
SupportedCity.MADRID: {
'lat_min': 40.31, 'lat_max': 40.56,
'lon_min': -3.89, 'lon_max': -3.51
},
SupportedCity.BARCELONA: {
'lat_min': 41.32, 'lat_max': 41.47,
'lon_min': 2.05, 'lon_max': 2.25
},
SupportedCity.VALENCIA: {
'lat_min': 39.42, 'lat_max': 39.52,
'lon_min': -0.42, 'lon_max': -0.32
}
}
@classmethod
def get_client_for_location(cls, latitude: float, longitude: float) -> Optional[BaseTrafficClient]:
"""
Get appropriate traffic client for given location
Args:
latitude: Query location latitude
longitude: Query location longitude
Returns:
BaseTrafficClient instance or None if location not supported
"""
try:
# Check each city's bounds
for city, bounds in cls.CITY_BOUNDS.items():
if (bounds['lat_min'] <= latitude <= bounds['lat_max'] and
bounds['lon_min'] <= longitude <= bounds['lon_max']):
logger.info("Location matched to city",
city=city.value, lat=latitude, lon=longitude)
return cls._create_client(city)
# If no specific city matches, try to find closest supported city
closest_city = cls._find_closest_city(latitude, longitude)
if closest_city:
logger.info("Using closest city for location",
closest_city=closest_city.value, lat=latitude, lon=longitude)
return cls._create_client(closest_city)
logger.warning("No traffic client available for location",
lat=latitude, lon=longitude)
return None
except Exception as e:
logger.error("Error getting traffic client for location",
lat=latitude, lon=longitude, error=str(e))
return None
@classmethod
def _create_client(cls, city: SupportedCity) -> BaseTrafficClient:
"""Create traffic client for specific city"""
if city == SupportedCity.MADRID:
from .madrid_traffic_client import MadridTrafficClient
return MadridTrafficClient()
elif city == SupportedCity.BARCELONA:
# Future implementation
raise NotImplementedError(f"Traffic client for {city.value} not yet implemented")
elif city == SupportedCity.VALENCIA:
# Future implementation
raise NotImplementedError(f"Traffic client for {city.value} not yet implemented")
else:
raise ValueError(f"Unsupported city: {city}")
@classmethod
def _find_closest_city(cls, latitude: float, longitude: float) -> Optional[SupportedCity]:
"""Find closest supported city to given coordinates"""
import math
def distance(lat1, lon1, lat2, lon2):
"""Calculate distance between two coordinates"""
R = 6371 # Earth's radius in km
dlat = math.radians(lat2 - lat1)
dlon = math.radians(lon2 - lon1)
a = (math.sin(dlat/2) * math.sin(dlat/2) +
math.cos(math.radians(lat1)) * math.cos(math.radians(lat2)) *
math.sin(dlon/2) * math.sin(dlon/2))
c = 2 * math.atan2(math.sqrt(a), math.sqrt(1-a))
return R * c
min_distance = float('inf')
closest_city = None
# City centers for distance calculation
city_centers = {
SupportedCity.MADRID: (40.4168, -3.7038),
SupportedCity.BARCELONA: (41.3851, 2.1734),
SupportedCity.VALENCIA: (39.4699, -0.3763)
}
for city, (city_lat, city_lon) in city_centers.items():
dist = distance(latitude, longitude, city_lat, city_lon)
if dist < min_distance and dist < 100: # Within 100km
min_distance = dist
closest_city = city
return closest_city
@classmethod
def get_supported_cities(cls) -> List[Dict[str, Any]]:
"""Get list of supported cities with their bounds"""
cities = []
for city, bounds in cls.CITY_BOUNDS.items():
cities.append({
"city": city.value,
"bounds": bounds,
"status": "active" if city == SupportedCity.MADRID else "planned"
})
return cities
class UniversalTrafficClient:
"""
Universal traffic client that delegates to appropriate city-specific clients
This is the main interface that external services should use
"""
def __init__(self):
self.factory = TrafficAPIClientFactory()
self.client_cache = {} # Cache clients for performance
async def get_current_traffic(self, latitude: float, longitude: float) -> Optional[Dict[str, Any]]:
"""Get current traffic data for any supported location"""
try:
client = self._get_client_for_location(latitude, longitude)
if client:
return await client.get_current_traffic(latitude, longitude)
else:
logger.warning("No traffic data available for location",
lat=latitude, lon=longitude)
return None
except Exception as e:
logger.error("Error getting current traffic",
lat=latitude, lon=longitude, error=str(e))
return None
async def get_historical_traffic(self, latitude: float, longitude: float,
start_date: datetime, end_date: datetime) -> List[Dict[str, Any]]:
"""Get historical traffic data for any supported location"""
try:
client = self._get_client_for_location(latitude, longitude)
if client:
return await client.get_historical_traffic(latitude, longitude, start_date, end_date)
else:
logger.warning("No historical traffic data available for location",
lat=latitude, lon=longitude)
return []
except Exception as e:
logger.error("Error getting historical traffic",
lat=latitude, lon=longitude, error=str(e))
return []
async def get_events(self, latitude: float, longitude: float, radius_km: float = 5.0) -> List[Dict[str, Any]]:
"""Get traffic events for any supported location"""
try:
client = self._get_client_for_location(latitude, longitude)
if client:
return await client.get_events(latitude, longitude, radius_km)
else:
return []
except Exception as e:
logger.error("Error getting traffic events",
lat=latitude, lon=longitude, error=str(e))
return []
def _get_client_for_location(self, latitude: float, longitude: float) -> Optional[BaseTrafficClient]:
"""Get cached or create new client for location"""
cache_key = f"{latitude:.4f},{longitude:.4f}"
if cache_key not in self.client_cache:
client = self.factory.get_client_for_location(latitude, longitude)
self.client_cache[cache_key] = client
return self.client_cache[cache_key]
def get_location_info(self, latitude: float, longitude: float) -> Dict[str, Any]:
"""Get information about traffic data availability for location"""
client = self._get_client_for_location(latitude, longitude)
if client:
return {
"supported": True,
"city": client.city.value,
"features": ["current_traffic", "historical_traffic", "events"]
}
else:
return {
"supported": False,
"city": None,
"features": [],
"message": "No traffic data available for this location"
}
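# Usage sketch: the universal client is the intended entry point. It resolves a
# city-specific client per location (Madrid today; Barcelona/Valencia planned)
# and caches it by coordinates rounded to four decimals.
if __name__ == "__main__":
    import asyncio

    async def _demo() -> None:
        client = UniversalTrafficClient()
        print(client.get_location_info(40.4168, -3.7038))  # Madrid -> supported
        print(client.get_location_info(48.8566, 2.3522))   # Paris -> unsupported
        traffic = await client.get_current_traffic(40.4168, -3.7038)
        print(traffic is not None)

    asyncio.run(_demo())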

services/data/app/external/base_client.py

@@ -0,0 +1,139 @@
# ================================================================
# services/data/app/external/base_client.py
# ================================================================
"""Base HTTP client for external APIs - Enhanced for AEMET"""
import json
import httpx
from typing import Dict, Any, Optional
import structlog
logger = structlog.get_logger()
class BaseAPIClient:
def __init__(self, base_url: str, api_key: Optional[str] = None):
self.base_url = base_url
self.api_key = api_key
self.timeout = httpx.Timeout(30.0)
async def _get(self, endpoint: str, params: Optional[Dict] = None, headers: Optional[Dict] = None) -> Optional[Dict[str, Any]]:
"""Make GET request"""
try:
url = f"{self.base_url}{endpoint}"
# Add API key to params for AEMET (not headers)
request_params = params or {}
if self.api_key:
request_params["api_key"] = self.api_key
# Add headers if provided
request_headers = headers or {}
logger.debug("Making API request", url=url, params=request_params)
async with httpx.AsyncClient(timeout=self.timeout) as client:
response = await client.get(url, params=request_params, headers=request_headers)
response.raise_for_status()
# Log response for debugging
response_data = response.json()
logger.debug("API response received",
status_code=response.status_code,
response_keys=list(response_data.keys()) if isinstance(response_data, dict) else "non-dict")
return response_data
except httpx.HTTPStatusError as e:
logger.error("HTTP error", status_code=e.response.status_code, url=url, response_text=e.response.text[:200])
return None
except httpx.RequestError as e:
logger.error("Request error", error=str(e), url=url)
return None
except Exception as e:
logger.error("Unexpected error", error=str(e), url=url)
return None
async def _fetch_url_directly(self, url: str, headers: Optional[Dict] = None) -> Optional[Dict[str, Any]]:
"""Fetch data directly from a full URL (for AEMET datos URLs)"""
try:
request_headers = headers or {}
logger.debug("Making direct URL request", url=url)
async with httpx.AsyncClient(timeout=self.timeout) as client:
response = await client.get(url, headers=request_headers)
response.raise_for_status()
# Handle encoding issues common with Spanish data sources
try:
response_data = response.json()
except UnicodeDecodeError:
logger.warning("UTF-8 decode failed, trying alternative encodings", url=url)
# Try common Spanish encodings
for encoding in ['latin-1', 'windows-1252', 'iso-8859-1']:
try:
text_content = response.content.decode(encoding)
response_data = json.loads(text_content)
logger.info("Successfully decoded with encoding", encoding=encoding)
break
except (UnicodeDecodeError, json.JSONDecodeError):
continue
else:
logger.error("Failed to decode response with any encoding", url=url)
return None
logger.debug("Direct URL response received",
status_code=response.status_code,
data_type=type(response_data),
data_length=len(response_data) if isinstance(response_data, (list, dict)) else "unknown")
return response_data
except httpx.HTTPStatusError as e:
logger.error("HTTP error in direct fetch", status_code=e.response.status_code, url=url)
return None
except httpx.RequestError as e:
logger.error("Request error in direct fetch", error=str(e), url=url)
return None
except Exception as e:
logger.error("Unexpected error in direct fetch", error=str(e), url=url)
return None
async def _post(self, endpoint: str, data: Optional[Dict] = None, headers: Optional[Dict] = None) -> Optional[Dict[str, Any]]:
"""Make POST request"""
try:
url = f"{self.base_url}{endpoint}"
request_headers = headers or {}
if self.api_key:
request_headers["Authorization"] = f"Bearer {self.api_key}"
async with httpx.AsyncClient(timeout=self.timeout) as client:
response = await client.post(url, json=data, headers=request_headers)
response.raise_for_status()
return response.json()
except httpx.HTTPStatusError as e:
logger.error("HTTP error", status_code=e.response.status_code, url=url)
return None
except httpx.RequestError as e:
logger.error("Request error", error=str(e), url=url)
return None
except Exception as e:
logger.error("Unexpected error", error=str(e), url=url)
return None
async def get_direct(self, url: str, headers: Optional[Dict] = None, timeout: Optional[int] = None) -> httpx.Response:
"""
Public GET method for direct HTTP requests
Returns the raw httpx Response object for maximum flexibility
"""
request_headers = headers or {}
request_timeout = httpx.Timeout(timeout if timeout else 30.0)
async with httpx.AsyncClient(timeout=request_timeout, follow_redirects=True) as client:
response = await client.get(url, headers=request_headers)
response.raise_for_status()
return response
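# Minimal sketch of the intended subclassing pattern (hypothetical consumer; the
# real flow lives in AEMETClient): AEMET envelopes carry a "datos" URL that must
# be fetched in a second request, with the api_key sent as a query parameter.
#
# class ExampleAEMETConsumer(BaseAPIClient):
#     async def fetch_payload(self, endpoint: str):
#         envelope = await self._get(endpoint)  # api_key added to query params
#         if envelope and isinstance(envelope.get("datos"), str):
#             return await self._fetch_url_directly(envelope["datos"])
#         return None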

services/data/app/external/clients/__init__.py

@@ -0,0 +1,12 @@
# ================================================================
# services/data/app/external/clients/__init__.py
# ================================================================
"""
HTTP clients package
"""
from .madrid_client import MadridTrafficAPIClient
__all__ = [
'MadridTrafficAPIClient'
]

services/data/app/external/clients/madrid_client.py

@@ -0,0 +1,159 @@
# ================================================================
# services/data/app/external/clients/madrid_client.py
# ================================================================
"""
Pure HTTP client for Madrid traffic APIs
Handles only HTTP communication and response decoding
"""
import httpx
import structlog
from typing import Optional
from ..base_client import BaseAPIClient
class MadridTrafficAPIClient(BaseAPIClient):
"""Pure HTTP client for Madrid traffic APIs"""
TRAFFIC_ENDPOINT = "https://informo.madrid.es/informo/tmadrid/pm.xml"
MEASUREMENT_POINTS_URL = "https://datos.madrid.es/egob/catalogo/202468-263-intensidad-trafico.csv"
def __init__(self):
super().__init__(base_url="https://datos.madrid.es")
self.logger = structlog.get_logger()
def _decode_response_content(self, response: httpx.Response) -> Optional[str]:
"""Decode response content with multiple encoding attempts"""
try:
return response.text
except UnicodeDecodeError:
# Try manual encoding for Spanish content
for encoding in ['utf-8', 'latin-1', 'windows-1252', 'iso-8859-1']:
try:
content = response.content.decode(encoding)
if content and len(content) > 100:
self.logger.debug("Successfully decoded with encoding", encoding=encoding)
return content
except UnicodeDecodeError:
continue
return None
def _build_historical_url(self, year: int, month: int) -> str:
"""Build historical ZIP URL for given year and month"""
# Madrid historical data URL pattern
base_url = "https://datos.madrid.es/egob/catalogo/208627"
# URL numbering pattern (this may need adjustment based on actual URLs)
# Note: Historical data is only available for past periods, not current/future
if year == 2023:
url_number = 116 + (month - 1) # 116-127 for 2023
elif year == 2024:
url_number = 128 + (month - 1) # 128-139 for 2024
elif year == 2025:
# For 2025, use the continuing numbering from 2024
url_number = 140 + (month - 1) # Starting from 140 for January 2025
else:
url_number = 116 # Fallback to 2023 data
return f"{base_url}-{url_number}-transporte-ptomedida-historico.zip"
async def fetch_current_traffic_xml(self, endpoint: Optional[str] = None) -> Optional[str]:
"""Fetch current traffic XML data"""
endpoint = endpoint or self.TRAFFIC_ENDPOINT
try:
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
'Accept': 'application/xml,text/xml,*/*',
'Accept-Language': 'es-ES,es;q=0.9,en;q=0.8',
'Accept-Encoding': 'gzip, deflate, br',
'Cache-Control': 'no-cache',
'Referer': 'https://datos.madrid.es/'
}
response = await self.get_direct(endpoint, headers=headers, timeout=30)
if not response or response.status_code != 200:
self.logger.warning("Failed to fetch XML data",
endpoint=endpoint,
status=response.status_code if response else None)
return None
# Get XML content with encoding handling
xml_content = self._decode_response_content(response)
if not xml_content:
self.logger.debug("No XML content received", endpoint=endpoint)
return None
self.logger.debug("Madrid XML content fetched",
length=len(xml_content),
endpoint=endpoint)
return xml_content
except Exception as e:
self.logger.error("Error fetching traffic XML data",
endpoint=endpoint,
error=str(e))
return None
async def fetch_measurement_points_csv(self, url: Optional[str] = None) -> Optional[str]:
"""Fetch measurement points CSV data"""
url = url or self.MEASUREMENT_POINTS_URL
try:
async with httpx.AsyncClient(
timeout=30.0,
headers={
'User-Agent': 'MadridTrafficClient/2.0',
'Accept': 'text/csv,application/csv,*/*'
},
follow_redirects=True
) as client:
self.logger.debug("Fetching measurement points registry", url=url)
response = await client.get(url)
if response.status_code == 200:
return response.text
else:
self.logger.warning("Failed to fetch measurement points",
status=response.status_code, url=url)
return None
except Exception as e:
self.logger.error("Error fetching measurement points registry",
url=url, error=str(e))
return None
async def fetch_historical_zip(self, zip_url: str) -> Optional[bytes]:
"""Fetch historical traffic ZIP file"""
try:
async with httpx.AsyncClient(
timeout=120.0, # Longer timeout for large files
headers={
'User-Agent': 'MadridTrafficClient/2.0',
'Accept': 'application/zip,*/*'
},
follow_redirects=True
) as client:
self.logger.debug("Fetching historical ZIP", url=zip_url)
response = await client.get(zip_url)
if response.status_code == 200:
self.logger.debug("Historical ZIP fetched",
url=zip_url,
size=len(response.content))
return response.content
else:
self.logger.warning("Failed to fetch historical ZIP",
status=response.status_code, url=zip_url)
return None
except Exception as e:
self.logger.error("Error fetching historical ZIP",
url=zip_url, error=str(e))
return None

services/data/app/external/models/__init__.py

@@ -0,0 +1,20 @@
# ================================================================
# services/data/app/external/models/__init__.py
# ================================================================
"""
Madrid traffic models package
"""
from .madrid_models import (
TrafficServiceLevel,
CongestionLevel,
MeasurementPoint,
TrafficRecord
)
__all__ = [
'TrafficServiceLevel',
'CongestionLevel',
'MeasurementPoint',
'TrafficRecord'
]

services/data/app/external/models/madrid_models.py

@@ -0,0 +1,66 @@
# ================================================================
# services/data/app/external/models/madrid_models.py
# ================================================================
"""
Data structures, enums, and dataclasses for Madrid traffic system
"""
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from typing import Optional
class TrafficServiceLevel(Enum):
"""Madrid traffic service levels"""
FLUID = 0
DENSE = 1
CONGESTED = 2
BLOCKED = 3
class CongestionLevel(Enum):
"""Standardized congestion levels"""
LOW = "low"
MEDIUM = "medium"
HIGH = "high"
BLOCKED = "blocked"
@dataclass
class MeasurementPoint:
"""Madrid measurement point data structure"""
id: str
latitude: float
longitude: float
distance: float
name: str
type: str
@dataclass
class TrafficRecord:
"""Standardized traffic record with pedestrian inference"""
date: datetime
traffic_volume: int
occupation_percentage: int
load_percentage: int
average_speed: int
congestion_level: str
pedestrian_count: int
measurement_point_id: str
measurement_point_name: str
road_type: str
source: str
district: Optional[str] = None
# Madrid-specific data
intensidad_raw: Optional[int] = None
ocupacion_raw: Optional[int] = None
carga_raw: Optional[int] = None
vmed_raw: Optional[int] = None
# Pedestrian inference metadata
pedestrian_multiplier: Optional[float] = None
time_pattern_factor: Optional[float] = None
district_factor: Optional[float] = None
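# Construction sketch (illustrative values only; "PM10001" is a hypothetical
# point id): just the non-default fields are required, while the raw Madrid
# readings and inference metadata stay optional.
#
# from datetime import datetime, timezone
# record = TrafficRecord(
#     date=datetime.now(timezone.utc),
#     traffic_volume=420, occupation_percentage=35, load_percentage=48,
#     average_speed=30, congestion_level=CongestionLevel.MEDIUM.value,
#     pedestrian_count=0, measurement_point_id="PM10001",
#     measurement_point_name="Gran Vía", road_type="URB",
#     source="madrid_current_xml",
# )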

services/data/app/external/processors/__init__.py

@@ -0,0 +1,14 @@
# ================================================================
# services/data/app/external/processors/__init__.py
# ================================================================
"""
Data processors package
"""
from .madrid_processor import MadridTrafficDataProcessor
from .madrid_business_logic import MadridTrafficAnalyzer
__all__ = [
'MadridTrafficDataProcessor',
'MadridTrafficAnalyzer'
]

services/data/app/external/processors/madrid_business_logic.py

@@ -0,0 +1,346 @@
# ================================================================
# services/data/app/external/processors/madrid_business_logic.py
# ================================================================
"""
Business rules, inference, and domain logic for Madrid traffic data
Handles pedestrian inference, district mapping, road classification, and validation
"""
import math
import re
from datetime import datetime
from typing import Dict, List, Any, Optional, Tuple
import structlog
from ..models.madrid_models import TrafficRecord, CongestionLevel
class MadridTrafficAnalyzer:
"""Handles business logic for Madrid traffic analysis"""
# Madrid district characteristics for pedestrian patterns
DISTRICT_MULTIPLIERS = {
'Centro': 2.5, # Historic center, high pedestrian activity
'Salamanca': 2.0, # Shopping area, high foot traffic
'Chamberí': 1.8, # Business district
'Retiro': 2.2, # Near park, high leisure activity
'Chamartín': 1.6, # Business/residential
'Tetuán': 1.4, # Mixed residential/commercial
'Fuencarral': 1.3, # Residential with commercial areas
'Moncloa': 1.7, # University area
'Latina': 1.5, # Residential area
'Carabanchel': 1.2, # Residential periphery
'Usera': 1.1, # Industrial/residential
'Villaverde': 1.0, # Industrial area
'Villa de Vallecas': 1.0, # Peripheral residential
'Vicálvaro': 0.9, # Peripheral
'San Blas': 1.1, # Residential
'Barajas': 0.8, # Airport area, low pedestrian activity
'Hortaleza': 1.2, # Mixed area
'Ciudad Lineal': 1.3, # Linear development
'Puente de Vallecas': 1.2, # Working class area
'Moratalaz': 1.1, # Residential
'Arganzuela': 1.6, # Near center, growing area
}
# Time-based patterns (hour of day)
TIME_PATTERNS = {
'morning_peak': {'hours': [7, 8, 9], 'multiplier': 2.0},
'lunch_peak': {'hours': [12, 13, 14], 'multiplier': 2.5},
'evening_peak': {'hours': [18, 19, 20], 'multiplier': 2.2},
'afternoon': {'hours': [15, 16, 17], 'multiplier': 1.8},
'late_evening': {'hours': [21, 22], 'multiplier': 1.5},
'night': {'hours': [23, 0, 1, 2, 3, 4, 5, 6], 'multiplier': 0.3},
'morning': {'hours': [10, 11], 'multiplier': 1.4}
}
# Road type specific patterns
ROAD_TYPE_BASE = {
'URB': 250, # Urban streets - high pedestrian activity
'M30': 50, # Ring road - minimal pedestrians
'C30': 75, # Secondary ring - some pedestrian access
'A': 25, # Highways - very low pedestrians
'R': 40 # Radial roads - low to moderate
}
# Weather impact on pedestrian activity
WEATHER_IMPACT = {
'rain': 0.6, # 40% reduction in rain
'hot_weather': 0.8, # 20% reduction when very hot
'cold_weather': 0.7, # 30% reduction when very cold
'normal': 1.0 # No impact
}
def __init__(self):
self.logger = structlog.get_logger()
def calculate_pedestrian_flow(
self,
traffic_record: TrafficRecord,
location_context: Optional[Dict[str, Any]] = None
) -> Tuple[int, Dict[str, float]]:
"""
Calculate pedestrian flow estimate with detailed metadata
Returns:
Tuple of (pedestrian_count, inference_metadata)
"""
# Base calculation from road type
road_type = traffic_record.road_type or 'URB'
base_pedestrians = self.ROAD_TYPE_BASE.get(road_type, 200)
# Time pattern adjustment
hour = traffic_record.date.hour
time_factor = self._get_time_pattern_factor(hour)
# District adjustment (if available)
district_factor = 1.0
district = traffic_record.district or self.infer_district_from_location(location_context)
if district:
district_factor = self.DISTRICT_MULTIPLIERS.get(district, 1.0)
# Traffic correlation adjustment
traffic_factor = self._calculate_traffic_correlation(traffic_record)
# Weather adjustment (if data available)
weather_factor = self._get_weather_factor(traffic_record.date, location_context)
# Weekend adjustment
weekend_factor = self._get_weekend_factor(traffic_record.date)
# Combined calculation
pedestrian_count = int(
base_pedestrians *
time_factor *
district_factor *
traffic_factor *
weather_factor *
weekend_factor
)
# Ensure reasonable bounds
pedestrian_count = max(10, min(2000, pedestrian_count))
# Metadata for model training
inference_metadata = {
'base_pedestrians': base_pedestrians,
'time_factor': time_factor,
'district_factor': district_factor,
'traffic_factor': traffic_factor,
'weather_factor': weather_factor,
'weekend_factor': weekend_factor,
'inferred_district': district,
'hour': hour,
'road_type': road_type
}
return pedestrian_count, inference_metadata
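# Worked example (hypothetical inputs): an urban street ('URB', base 250) at
# 13:00 (lunch peak, x2.5) in Centro (x2.5) with ~50% load (x1.3), spring
# weather (x1.1) on a weekday (x1.0) gives
# int(250 * 2.5 * 2.5 * 1.3 * 1.1 * 1.0) = 2234, clamped to the 2000 ceiling.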
def _get_time_pattern_factor(self, hour: int) -> float:
"""Get time-based pedestrian activity multiplier"""
for pattern, config in self.TIME_PATTERNS.items():
if hour in config['hours']:
return config['multiplier']
return 1.0 # Default multiplier
def _calculate_traffic_correlation(self, traffic_record: TrafficRecord) -> float:
"""
Calculate pedestrian correlation with traffic patterns
Higher traffic in urban areas often correlates with more pedestrians
"""
if traffic_record.road_type == 'URB':
# Urban areas: moderate traffic indicates commercial activity
if 30 <= traffic_record.load_percentage <= 70:
return 1.3 # Sweet spot for pedestrian activity
elif traffic_record.load_percentage > 70:
return 0.9 # Too congested, pedestrians avoid
else:
return 1.0 # Normal correlation
else:
# Highway/ring roads: more traffic = fewer pedestrians
if traffic_record.load_percentage > 60:
return 0.5
else:
return 0.8
def _get_weather_factor(self, date: datetime, location_context: Optional[Dict] = None) -> float:
"""Estimate weather impact on pedestrian activity"""
# Simplified weather inference based on season and typical Madrid patterns
month = date.month
# Madrid seasonal patterns
if month in [12, 1, 2]: # Winter - cold weather impact
return self.WEATHER_IMPACT['cold_weather']
elif month in [7, 8]: # Summer - hot weather impact
return self.WEATHER_IMPACT['hot_weather']
elif month in [10, 11, 3, 4]: # Rainy seasons - moderate impact
return 0.85
else: # Spring/early summer - optimal weather
return 1.1
def _get_weekend_factor(self, date: datetime) -> float:
"""Weekend vs weekday pedestrian patterns"""
weekday = date.weekday()
hour = date.hour
if weekday >= 5: # Weekend
if 11 <= hour <= 16: # Weekend shopping/leisure hours
return 1.4
elif 20 <= hour <= 23: # Weekend evening activity
return 1.3
else:
return 0.9
else: # Weekday
return 1.0
def infer_district_from_location(self, location_context: Optional[Dict] = None) -> Optional[str]:
"""
Infer Madrid district from location context or coordinates
"""
if not location_context:
return None
lat = location_context.get('latitude')
lon = location_context.get('longitude')
if lat is None or lon is None:
return None
# Madrid district boundaries (simplified boundaries for inference)
districts = {
# Central districts
'Centro': {'lat_min': 40.405, 'lat_max': 40.425, 'lon_min': -3.720, 'lon_max': -3.690},
'Arganzuela': {'lat_min': 40.385, 'lat_max': 40.410, 'lon_min': -3.720, 'lon_max': -3.680},
'Retiro': {'lat_min': 40.405, 'lat_max': 40.425, 'lon_min': -3.690, 'lon_max': -3.660},
'Salamanca': {'lat_min': 40.420, 'lat_max': 40.445, 'lon_min': -3.690, 'lon_max': -3.660},
'Chamartín': {'lat_min': 40.445, 'lat_max': 40.480, 'lon_min': -3.690, 'lon_max': -3.660},
'Tetuán': {'lat_min': 40.445, 'lat_max': 40.470, 'lon_min': -3.720, 'lon_max': -3.690},
'Chamberí': {'lat_min': 40.425, 'lat_max': 40.450, 'lon_min': -3.720, 'lon_max': -3.690},
'Fuencarral-El Pardo': {'lat_min': 40.470, 'lat_max': 40.540, 'lon_min': -3.750, 'lon_max': -3.650},
'Moncloa-Aravaca': {'lat_min': 40.430, 'lat_max': 40.480, 'lon_min': -3.750, 'lon_max': -3.720},
'Latina': {'lat_min': 40.380, 'lat_max': 40.420, 'lon_min': -3.750, 'lon_max': -3.720},
'Carabanchel': {'lat_min': 40.350, 'lat_max': 40.390, 'lon_min': -3.750, 'lon_max': -3.720},
'Usera': {'lat_min': 40.350, 'lat_max': 40.385, 'lon_min': -3.720, 'lon_max': -3.690},
'Puente de Vallecas': {'lat_min': 40.370, 'lat_max': 40.410, 'lon_min': -3.680, 'lon_max': -3.640},
'Moratalaz': {'lat_min': 40.400, 'lat_max': 40.430, 'lon_min': -3.650, 'lon_max': -3.620},
'Ciudad Lineal': {'lat_min': 40.430, 'lat_max': 40.460, 'lon_min': -3.650, 'lon_max': -3.620},
'Hortaleza': {'lat_min': 40.460, 'lat_max': 40.500, 'lon_min': -3.650, 'lon_max': -3.620},
'Villaverde': {'lat_min': 40.320, 'lat_max': 40.360, 'lon_min': -3.720, 'lon_max': -3.680},
}
# Find matching district
for district_name, bounds in districts.items():
if (bounds['lat_min'] <= lat <= bounds['lat_max'] and
bounds['lon_min'] <= lon <= bounds['lon_max']):
return district_name
# Default for coordinates in Madrid but not matching specific districts
if 40.3 <= lat <= 40.6 and -3.8 <= lon <= -3.5:
return 'Other Madrid'
return None
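# Example (illustrative coordinates): a point near Puerta del Sol,
# {'latitude': 40.4168, 'longitude': -3.7038}, falls inside the 'Centro' box
# above, so infer_district_from_location would return 'Centro'.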
def classify_road_type(self, measurement_point_name: str) -> str:
"""Classify road type based on measurement point name"""
if not measurement_point_name:
return 'URB' # Default to urban
name_upper = measurement_point_name.upper()
# Highway patterns
if any(pattern in name_upper for pattern in ['A-', 'AP-', 'AUTOPISTA', 'AUTOVIA']):
return 'A'
# M-30 Ring road
if 'M-30' in name_upper or 'M30' in name_upper:
return 'M30'
# Other M roads (ring roads); word boundary avoids false matches such as "KM2"
if re.search(r'\bM-?[0-9]', name_upper):
return 'C30'
# Radial roads (R-1, R-2, etc.)
if re.search(r'R-[0-9]', name_upper) or 'RADIAL' in name_upper:
return 'R'
# Default to urban street
return 'URB'
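# Example classifications (hypothetical point names):
# "A-2 KM 8 ENTRADA" -> 'A'
# "M-30 CALZADA INTERIOR" -> 'M30'
# "M-40 PK 12" -> 'C30'
# "R-3 SALIDA" -> 'R'
# "GRAN VIA - CALLAO" -> 'URB' (default)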
def validate_madrid_coordinates(self, lat: float, lon: float) -> bool:
"""Validate coordinates are within Madrid bounds"""
# Madrid metropolitan area bounds
return 40.3 <= lat <= 40.6 and -3.8 <= lon <= -3.5
def get_congestion_level(self, occupation_pct: float) -> str:
"""Convert occupation percentage to congestion level"""
if occupation_pct >= 80:
return CongestionLevel.BLOCKED.value
elif occupation_pct >= 50:
return CongestionLevel.HIGH.value
elif occupation_pct >= 25:
return CongestionLevel.MEDIUM.value
else:
return CongestionLevel.LOW.value
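# Threshold sketch: occupation >= 80 -> "blocked", 50-79 -> "high",
# 25-49 -> "medium", below 25 -> "low"; e.g. get_congestion_level(55) == "high".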
def calculate_distance(self, lat1: float, lon1: float, lat2: float, lon2: float) -> float:
"""Calculate distance between two points in kilometers using Haversine formula"""
R = 6371 # Earth's radius in kilometers
dlat = math.radians(lat2 - lat1)
dlon = math.radians(lon2 - lon1)
a = (math.sin(dlat/2) * math.sin(dlat/2) +
math.cos(math.radians(lat1)) * math.cos(math.radians(lat2)) *
math.sin(dlon/2) * math.sin(dlon/2))
c = 2 * math.atan2(math.sqrt(a), math.sqrt(1-a))
return R * c
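# Sanity check (approximate, hypothetical endpoints): Puerta del Sol
# (40.4168, -3.7038) to Plaza de Castilla (40.4669, -3.6886) comes out to
# roughly 5.7 km, matching the straight-line distance on a map.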
def find_nearest_traffic_point(self, traffic_points: List[Dict[str, Any]],
latitude: float, longitude: float) -> Optional[Dict[str, Any]]:
"""Find the nearest traffic point to given coordinates"""
if not traffic_points:
return None
min_distance = float('inf')
nearest_point = None
for point in traffic_points:
point_lat = point.get('latitude')
point_lon = point.get('longitude')
if point_lat and point_lon:
distance = self.calculate_distance(latitude, longitude, point_lat, point_lon)
if distance < min_distance:
min_distance = distance
nearest_point = point
return nearest_point
def find_nearest_measurement_points(self, measurement_points: Dict[str, Dict[str, Any]],
latitude: float, longitude: float,
num_points: int = 3, max_distance_km: Optional[float] = 5.0) -> List[Tuple[str, Dict[str, Any], float]]:
"""Find nearest measurement points for historical data"""
distances = []
for point_id, point_data in measurement_points.items():
point_lat = point_data.get('latitude')
point_lon = point_data.get('longitude')
if point_lat and point_lon:
distance_km = self.calculate_distance(latitude, longitude, point_lat, point_lon)
distances.append((point_id, point_data, distance_km))
# Sort by distance and take nearest points
distances.sort(key=lambda x: x[2])
# Apply distance filter if specified
if max_distance_km is not None:
distances = [p for p in distances if p[2] <= max_distance_km]
nearest = distances[:num_points]
self.logger.info("Found nearest measurement points",
count=len(nearest),
nearest_distance_km=nearest[0][2] if nearest else None)
return nearest
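# Usage sketch (hypothetical registry): given points keyed by id with
# 'latitude'/'longitude' entries, this returns up to num_points
# (point_id, point_data, distance_km) tuples within max_distance_km,
# sorted nearest first:
# analyzer.find_nearest_measurement_points(points, 40.4168, -3.7038, num_points=3)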

View File

@@ -0,0 +1,478 @@
# ================================================================
# services/data/app/external/processors/madrid_processor.py
# ================================================================
"""
Data transformation and parsing for Madrid traffic data
Handles XML parsing, CSV processing, coordinate conversion, and data quality scoring
"""
import csv
import io
import math
import re
import xml.etree.ElementTree as ET
import zipfile
from datetime import datetime, timezone
from typing import Dict, List, Any, Optional, Tuple
import structlog
import pyproj
from ..models.madrid_models import TrafficRecord, MeasurementPoint, CongestionLevel
class MadridTrafficDataProcessor:
"""Handles all data transformation and parsing for Madrid traffic data"""
def __init__(self):
self.logger = structlog.get_logger()
# UTM Zone 30N (Madrid's coordinate system) -> WGS84; pyproj.transform() is
# deprecated, so build a reusable Transformer once at init time
self.utm_proj = pyproj.Proj(proj='utm', zone=30, ellps='WGS84', datum='WGS84')
self.wgs84_proj = pyproj.Proj(proj='latlong', ellps='WGS84', datum='WGS84')
self.utm_to_wgs84 = pyproj.Transformer.from_proj(self.utm_proj, self.wgs84_proj, always_xy=True)
def safe_int(self, value: Any) -> int:
"""Safely convert a comma-decimal string to int, defaulting to 0"""
try:
return int(float(value.replace(',', '.')))
except (ValueError, TypeError, AttributeError):
return 0
def _safe_float(self, value: Any) -> float:
"""Safely convert a comma-decimal string to float, defaulting to 0.0"""
try:
return float(value.replace(',', '.'))
except (ValueError, TypeError, AttributeError):
return 0.0
def clean_madrid_xml(self, xml_content: str) -> str:
"""Clean and prepare Madrid XML content for parsing"""
if not xml_content:
return ""
# Remove BOM and extra whitespace
cleaned = xml_content.strip()
if cleaned.startswith('\ufeff'):
cleaned = cleaned[1:]
# Fix common XML issues
cleaned = re.sub(r'&(?!amp;|lt;|gt;|quot;|apos;)', '&amp;', cleaned)
# Ensure proper encoding declaration
if not cleaned.startswith('<?xml'):
cleaned = '<?xml version="1.0" encoding="UTF-8"?>\n' + cleaned
return cleaned
def convert_utm_to_latlon(self, utm_x: str, utm_y: str) -> Tuple[Optional[float], Optional[float]]:
"""Convert UTM coordinates to latitude/longitude"""
try:
utm_x_float = float(utm_x.replace(',', '.'))
utm_y_float = float(utm_y.replace(',', '.'))
# Convert from UTM Zone 30N to WGS84 (always_xy: input is x/y, output is lon/lat)
longitude, latitude = self.utm_to_wgs84.transform(utm_x_float, utm_y_float)
# Validate coordinates are in Madrid area
if 40.3 <= latitude <= 40.6 and -3.8 <= longitude <= -3.5:
return latitude, longitude
else:
self.logger.debug("Coordinates outside Madrid bounds",
lat=latitude, lon=longitude, utm_x=utm_x, utm_y=utm_y)
return None, None
except Exception as e:
self.logger.debug("UTM conversion error",
utm_x=utm_x, utm_y=utm_y, error=str(e))
return None, None
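# Example (approximate values): UTM 30N easting ~440000 / northing ~4474000
# lies in central Madrid, so convert_utm_to_latlon("440000", "4474000") should
# yield roughly (40.42, -3.71); anything outside the bounds returns (None, None).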
def parse_traffic_xml(self, xml_content: str) -> List[Dict[str, Any]]:
"""Parse Madrid traffic XML data"""
traffic_points = []
try:
cleaned_xml = self.clean_madrid_xml(xml_content)
root = ET.fromstring(cleaned_xml)
self.logger.debug("Madrid XML structure", root_tag=root.tag, children_count=len(list(root)))
if root.tag == 'pms':
pm_elements = root.findall('pm')
self.logger.debug("Found PM elements", count=len(pm_elements))
for pm in pm_elements:
try:
traffic_point = self._extract_madrid_pm_element(pm)
if self._is_valid_traffic_point(traffic_point):
traffic_points.append(traffic_point)
# Log first few points for debugging
if len(traffic_points) <= 3:
self.logger.debug("Sample traffic point",
id=traffic_point['idelem'],
lat=traffic_point['latitude'],
lon=traffic_point['longitude'],
intensity=traffic_point.get('intensidad'))
except Exception as e:
self.logger.debug("Error parsing PM element", error=str(e))
continue
else:
self.logger.warning("Unexpected XML root tag", root_tag=root.tag)
self.logger.debug("Madrid traffic XML parsing completed", valid_points=len(traffic_points))
return traffic_points
except ET.ParseError as e:
self.logger.warning("Failed to parse Madrid XML", error=str(e))
return self._extract_traffic_data_regex(xml_content)
except Exception as e:
self.logger.error("Error in Madrid traffic XML parsing", error=str(e))
return []
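# Expected input shape (abridged sample with hypothetical values; tag names
# match what _extract_madrid_pm_element reads):
# <pms>
#   <pm>
#     <idelem>3840</idelem>
#     <descripcion>GRAN VIA - CALLAO</descripcion>
#     <intensidad>850</intensidad>
#     <ocupacion>12</ocupacion>
#     <carga>45</carga>
#     <nivelServicio>1</nivelServicio>
#     <st_x>440123,45</st_x>
#     <st_y>4474321,98</st_y>
#   </pm>
# </pms>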
def _extract_madrid_pm_element(self, pm_element) -> Dict[str, Any]:
"""Extract traffic data from Madrid <pm> element with coordinate conversion"""
try:
point_data = {}
utm_x = utm_y = None
# Extract all child elements
for child in pm_element:
tag, text = child.tag, child.text.strip() if child.text else ''
if tag == 'idelem':
point_data['idelem'] = text
elif tag == 'descripcion':
point_data['descripcion'] = text
elif tag == 'intensidad':
point_data['intensidad'] = self.safe_int(text)
elif tag == 'ocupacion':
point_data['ocupacion'] = self._safe_float(text)
elif tag == 'carga':
point_data['carga'] = self.safe_int(text)
elif tag == 'nivelServicio':
point_data['nivelServicio'] = self.safe_int(text)
elif tag == 'st_x': # UTM X coordinate
utm_x = text
point_data['utm_x'] = text
elif tag == 'st_y': # UTM Y coordinate
utm_y = text
point_data['utm_y'] = text
elif tag == 'error':
point_data['error'] = text
elif tag in ['subarea', 'accesoAsociado', 'intensidadSat']:
point_data[tag] = text
# Convert coordinates
if utm_x and utm_y:
latitude, longitude = self.convert_utm_to_latlon(utm_x, utm_y)
if latitude is not None and longitude is not None:
point_data.update({
'latitude': latitude,
'longitude': longitude,
'measurement_point_id': point_data.get('idelem'),
'measurement_point_name': point_data.get('descripcion'),
'timestamp': datetime.now(timezone.utc),
'source': 'madrid_opendata_xml'
})
return point_data
else:
self.logger.debug("Invalid coordinates after conversion",
idelem=point_data.get('idelem'), utm_x=utm_x, utm_y=utm_y)
return {}
else:
self.logger.debug("Missing UTM coordinates", idelem=point_data.get('idelem'))
return {}
except Exception as e:
self.logger.debug("Error extracting PM element", error=str(e))
return {}
def _is_valid_traffic_point(self, traffic_point: Dict[str, Any]) -> bool:
"""Validate traffic point data"""
required_fields = ['idelem', 'latitude', 'longitude']
return all(field in traffic_point and traffic_point[field] for field in required_fields)
def _extract_traffic_data_regex(self, xml_content: str) -> List[Dict[str, Any]]:
"""Fallback regex-based extraction if XML parsing fails"""
traffic_points = []
try:
# Pattern to match PM elements
pm_pattern = r'<pm>(.*?)</pm>'
pm_matches = re.findall(pm_pattern, xml_content, re.DOTALL)
for pm_content in pm_matches:
traffic_point = {}
# Extract key fields
patterns = {
'idelem': r'<idelem>(.*?)</idelem>',
'descripcion': r'<descripcion>(.*?)</descripcion>',
'intensidad': r'<intensidad>(.*?)</intensidad>',
'ocupacion': r'<ocupacion>(.*?)</ocupacion>',
'st_x': r'<st_x>(.*?)</st_x>',
'st_y': r'<st_y>(.*?)</st_y>'
}
for field, pattern in patterns.items():
match = re.search(pattern, pm_content)
if match:
traffic_point[field] = match.group(1).strip()
# Convert coordinates
if 'st_x' in traffic_point and 'st_y' in traffic_point:
latitude, longitude = self.convert_utm_to_latlon(
traffic_point['st_x'], traffic_point['st_y']
)
if latitude and longitude:
traffic_point.update({
'latitude': latitude,
'longitude': longitude,
'intensidad': self.safe_int(traffic_point.get('intensidad', '0')),
'ocupacion': self._safe_float(traffic_point.get('ocupacion', '0')),
'measurement_point_id': traffic_point.get('idelem'),
'measurement_point_name': traffic_point.get('descripcion'),
'timestamp': datetime.now(timezone.utc),
'source': 'madrid_opendata_xml_regex'
})
traffic_points.append(traffic_point)
self.logger.debug("Regex extraction completed", points=len(traffic_points))
return traffic_points
except Exception as e:
self.logger.error("Error in regex extraction", error=str(e))
return []
def parse_measurement_points_csv(self, csv_content: str) -> Dict[str, Dict[str, Any]]:
"""Parse measurement points CSV into lookup dictionary"""
measurement_points = {}
try:
# Parse CSV with semicolon delimiter
csv_reader = csv.DictReader(io.StringIO(csv_content), delimiter=';')
processed_count = 0
for row in csv_reader:
try:
# Extract point ID and coordinates
point_id = row.get('id', '').strip()
if not point_id:
continue
processed_count += 1
# Try different coordinate field names
lat_str = ''
lon_str = ''
# Common coordinate field patterns
lat_fields = ['lat', 'latitude', 'latitud', 'y', 'utm_y']
lon_fields = ['lon', 'lng', 'longitude', 'longitud', 'x', 'utm_x']
for field in lat_fields:
if field in row and row[field].strip():
lat_str = row[field].strip()
break
for field in lon_fields:
if field in row and row[field].strip():
lon_str = row[field].strip()
break
if lat_str and lon_str:
try:
# Try direct lat/lon first
latitude = self._safe_float(lat_str)
longitude = self._safe_float(lon_str)
# If values look like UTM coordinates, convert them
if latitude > 1000 or longitude > 1000:
latitude, longitude = self.convert_utm_to_latlon(lon_str, lat_str)
if not latitude or not longitude:
continue
# Validate Madrid area
if not (40.3 <= latitude <= 40.6 and -3.8 <= longitude <= -3.5):
continue
measurement_points[point_id] = {
'id': point_id,
'latitude': latitude,
'longitude': longitude,
'name': row.get('nombre', row.get('descripcion', f"Point {point_id}")),
'type': row.get('tipo', 'traffic'),
'raw_data': dict(row) # Keep original data
}
except Exception as e:
self.logger.debug("Error processing point coordinates",
point_id=point_id, error=str(e))
continue
except Exception as e:
self.logger.debug("Error processing CSV row", error=str(e))
continue
self.logger.info("Parsed measurement points registry",
total_points=len(measurement_points))
return measurement_points
except Exception as e:
self.logger.error("Error parsing measurement points CSV", error=str(e))
return {}
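# Input sketch (hypothetical semicolon-delimited CSV using the coordinate
# field fallbacks above; UTM-looking values > 1000 get converted to lat/lon):
# id;nombre;tipo;utm_x;utm_y
# 3840;GRAN VIA - CALLAO;traffic;440123,45;4474321,98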
def calculate_data_quality_score(self, row: Dict[str, str]) -> float:
"""Calculate data quality score for a traffic record"""
try:
score = 1.0
# Check for missing or invalid values
intensidad = row.get('intensidad', '').strip()
if not intensidad or intensidad in ['N', '', '0']:
score *= 0.7
ocupacion = row.get('ocupacion', '').strip()
if not ocupacion or ocupacion in ['N', '', '0']:
score *= 0.8
error_status = row.get('error', '').strip()
if error_status and error_status != 'N':
score *= 0.6
# Check for reasonable value ranges
try:
intensidad_val = self.safe_int(intensidad)
if intensidad_val < 0 or intensidad_val > 5000: # Unrealistic traffic volume
score *= 0.7
ocupacion_val = self.safe_int(ocupacion)
if ocupacion_val < 0 or ocupacion_val > 100: # Invalid percentage
score *= 0.5
except (ValueError, TypeError):
score *= 0.6
return max(0.1, score) # Minimum quality score
except Exception as e:
self.logger.debug("Error calculating quality score", error=str(e))
return 0.5 # Default medium quality
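# Scoring walkthrough (hypothetical row): a missing 'ocupacion' (x0.8) plus an
# error flag other than 'N' (x0.6) yields 1.0 * 0.8 * 0.6 = 0.48; a fully
# populated, in-range row keeps 1.0, and 0.1 is the floor.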
async def process_csv_content_chunked(self, text_content: str, csv_filename: str,
nearest_ids: set, nearest_points: list) -> list:
"""Process CSV content in chunks to prevent memory issues"""
import gc  # csv and io are already imported at module level
try:
csv_reader = csv.DictReader(io.StringIO(text_content), delimiter=';')
chunk_size = 10000
chunk_records = []
all_records = []
processed_count = 0
total_rows_seen = 0
for row in csv_reader:
total_rows_seen += 1
measurement_point_id = row.get('id', '').strip()
if measurement_point_id not in nearest_ids:
continue
try:
record_data = await self.parse_historical_csv_row(row, nearest_points)
if record_data:
chunk_records.append(record_data)
processed_count += 1
if len(chunk_records) >= chunk_size:
all_records.extend(chunk_records)
chunk_records = []
gc.collect()
except Exception as e:
if processed_count < 5:
self.logger.error("Row parsing exception",
row_num=total_rows_seen,
measurement_point_id=measurement_point_id,
error=str(e))
continue
# Process remaining records
if chunk_records:
all_records.extend(chunk_records)
chunk_records = []
gc.collect()
self.logger.info("Processed CSV file",
filename=csv_filename,
total_rows_read=total_rows_seen,
processed_records=processed_count)
return all_records
except Exception as e:
self.logger.error("Error processing CSV content",
filename=csv_filename, error=str(e))
return []
async def parse_historical_csv_row(self, row: dict, nearest_points: list) -> Optional[dict]:
"""Parse a single row from Madrid's historical traffic CSV; returns None for unusable rows"""
try:
# Extract date
fecha_str = row.get('fecha', '').strip()
if not fecha_str:
return None
try:
# datetime and timezone come from the module-level import
date_obj = datetime.strptime(fecha_str, '%Y-%m-%d %H:%M:%S')
date_obj = date_obj.replace(tzinfo=timezone.utc)
except Exception:
return None
measurement_point_id = row.get('id', '').strip()
# Find point data
point_match = next((p for p in nearest_points if p[0] == measurement_point_id), None)
if not point_match:
return None
point_data = point_match[1]
distance_km = point_match[2]
# Extract traffic data
intensidad = self.safe_int(row.get('intensidad', '0'))
ocupacion = self.safe_int(row.get('ocupacion', '0'))
carga = self.safe_int(row.get('carga', '0'))
vmed = self.safe_int(row.get('vmed', '0'))
# Build basic result (business logic will be applied elsewhere)
result = {
'date': date_obj,
'measurement_point_id': measurement_point_id,
'point_data': point_data,
'distance_km': distance_km,
'traffic_data': {
'intensidad': intensidad,
'ocupacion': ocupacion,
'carga': carga,
'vmed': vmed
},
'data_quality_score': self.calculate_data_quality_score(row),
'raw_row': row
}
return result
except Exception as e:
self.logger.debug("Error parsing historical CSV row", error=str(e))
return None
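# Row sketch (hypothetical values, matching the column names read above):
# row = {'fecha': '2025-08-12 18:00:00', 'id': '3840', 'intensidad': '850',
#        'ocupacion': '12', 'carga': '45', 'vmed': '0'}
# nearest_points entries are (point_id, point_data, distance_km) tuples as
# produced by MadridTrafficAnalyzer.find_nearest_measurement_points.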