# ================================================================
# services/external/app/external/apis/madrid_traffic_client.py
# ================================================================
"""
Madrid traffic client - Orchestration layer only
Coordinates between HTTP client, data processor, and business logic components
"""
from datetime import datetime, timedelta, timezone
from typing import Dict, List, Any, Optional, Tuple
import structlog
from .traffic import BaseTrafficClient, SupportedCity
from ..base_client import BaseAPIClient
from ..clients.madrid_client import MadridTrafficAPIClient
from ..processors.madrid_processor import MadridTrafficDataProcessor
from ..processors.madrid_business_logic import MadridTrafficAnalyzer
from ..models.madrid_models import TrafficRecord, CongestionLevel
class MadridTrafficClient(BaseTrafficClient, BaseAPIClient):
    """
    Enhanced Madrid traffic client - Orchestration layer
    Coordinates HTTP, processing, and business logic components
    """

    # Madrid geographic bounds
    MADRID_BOUNDS = {
        'lat_min': 40.31, 'lat_max': 40.56,
        'lon_min': -3.89, 'lon_max': -3.51
    }

    # Configuration constants
    MAX_HISTORICAL_DAYS = 1095  # 3 years
    MAX_CSV_PROCESSING_ROWS = 5000000
    MEASUREMENT_POINTS_LIMIT = 20

    def __init__(self):
        BaseTrafficClient.__init__(self, SupportedCity.MADRID)
        BaseAPIClient.__init__(self, base_url="https://datos.madrid.es")

        # Initialize components
        self.api_client = MadridTrafficAPIClient()
        self.processor = MadridTrafficDataProcessor()
        self.analyzer = MadridTrafficAnalyzer()
        self.logger = structlog.get_logger()
    def supports_location(self, latitude: float, longitude: float) -> bool:
        """Check if location is within Madrid bounds"""
        return (self.MADRID_BOUNDS['lat_min'] <= latitude <= self.MADRID_BOUNDS['lat_max'] and
                self.MADRID_BOUNDS['lon_min'] <= longitude <= self.MADRID_BOUNDS['lon_max'])
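    # Illustrative examples (not part of the original source): with the bounds above,
    # supports_location(40.4168, -3.7038) is True (Puerta del Sol, central Madrid),
    # while supports_location(41.3874, 2.1686) is False (Barcelona).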
    async def get_current_traffic(self, latitude: float, longitude: float) -> Optional[Dict[str, Any]]:
        """Get current traffic data with enhanced pedestrian inference"""
        try:
            if not self.supports_location(latitude, longitude):
                self.logger.warning("Location outside Madrid bounds", lat=latitude, lon=longitude)
                return None

            # Fetch XML data
            xml_content = await self.api_client.fetch_current_traffic_xml()
            if not xml_content:
                self.logger.warning("No XML content received")
                return None

            # Parse XML data
            traffic_points = self.processor.parse_traffic_xml(xml_content)
            if not traffic_points:
                self.logger.warning("No traffic points found in XML")
                return None

            # Find nearest traffic point
            nearest_point = self.analyzer.find_nearest_traffic_point(traffic_points, latitude, longitude)
            if not nearest_point:
                self.logger.warning("No nearby traffic points found")
                return None

            # Enhance with business logic
            enhanced_data = await self._enhance_traffic_data(nearest_point, latitude, longitude)

            self.logger.info("Current traffic data retrieved",
                             point_id=nearest_point.get('measurement_point_id'),
                             distance=enhanced_data.get('distance_km', 0))
            return enhanced_data

        except Exception as e:
            self.logger.error("Error getting current traffic", error=str(e))
            return None
    async def get_historical_traffic(self, latitude: float, longitude: float,
                                     start_date: datetime, end_date: datetime) -> List[Dict[str, Any]]:
        """Get historical traffic data with pedestrian enhancement"""
        try:
            if not self.supports_location(latitude, longitude):
                self.logger.warning("Location outside Madrid bounds", lat=latitude, lon=longitude)
                return []

            # Validate date range
            if (end_date - start_date).days > self.MAX_HISTORICAL_DAYS:
                self.logger.warning("Date range too large, truncating",
                                    requested_days=(end_date - start_date).days,
                                    max_days=self.MAX_HISTORICAL_DAYS)
                start_date = end_date - timedelta(days=self.MAX_HISTORICAL_DAYS)

            # Fetch measurement points registry
            csv_content = await self.api_client.fetch_measurement_points_csv()
            if not csv_content:
                self.logger.error("Failed to fetch measurement points registry")
                return []

            # Parse measurement points
            measurement_points = self.processor.parse_measurement_points_csv(csv_content)
            if not measurement_points:
                self.logger.error("No measurement points found")
                return []

            # Find nearest measurement points
            nearest_points = self.analyzer.find_nearest_measurement_points(
                measurement_points, latitude, longitude, num_points=3
            )
            if not nearest_points:
                self.logger.warning("No nearby measurement points found")
                return []

            # Process historical data
            historical_records = await self._fetch_historical_data_enhanced(
                latitude, longitude, start_date, end_date, nearest_points
            )

            self.logger.info("Historical traffic data retrieved",
                             records_count=len(historical_records),
                             date_range=f"{start_date.date()} to {end_date.date()}")
            return historical_records

        except Exception as e:
            self.logger.error("Error getting historical traffic", error=str(e))
            return []
    async def get_events(self, latitude: float, longitude: float,
                         radius_km: float = 5.0) -> List[Dict[str, Any]]:
        """Get traffic events (incidents, construction, etc.)"""
        # Madrid doesn't provide a separate events endpoint;
        # return enhanced current traffic data as events
        current_data = await self.get_current_traffic(latitude, longitude)

        if current_data and current_data.get('congestion_level') in ['high', 'blocked']:
            return [{
                'type': 'congestion',
                'severity': current_data.get('congestion_level'),
                'description': f"High traffic congestion at {current_data.get('measurement_point_name', 'measurement point')}",
                'location': {
                    'latitude': current_data.get('latitude'),
                    'longitude': current_data.get('longitude')
                },
                'timestamp': current_data.get('timestamp')
            }]
        return []
    async def _enhance_traffic_data(self, traffic_point: Dict[str, Any],
                                    query_lat: float, query_lon: float) -> Dict[str, Any]:
        """Enhance traffic data with business logic and pedestrian inference"""
        # Calculate distance
        distance_km = self.analyzer.calculate_distance(
            query_lat, query_lon,
            traffic_point.get('latitude', 0),
            traffic_point.get('longitude', 0)
        )

        # Classify road type
        road_type = self.analyzer.classify_road_type(
            traffic_point.get('measurement_point_name', '')
        )

        # Get congestion level
        congestion_level = self.analyzer.get_congestion_level(
            traffic_point.get('ocupacion', 0)
        )

        # Create traffic record for pedestrian inference
        traffic_record = TrafficRecord(
            date=datetime.now(timezone.utc),
            traffic_volume=traffic_point.get('intensidad', 0),
            occupation_percentage=int(traffic_point.get('ocupacion', 0)),
            load_percentage=traffic_point.get('carga', 0),
            average_speed=30,  # Default speed
            congestion_level=congestion_level,
            pedestrian_count=0,  # Will be calculated
            measurement_point_id=traffic_point.get('measurement_point_id', ''),
            measurement_point_name=traffic_point.get('measurement_point_name', ''),
            road_type=road_type,
            source='madrid_current_xml'
        )

        # Calculate pedestrian count
        location_context = {
            'latitude': traffic_point.get('latitude'),
            'longitude': traffic_point.get('longitude'),
            'measurement_point_name': traffic_point.get('measurement_point_name')
        }
        pedestrian_count, inference_metadata = self.analyzer.calculate_pedestrian_flow(
            traffic_record, location_context
        )

        # Calculate average speed based on congestion level
        if congestion_level == 'high':
            average_speed = 15.0
        elif congestion_level == 'medium':
            average_speed = 35.0
        elif congestion_level == 'low':
            average_speed = 50.0
        else:
            average_speed = 30.0  # default

        # Build enhanced response with required API fields
        enhanced_data = {
            'date': datetime.now(timezone.utc),  # Required API field
            'timestamp': datetime.now(timezone.utc),
            'latitude': traffic_point.get('latitude'),
            'longitude': traffic_point.get('longitude'),
            'measurement_point_id': traffic_point.get('measurement_point_id'),
            'measurement_point_name': traffic_point.get('measurement_point_name'),
            'traffic_volume': traffic_point.get('intensidad', 0),
            'pedestrian_count': pedestrian_count,
            'congestion_level': congestion_level,
            'average_speed': average_speed,  # Required API field
            'occupation_percentage': int(traffic_point.get('ocupacion', 0)),
            'load_percentage': traffic_point.get('carga', 0),
            'road_type': road_type,
            'distance_km': distance_km,
            'source': 'madrid_current_xml',
            'city': 'madrid',
            'inference_metadata': inference_metadata,
            'raw_data': traffic_point
        }
        return enhanced_data
    async def _fetch_historical_data_enhanced(self, latitude: float, longitude: float,
                                              start_date: datetime, end_date: datetime,
                                              nearest_points: List[Tuple[str, Dict[str, Any], float]]) -> List[Dict[str, Any]]:
        """Fetch and process historical traffic data"""
        historical_records = []
        try:
            # Process by year and month to avoid memory issues
            current_date = start_date.replace(day=1)  # Start from beginning of month
            now = datetime.now()

            while current_date <= end_date:
                year = current_date.year
                month = current_date.month

                # Skip current month and future months (no historical data available yet)
                if (year == now.year and month >= now.month) or year > now.year:
                    self.logger.info("Skipping current/future month - no historical data available",
                                     year=year, month=month)
                    current_date = self._next_month(current_date)
                    continue

                # Build historical URL
                zip_url = self.api_client._build_historical_url(year, month)
                self.logger.info("Processing historical ZIP file",
                                 year=year, month=month, zip_url=zip_url)

                # Fetch ZIP content
                zip_content = await self.api_client.fetch_historical_zip(zip_url)
                if not zip_content:
                    self.logger.warning("Failed to fetch historical ZIP", url=zip_url)
                    current_date = self._next_month(current_date)
                    continue

                # Process ZIP content with enhanced parsing
                month_records = await self._process_historical_zip_enhanced(
                    zip_content, zip_url, latitude, longitude, nearest_points
                )

                # Filter by date range - ensure timezone consistency:
                # make sure start_date and end_date have timezone info for comparison
                start_tz = start_date if start_date.tzinfo else start_date.replace(tzinfo=timezone.utc)
                end_tz = end_date if end_date.tzinfo else end_date.replace(tzinfo=timezone.utc)

                filtered_records = []
                for record in month_records:
                    record_date = record.get('date')
                    if not record_date:
                        continue
                    # Ensure record date has timezone info
                    if not record_date.tzinfo:
                        record_date = record_date.replace(tzinfo=timezone.utc)
                    # Now compare with consistent timezone info
                    if start_tz <= record_date <= end_tz:
                        filtered_records.append(record)

                historical_records.extend(filtered_records)
                self.logger.info("Month processing completed",
                                 year=year, month=month,
                                 month_records=len(month_records),
                                 filtered_records=len(filtered_records),
                                 total_records=len(historical_records))

                # Move to next month - extracted to helper method
                current_date = self._next_month(current_date)

            return historical_records

        except Exception as e:
            self.logger.error("Error fetching historical data", error=str(e))
            return historical_records  # Return partial results
    async def _process_historical_zip_enhanced(self, zip_content: bytes, zip_url: str,
                                               latitude: float, longitude: float,
                                               nearest_points: List[Tuple[str, Dict[str, Any], float]]) -> List[Dict[str, Any]]:
        """Process historical ZIP file with memory-efficient streaming"""
        try:
            import zipfile
            import io
            import csv
            import gc
            import codecs

            historical_records = []
            nearest_ids = {p[0] for p in nearest_points}

            with zipfile.ZipFile(io.BytesIO(zip_content)) as zip_file:
                csv_files = [f for f in zip_file.namelist() if f.lower().endswith('.csv')]

                for csv_filename in csv_files:
                    try:
                        # Stream CSV file line-by-line to avoid loading entire file into memory
                        with zip_file.open(csv_filename) as csv_file:
                            # Decode the byte stream incrementally with codecs.iterdecode
                            # so lines are read and parsed one at a time
                            text_wrapper = codecs.iterdecode(csv_file, 'utf-8', errors='ignore')
                            csv_reader = csv.DictReader(text_wrapper, delimiter=';')

                            # Process in small batches
                            batch_size = 5000
                            batch_records = []
                            row_count = 0

                            for row in csv_reader:
                                row_count += 1
                                measurement_point_id = row.get('id', '').strip()

                                # Skip rows we don't need
                                if measurement_point_id not in nearest_ids:
                                    continue

                                try:
                                    record_data = await self.processor.parse_historical_csv_row(row, nearest_points)
                                    if record_data:
                                        batch_records.append(record_data)

                                    # Store and clear batch when full
                                    if len(batch_records) >= batch_size:
                                        historical_records.extend(batch_records)
                                        batch_records = []
                                        gc.collect()
                                except Exception:
                                    continue

                            # Store remaining records
                            if batch_records:
                                historical_records.extend(batch_records)
                                batch_records = []

                            self.logger.info("CSV file processed",
                                             filename=csv_filename,
                                             rows_scanned=row_count,
                                             records_extracted=len(historical_records))

                        # Aggressive garbage collection after each CSV
                        gc.collect()

                    except Exception as csv_error:
                        self.logger.warning("Error processing CSV file",
                                            filename=csv_filename,
                                            error=str(csv_error))
                        continue

            self.logger.info("Historical ZIP processing completed",
                             zip_url=zip_url,
                             total_records=len(historical_records))

            # Final cleanup
            del zip_content
            gc.collect()
            return historical_records

        except Exception as e:
            self.logger.error("Error processing historical ZIP file",
                              zip_url=zip_url, error=str(e))
            return []
    def _next_month(self, current_date: datetime) -> datetime:
        """Helper method to move to next month"""
        if current_date.month == 12:
            return current_date.replace(year=current_date.year + 1, month=1)
        else:
            return current_date.replace(month=current_date.month + 1)
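# ----------------------------------------------------------------
# Illustrative usage sketch (not part of the original module).
# It assumes the imported component classes resolve in this
# environment and simply exercises the public coroutine methods
# above; the coordinates are Puerta del Sol, central Madrid.
# ----------------------------------------------------------------
if __name__ == "__main__":
    import asyncio

    async def _demo() -> None:
        client = MadridTrafficClient()

        # Current conditions for a point inside MADRID_BOUNDS
        current = await client.get_current_traffic(40.4168, -3.7038)
        print("congestion level:", current and current.get('congestion_level'))

        # Roughly two months of history ending now; note that only
        # complete past months are processed (the current month is
        # skipped by design in _fetch_historical_data_enhanced)
        end = datetime.now(timezone.utc)
        start = end - timedelta(days=60)
        history = await client.get_historical_traffic(40.4168, -3.7038, start, end)
        print(f"historical records: {len(history)}")

    asyncio.run(_demo())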