diff --git a/services/data/app/external/apis/madrid_traffic_client.py b/services/data/app/external/apis/madrid_traffic_client.py index 2d7c9f60..02002fc5 100644 --- a/services/data/app/external/apis/madrid_traffic_client.py +++ b/services/data/app/external/apis/madrid_traffic_client.py @@ -2,360 +2,32 @@ # services/data/app/external/apis/madrid_traffic_client.py # ================================================================ """ -Madrid-specific traffic client with improved architecture and pedestrian inference +Madrid traffic client - Orchestration layer only +Coordinates between HTTP client, data processor, and business logic components """ -import math -import re -import xml.etree.ElementTree as ET from datetime import datetime, timedelta, timezone -from typing import Dict, List, Any, Optional, Tuple, Set +from typing import Dict, List, Any, Optional, Tuple import structlog -from dataclasses import dataclass -from enum import Enum -import httpx -import zipfile -import csv -import io -import pyproj from .traffic import BaseTrafficClient, SupportedCity from ..base_client import BaseAPIClient +from ..clients.madrid_client import MadridTrafficAPIClient +from ..processors.madrid_processor import MadridTrafficDataProcessor +from ..processors.madrid_business_logic import MadridTrafficAnalyzer +from ..models.madrid_models import TrafficRecord, CongestionLevel from app.core.performance import ( rate_limit, - global_connection_pool, + async_cache, monitor_performance, - global_performance_monitor, - async_cache + global_performance_monitor ) -logger = structlog.get_logger() - -class TrafficServiceLevel(Enum): - """Madrid traffic service levels""" - FLUID = 0 - DENSE = 1 - CONGESTED = 2 - BLOCKED = 3 - - -class CongestionLevel(Enum): - """Standardized congestion levels""" - LOW = "low" - MEDIUM = "medium" - HIGH = "high" - BLOCKED = "blocked" - - -@dataclass -class MeasurementPoint: - """Madrid measurement point data structure""" - id: str - latitude: float - longitude: float - distance: float - name: str - type: str - - -@dataclass -class TrafficRecord: - """Standardized traffic record with pedestrian inference""" - date: datetime - traffic_volume: int - occupation_percentage: int - load_percentage: int - average_speed: int - congestion_level: str - pedestrian_count: int - measurement_point_id: str - measurement_point_name: str - road_type: str - source: str - district: Optional[str] = None - - # Madrid-specific data - intensidad_raw: Optional[int] = None - ocupacion_raw: Optional[int] = None - carga_raw: Optional[int] = None - vmed_raw: Optional[int] = None - - # Pedestrian inference metadata - pedestrian_multiplier: Optional[float] = None - time_pattern_factor: Optional[float] = None - district_factor: Optional[float] = None - - -class MadridPedestrianInference: - """ - Advanced pedestrian inference engine for Madrid traffic data - Uses Madrid-specific patterns and correlations to estimate pedestrian flow - """ - - # Madrid district characteristics for pedestrian patterns - DISTRICT_MULTIPLIERS = { - 'Centro': 2.5, # Historic center, high pedestrian activity - 'Salamanca': 2.0, # Shopping area, high foot traffic - 'Chamberí': 1.8, # Business district - 'Retiro': 2.2, # Near park, high leisure activity - 'Chamartín': 1.6, # Business/residential - 'Tetuán': 1.4, # Mixed residential/commercial - 'Fuencarral': 1.3, # Residential with commercial areas - 'Moncloa': 1.7, # University area - 'Latina': 1.5, # Residential area - 'Carabanchel': 1.2, # Residential periphery - 'Usera': 1.1, # 
Industrial/residential - 'Villaverde': 1.0, # Industrial area - 'Villa de Vallecas': 1.0, # Peripheral residential - 'Vicálvaro': 0.9, # Peripheral - 'San Blas': 1.1, # Residential - 'Barajas': 0.8, # Airport area, low pedestrian activity - 'Hortaleza': 1.2, # Mixed area - 'Ciudad Lineal': 1.3, # Linear development - 'Puente de Vallecas': 1.2, # Working class area - 'Moratalaz': 1.1, # Residential - 'Arganzuela': 1.6, # Near center, growing area - } - - # Time-based patterns (hour of day) - TIME_PATTERNS = { - 'morning_peak': {'hours': [7, 8, 9], 'multiplier': 2.0}, - 'lunch_peak': {'hours': [12, 13, 14], 'multiplier': 2.5}, - 'evening_peak': {'hours': [18, 19, 20], 'multiplier': 2.2}, - 'afternoon': {'hours': [15, 16, 17], 'multiplier': 1.8}, - 'late_evening': {'hours': [21, 22], 'multiplier': 1.5}, - 'night': {'hours': [23, 0, 1, 2, 3, 4, 5, 6], 'multiplier': 0.3}, - 'morning': {'hours': [10, 11], 'multiplier': 1.4} - } - - # Road type specific patterns - ROAD_TYPE_BASE = { - 'URB': 250, # Urban streets - high pedestrian activity - 'M30': 50, # Ring road - minimal pedestrians - 'C30': 75, # Secondary ring - some pedestrian access - 'A': 25, # Highways - very low pedestrians - 'R': 40 # Radial roads - low to moderate - } - - # Weather impact on pedestrian activity - WEATHER_IMPACT = { - 'rain': 0.6, # 40% reduction in rain - 'hot_weather': 0.8, # 20% reduction when very hot - 'cold_weather': 0.7, # 30% reduction when very cold - 'normal': 1.0 # No impact - } - - @classmethod - def calculate_pedestrian_flow( - cls, - traffic_record: TrafficRecord, - location_context: Optional[Dict[str, Any]] = None - ) -> Tuple[int, Dict[str, float]]: - """ - Calculate pedestrian flow estimate with detailed metadata - - Returns: - Tuple of (pedestrian_count, inference_metadata) - """ - # Base calculation from road type - road_type = traffic_record.road_type or 'URB' - base_pedestrians = cls.ROAD_TYPE_BASE.get(road_type, 200) - - # Time pattern adjustment - hour = traffic_record.date.hour - time_factor = cls._get_time_pattern_factor(hour) - - # District adjustment (if available) - district_factor = 1.0 - district = traffic_record.district or cls._infer_district_from_location(location_context) - if district: - district_factor = cls.DISTRICT_MULTIPLIERS.get(district, 1.0) - - # Traffic correlation adjustment - traffic_factor = cls._calculate_traffic_correlation(traffic_record) - - # Weather adjustment (if data available) - weather_factor = cls._get_weather_factor(traffic_record.date, location_context) - - # Weekend adjustment - weekend_factor = cls._get_weekend_factor(traffic_record.date) - - # Combined calculation - pedestrian_count = int( - base_pedestrians * - time_factor * - district_factor * - traffic_factor * - weather_factor * - weekend_factor - ) - - # Ensure reasonable bounds - pedestrian_count = max(10, min(2000, pedestrian_count)) - - # Metadata for model training - inference_metadata = { - 'base_pedestrians': base_pedestrians, - 'time_factor': time_factor, - 'district_factor': district_factor, - 'traffic_factor': traffic_factor, - 'weather_factor': weather_factor, - 'weekend_factor': weekend_factor, - 'inferred_district': district, - 'hour': hour, - 'road_type': road_type - } - - return pedestrian_count, inference_metadata - - @classmethod - def _get_time_pattern_factor(cls, hour: int) -> float: - """Get time-based pedestrian activity multiplier""" - for pattern, config in cls.TIME_PATTERNS.items(): - if hour in config['hours']: - return config['multiplier'] - return 1.0 # Default multiplier - - 
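    # --- Illustrative sketch (not part of the patch): how the constants above combine
    # --- inside calculate_pedestrian_flow(). The constant values are quoted from this
    # --- class; the function name below is hypothetical and exists only to show the
    # --- factor composition and the 10-2000 clamp.
def _example_pedestrian_estimate() -> int:
    base = 250          # ROAD_TYPE_BASE['URB']
    time_factor = 2.5   # TIME_PATTERNS 'lunch_peak' (12-14h)
    district = 2.5      # DISTRICT_MULTIPLIERS['Centro']
    traffic = 1.3       # urban road with 30-70% load (see _calculate_traffic_correlation)
    weather = 1.1       # spring / early-summer default in _get_weather_factor
    weekend = 1.0       # weekday in _get_weekend_factor
    estimate = int(base * time_factor * district * traffic * weather * weekend)  # 2234
    return max(10, min(2000, estimate))  # clamped to 2000, as in calculate_pedestrian_flow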
@classmethod - def _calculate_traffic_correlation(cls, traffic_record: TrafficRecord) -> float: - """ - Calculate pedestrian correlation with traffic patterns - Higher traffic in urban areas often correlates with more pedestrians - """ - if traffic_record.road_type == 'URB': - # Urban areas: moderate traffic indicates commercial activity - if 30 <= traffic_record.load_percentage <= 70: - return 1.3 # Sweet spot for pedestrian activity - elif traffic_record.load_percentage > 70: - return 0.9 # Too congested, pedestrians avoid - else: - return 1.0 # Normal correlation - else: - # Highway/ring roads: more traffic = fewer pedestrians - if traffic_record.load_percentage > 60: - return 0.5 - else: - return 0.8 - - @classmethod - def _get_weather_factor(cls, date: datetime, location_context: Optional[Dict] = None) -> float: - """Estimate weather impact on pedestrian activity""" - # Simplified weather inference based on season and typical Madrid patterns - month = date.month - - # Madrid seasonal patterns - if month in [12, 1, 2]: # Winter - cold weather impact - return cls.WEATHER_IMPACT['cold_weather'] - elif month in [7, 8]: # Summer - hot weather impact - return cls.WEATHER_IMPACT['hot_weather'] - elif month in [10, 11, 3, 4]: # Rainy seasons - moderate impact - return 0.85 - else: # Spring/early summer - optimal weather - return 1.1 - - @classmethod - def _get_weekend_factor(cls, date: datetime) -> float: - """Weekend vs weekday pedestrian patterns""" - weekday = date.weekday() - hour = date.hour - - if weekday >= 5: # Weekend - if 11 <= hour <= 16: # Weekend shopping/leisure hours - return 1.4 - elif 20 <= hour <= 23: # Weekend evening activity - return 1.3 - else: - return 0.9 - else: # Weekday - return 1.0 - - @classmethod - def _infer_district_from_location(cls, location_context: Optional[Dict] = None) -> Optional[str]: - """ - Infer Madrid district from location context or coordinates - Production implementation using real Madrid district boundaries - """ - if not location_context: - return None - - lat = location_context.get('latitude') - lon = location_context.get('longitude') - - if not (lat and lon): - return None - - # Madrid district boundaries (production-ready with actual coordinates) - # Based on official Madrid municipal boundaries - districts = { - # Central districts - 'Centro': {'lat_min': 40.405, 'lat_max': 40.425, 'lon_min': -3.720, 'lon_max': -3.690}, - 'Arganzuela': {'lat_min': 40.385, 'lat_max': 40.410, 'lon_min': -3.720, 'lon_max': -3.680}, - 'Retiro': {'lat_min': 40.405, 'lat_max': 40.425, 'lon_min': -3.690, 'lon_max': -3.660}, - 'Salamanca': {'lat_min': 40.420, 'lat_max': 40.445, 'lon_min': -3.690, 'lon_max': -3.660}, - 'Chamartín': {'lat_min': 40.445, 'lat_max': 40.480, 'lon_min': -3.690, 'lon_max': -3.660}, - 'Tetuán': {'lat_min': 40.445, 'lat_max': 40.470, 'lon_min': -3.720, 'lon_max': -3.690}, - 'Chamberí': {'lat_min': 40.425, 'lat_max': 40.450, 'lon_min': -3.720, 'lon_max': -3.690}, - 'Fuencarral-El Pardo': {'lat_min': 40.470, 'lat_max': 40.540, 'lon_min': -3.750, 'lon_max': -3.650}, - 'Moncloa-Aravaca': {'lat_min': 40.430, 'lat_max': 40.480, 'lon_min': -3.750, 'lon_max': -3.720}, - 'Latina': {'lat_min': 40.380, 'lat_max': 40.420, 'lon_min': -3.750, 'lon_max': -3.720}, - 'Carabanchel': {'lat_min': 40.350, 'lat_max': 40.390, 'lon_min': -3.750, 'lon_max': -3.720}, - 'Usera': {'lat_min': 40.350, 'lat_max': 40.385, 'lon_min': -3.720, 'lon_max': -3.690}, - 'Puente de Vallecas': {'lat_min': 40.370, 'lat_max': 40.410, 'lon_min': -3.680, 'lon_max': -3.640}, - 
'Moratalaz': {'lat_min': 40.400, 'lat_max': 40.430, 'lon_min': -3.650, 'lon_max': -3.620}, - 'Ciudad Lineal': {'lat_min': 40.430, 'lat_max': 40.460, 'lon_min': -3.650, 'lon_max': -3.620}, - 'Hortaleza': {'lat_min': 40.460, 'lat_max': 40.500, 'lon_min': -3.650, 'lon_max': -3.620}, - 'Villaverde': {'lat_min': 40.320, 'lat_max': 40.360, 'lon_min': -3.720, 'lon_max': -3.680}, - 'Villa de Vallecas': {'lat_min': 40.350, 'lat_max': 40.390, 'lon_min': -3.640, 'lon_max': -3.600}, - 'Vicálvaro': {'lat_min': 40.390, 'lat_max': 40.430, 'lon_min': -3.620, 'lon_max': -3.580}, - 'San Blas-Canillejas': {'lat_min': 40.430, 'lat_max': 40.470, 'lon_min': -3.620, 'lon_max': -3.580}, - 'Barajas': {'lat_min': 40.470, 'lat_max': 40.510, 'lon_min': -3.620, 'lon_max': -3.550}, - } - - # Find the district that contains the coordinates - for district_name, bounds in districts.items(): - if (bounds['lat_min'] <= lat <= bounds['lat_max'] and - bounds['lon_min'] <= lon <= bounds['lon_max']): - return district_name - - # Special handling for boundary areas and overlaps - # Use more precise point-in-polygon logic for edge cases - if cls._is_in_madrid_metropolitan_area(lat, lon): - # If within Madrid metropolitan area but not in specific district - return cls._get_nearest_district(lat, lon, districts) - - return None # Outside Madrid area - - @staticmethod - def _is_in_madrid_metropolitan_area(lat: float, lon: float) -> bool: - """Check if coordinates are within Madrid metropolitan area""" - # Madrid metropolitan area rough bounds - return (40.30 <= lat <= 40.60 and -3.90 <= lon <= -3.50) - - @staticmethod - def _get_nearest_district(lat: float, lon: float, districts: Dict) -> Optional[str]: - """Find nearest district when coordinates fall in boundary areas""" - min_distance = float('inf') - nearest_district = None - - for district_name, bounds in districts.items(): - # Calculate distance to district center - center_lat = (bounds['lat_min'] + bounds['lat_max']) / 2 - center_lon = (bounds['lon_min'] + bounds['lon_max']) / 2 - - # Simple euclidean distance (good enough for nearby points) - distance = ((lat - center_lat) ** 2 + (lon - center_lon) ** 2) ** 0.5 - - if distance < min_distance: - min_distance = distance - nearest_district = district_name - - # Only return nearest district if it's reasonably close (within ~2km) - return nearest_district if min_distance < 0.02 else None - class MadridTrafficClient(BaseTrafficClient, BaseAPIClient): """ - Enhanced Madrid traffic client with improved architecture and pedestrian inference + Enhanced Madrid traffic client - Orchestration layer + Coordinates HTTP, processing, and business logic components """ # Madrid geographic bounds @@ -364,852 +36,254 @@ class MadridTrafficClient(BaseTrafficClient, BaseAPIClient): 'lon_min': -3.89, 'lon_max': -3.51 } - # API endpoints - REAL_TIME_ENDPOINTS = [ - "https://datos.madrid.es/egob/catalogo/202087-0-trafico-intensidad.xml" - ] - - MEASUREMENT_POINTS_URL = "https://datos.madrid.es/egob/catalogo/202468-263-intensidad-trafico.csv" - # Configuration constants - UTM_ZONE = 30 # Madrid UTM Zone MAX_HISTORICAL_DAYS = 1095 # 3 years - MAX_CSV_PROCESSING_ROWS = 5000000 # Reduced to prevent memory issues + MAX_CSV_PROCESSING_ROWS = 5000000 MEASUREMENT_POINTS_LIMIT = 20 def __init__(self): BaseTrafficClient.__init__(self, SupportedCity.MADRID) BaseAPIClient.__init__(self, base_url="https://datos.madrid.es") - # Initialize coordinate converter - self.utm_proj = pyproj.Proj(proj='utm', zone=self.UTM_ZONE, ellps='WGS84', preserve_units=False) + # 
Initialize components + self.api_client = MadridTrafficAPIClient() + self.processor = MadridTrafficDataProcessor() + self.analyzer = MadridTrafficAnalyzer() - # Initialize pedestrian inference engine - self.pedestrian_inference = MadridPedestrianInference() - - # Conversion logging control - self._conversion_log_count = [] + self.logger = structlog.get_logger() def supports_location(self, latitude: float, longitude: float) -> bool: """Check if location is within Madrid bounds""" return (self.MADRID_BOUNDS['lat_min'] <= latitude <= self.MADRID_BOUNDS['lat_max'] and self.MADRID_BOUNDS['lon_min'] <= longitude <= self.MADRID_BOUNDS['lon_max']) - @rate_limit(calls=30, period=60) # Max 30 calls per minute - @async_cache(ttl=300) # Cache for 5 minutes + @rate_limit(calls=30, period=60) + @async_cache(ttl=300) @monitor_performance(monitor=global_performance_monitor) async def get_current_traffic(self, latitude: float, longitude: float) -> Optional[Dict[str, Any]]: - """ - Get current traffic data with enhanced pedestrian inference - """ + """Get current traffic data with enhanced pedestrian inference""" try: - self.logger.info("Fetching Madrid current traffic data", lat=latitude, lon=longitude) - - # Validate location if not self.supports_location(latitude, longitude): self.logger.warning("Location outside Madrid bounds", lat=latitude, lon=longitude) return None - # Try real-time endpoints - for endpoint in self.REAL_TIME_ENDPOINTS: - try: - traffic_data = await self._fetch_traffic_xml_data(endpoint) - - if traffic_data: - self.logger.info("Successfully fetched traffic data", - endpoint=endpoint, points=len(traffic_data)) - - # Find nearest measurement point - nearest_point = self._find_nearest_traffic_point(latitude, longitude, traffic_data) - - if nearest_point: - # Parse and enhance with pedestrian data - parsed_data = await self._parse_traffic_measurement_enhanced( - nearest_point, latitude, longitude - ) - - self.logger.info("Successfully parsed traffic data with pedestrian inference", - point_name=nearest_point.get('descripcion'), - pedestrian_count=parsed_data.get('pedestrian_count', 0)) - return parsed_data - else: - closest_distance = self._get_closest_distance(latitude, longitude, traffic_data) - self.logger.debug("No nearby traffic points found", - lat=latitude, lon=longitude, - closest_distance=closest_distance) - - except Exception as e: - self.logger.debug("Failed to fetch from endpoint", endpoint=endpoint, error=str(e)) - continue + # Fetch XML data + xml_content = await self.api_client.fetch_current_traffic_xml() + if not xml_content: + self.logger.warning("No XML content received") + return None - # No external data available - return empty result - self.logger.warning("No nearby Madrid traffic points found - 0 traffic records obtained") - return None + # Parse XML data + traffic_points = self.processor.parse_traffic_xml(xml_content) + if not traffic_points: + self.logger.warning("No traffic points found in XML") + return None + + # Find nearest traffic point + nearest_point = self.analyzer.find_nearest_traffic_point(traffic_points, latitude, longitude) + if not nearest_point: + self.logger.warning("No nearby traffic points found") + return None + + # Enhance with business logic + enhanced_data = await self._enhance_traffic_data(nearest_point, latitude, longitude) + + self.logger.info("Current traffic data retrieved", + point_id=nearest_point.get('measurement_point_id'), + distance=enhanced_data.get('distance_km', 0)) + + return enhanced_data except Exception as e: - 
self.logger.error("Failed to get current traffic - 0 traffic records obtained", error=str(e)) + self.logger.error("Error getting current traffic", error=str(e)) return None - @rate_limit(calls=10, period=60) # Max 10 calls per minute for historical data - @async_cache(ttl=3600) # Cache for 1 hour (historical data doesn't change) + @rate_limit(calls=10, period=60) @monitor_performance(monitor=global_performance_monitor) async def get_historical_traffic(self, latitude: float, longitude: float, - start_date: datetime, end_date: datetime, - skip_measurement_points: bool = False) -> List[Dict[str, Any]]: - """ - Get historical traffic data with pedestrian inference - """ + start_date: datetime, end_date: datetime) -> List[Dict[str, Any]]: + """Get historical traffic data with pedestrian enhancement""" try: - self.logger.info("Fetching Madrid historical traffic data", - lat=latitude, lon=longitude, start=start_date, end=end_date) - - # Validate location and date range if not self.supports_location(latitude, longitude): - self.logger.warning("Location outside Madrid bounds") + self.logger.warning("Location outside Madrid bounds", lat=latitude, lon=longitude) return [] - if not self._validate_date_range(start_date, end_date): + # Validate date range + if (end_date - start_date).days > self.MAX_HISTORICAL_DAYS: + self.logger.warning("Date range too large, truncating", + requested_days=(end_date - start_date).days, + max_days=self.MAX_HISTORICAL_DAYS) + start_date = end_date - timedelta(days=self.MAX_HISTORICAL_DAYS) + + # Fetch measurement points registry + csv_content = await self.api_client.fetch_measurement_points_csv() + if not csv_content: + self.logger.error("Failed to fetch measurement points registry") return [] - # Try to fetch real historical data - try: - real_data = await self._fetch_real_historical_traffic_enhanced( - latitude, longitude, start_date, end_date) - if real_data: - self.logger.info("Fetched real historical traffic data", records=len(real_data)) - return real_data - else: - self.logger.warning("No historical traffic data available from external API - 0 traffic records obtained") - return [] - except Exception as e: - self.logger.error("Failed to fetch real historical data - 0 traffic records obtained", error=str(e)) + # Parse measurement points + measurement_points = self.processor.parse_measurement_points_csv(csv_content) + if not measurement_points: + self.logger.error("No measurement points found") return [] - except Exception as e: - self.logger.error("Error getting historical traffic data - 0 traffic records obtained", error=str(e)) - return [] - - async def get_events(self, latitude: float, longitude: float, radius_km: float = 5.0) -> List[Dict[str, Any]]: - """ - Get traffic incidents and events from Madrid's traffic system - Note: Madrid OpenData primarily provides intensity data, not incidents - """ - try: - self.logger.info("Getting traffic events", lat=latitude, lon=longitude, radius=radius_km) - - # Madrid's open data doesn't provide real-time incident data through XML - # This would typically come from a different endpoint or service - # For now, return empty but could be extended to integrate with: - # - Traffic authorities' incident reporting systems - # - Social media feeds - # - Third-party traffic services - - events = [] - - # Check for high congestion areas which could indicate incidents - traffic_data = await self._fetch_traffic_xml_data(self.REAL_TIME_ENDPOINTS[0]) - - if traffic_data: - # Find high congestion points near the query location - 
nearby_points = [ - point for point in traffic_data - if self._calculate_distance( - latitude, longitude, - point.get('latitude', 0), point.get('longitude', 0) - ) <= radius_km - ] - - # Generate synthetic events based on severe congestion - for point in nearby_points: - service_level = point.get('nivelServicio', 0) - if service_level >= TrafficServiceLevel.BLOCKED.value: - events.append({ - 'type': 'high_congestion', - 'severity': 'high', - 'location': { - 'latitude': point.get('latitude'), - 'longitude': point.get('longitude') - }, - 'description': f"Heavy traffic congestion at {point.get('measurement_point_name', 'Unknown location')}", - 'timestamp': datetime.now(timezone.utc).isoformat(), - 'source': 'madrid_traffic_analysis', - 'measurement_point_id': point.get('measurement_point_id') - }) - - self.logger.info("Retrieved traffic events", count=len(events)) - return events - - except Exception as e: - self.logger.error("Failed to get traffic events", error=str(e)) - return [] - - # Enhanced traffic data processing methods - - async def _parse_traffic_measurement_enhanced( - self, - traffic_point: Dict[str, Any], - query_lat: float, - query_lon: float - ) -> Dict[str, Any]: - """Parse Madrid traffic measurement with enhanced pedestrian inference""" - try: - service_level = traffic_point.get('nivelServicio', 0) - - # Service level to congestion mapping - congestion_mapping = { - TrafficServiceLevel.FLUID.value: CongestionLevel.LOW.value, - TrafficServiceLevel.DENSE.value: CongestionLevel.MEDIUM.value, - TrafficServiceLevel.CONGESTED.value: CongestionLevel.HIGH.value, - TrafficServiceLevel.BLOCKED.value: CongestionLevel.BLOCKED.value - } - - # Speed estimation based on service level - speed_mapping = { - TrafficServiceLevel.FLUID.value: 45, - TrafficServiceLevel.DENSE.value: 25, - TrafficServiceLevel.CONGESTED.value: 15, - TrafficServiceLevel.BLOCKED.value: 5 - } - - congestion_level = congestion_mapping.get(service_level, CongestionLevel.MEDIUM.value) - average_speed = speed_mapping.get(service_level, 25) - - # Create traffic record for pedestrian inference - current_time = datetime.now(timezone.utc) - traffic_record = TrafficRecord( - date=current_time, - traffic_volume=traffic_point.get('intensidad', 0), - occupation_percentage=traffic_point.get('ocupacion', 0), - load_percentage=traffic_point.get('carga', 0), - average_speed=average_speed, - congestion_level=congestion_level, - pedestrian_count=0, # Will be calculated - measurement_point_id=traffic_point.get('idelem', 'unknown'), - measurement_point_name=traffic_point.get('descripcion', 'Unknown location'), - road_type=self._infer_road_type(traffic_point), - source="madrid_opendata_realtime", - intensidad_raw=traffic_point.get('intensidad'), - ocupacion_raw=traffic_point.get('ocupacion'), - carga_raw=traffic_point.get('carga') + # Find nearest measurement points + nearest_points = self.analyzer.find_nearest_measurement_points( + measurement_points, latitude, longitude, num_points=3 ) - # Enhanced pedestrian inference - location_context = { - 'latitude': traffic_point.get('latitude', query_lat), - 'longitude': traffic_point.get('longitude', query_lon), - 'measurement_point': traffic_point - } + if not nearest_points: + self.logger.warning("No nearby measurement points found") + return [] - pedestrian_count, inference_metadata = self.pedestrian_inference.calculate_pedestrian_flow( - traffic_record, location_context + # Process historical data + historical_records = await self._fetch_historical_data_enhanced( + latitude, longitude, 
start_date, end_date, nearest_points ) - # Update traffic record - traffic_record.pedestrian_count = pedestrian_count - traffic_record.pedestrian_multiplier = inference_metadata.get('time_factor', 1.0) - traffic_record.time_pattern_factor = inference_metadata.get('time_factor', 1.0) - traffic_record.district_factor = inference_metadata.get('district_factor', 1.0) - traffic_record.district = inference_metadata.get('inferred_district') + self.logger.info("Historical traffic data retrieved", + records_count=len(historical_records), + date_range=f"{start_date.date()} to {end_date.date()}") - result = { - "date": current_time, - "traffic_volume": traffic_record.traffic_volume, - "pedestrian_count": pedestrian_count, - "congestion_level": congestion_level, - "average_speed": average_speed, - "occupation_percentage": traffic_record.occupation_percentage, - "load_percentage": traffic_record.load_percentage, - "measurement_point_id": traffic_record.measurement_point_id, - "measurement_point_name": traffic_record.measurement_point_name, - "road_type": traffic_record.road_type, - "source": traffic_record.source, - "district": traffic_record.district, - # Pedestrian inference metadata for model training - "pedestrian_inference": inference_metadata, - # Location data - "measurement_point_latitude": traffic_point.get('latitude'), - "measurement_point_longitude": traffic_point.get('longitude') - } - - return result + return historical_records except Exception as e: - self.logger.error("Error parsing enhanced traffic measurement", error=str(e)) - return self._get_default_traffic_data_enhanced(query_lat, query_lon) + self.logger.error("Error getting historical traffic", error=str(e)) + return [] - def _infer_road_type(self, traffic_point: Dict[str, Any]) -> str: - """Infer road type from traffic point data""" - point_id = str(traffic_point.get('idelem', '')) - description = traffic_point.get('descripcion', '').upper() - - # Road type inference from point ID or description - if 'M-30' in description or 'M30' in description: - return 'M30' - elif 'A-' in description or any(hw in description for hw in ['AUTOPISTA', 'AUTOVIA']): - return 'A' - elif 'R-' in description or 'RADIAL' in description: - return 'R' - elif any(term in description for term in ['CALLE', 'AVENIDA', 'PLAZA', 'PASEO']): - return 'URB' - else: - return 'URB' # Default to urban + async def get_events(self, latitude: float, longitude: float, + radius_km: float = 5.0) -> List[Dict[str, Any]]: + """Get traffic events (incidents, construction, etc.)""" + # Madrid doesn't provide separate events endpoint + # Return enhanced current traffic data as events + current_data = await self.get_current_traffic(latitude, longitude) + if current_data and current_data.get('congestion_level') in ['high', 'blocked']: + return [{ + 'type': 'congestion', + 'severity': current_data.get('congestion_level'), + 'description': f"High traffic congestion at {current_data.get('measurement_point_name', 'measurement point')}", + 'location': { + 'latitude': current_data.get('latitude'), + 'longitude': current_data.get('longitude') + }, + 'timestamp': current_data.get('timestamp') + }] + return [] - # Helper methods for traffic data validation and date range checking - - def _get_default_traffic_data_enhanced(self, latitude: float, longitude: float) -> Dict[str, Any]: - """Get enhanced default traffic data with pedestrian inference""" - current_time = datetime.now(timezone.utc) - - # Create default traffic record - traffic_record = TrafficRecord( - date=current_time, - 
traffic_volume=100, - occupation_percentage=30, - load_percentage=40, - average_speed=25, - congestion_level=CongestionLevel.MEDIUM.value, - pedestrian_count=0, - measurement_point_id="default", - measurement_point_name="Default Madrid location", - road_type="URB", - source="default_enhanced", - district="Centro" + async def _enhance_traffic_data(self, traffic_point: Dict[str, Any], + query_lat: float, query_lon: float) -> Dict[str, Any]: + """Enhance traffic data with business logic and pedestrian inference""" + # Calculate distance + distance_km = self.analyzer.calculate_distance( + query_lat, query_lon, + traffic_point.get('latitude', 0), + traffic_point.get('longitude', 0) ) - # Calculate pedestrian flow - location_context = {'latitude': latitude, 'longitude': longitude} - pedestrian_count, inference_metadata = self.pedestrian_inference.calculate_pedestrian_flow( + # Classify road type + road_type = self.analyzer.classify_road_type( + traffic_point.get('measurement_point_name', '') + ) + + # Get congestion level + congestion_level = self.analyzer.get_congestion_level( + traffic_point.get('ocupacion', 0) + ) + + # Create traffic record for pedestrian inference + traffic_record = TrafficRecord( + date=datetime.now(timezone.utc), + traffic_volume=traffic_point.get('intensidad', 0), + occupation_percentage=int(traffic_point.get('ocupacion', 0)), + load_percentage=traffic_point.get('carga', 0), + average_speed=30, # Default speed + congestion_level=congestion_level, + pedestrian_count=0, # Will be calculated + measurement_point_id=traffic_point.get('measurement_point_id', ''), + measurement_point_name=traffic_point.get('measurement_point_name', ''), + road_type=road_type, + source='madrid_current_xml' + ) + + # Calculate pedestrian count + location_context = { + 'latitude': traffic_point.get('latitude'), + 'longitude': traffic_point.get('longitude'), + 'measurement_point_name': traffic_point.get('measurement_point_name') + } + + pedestrian_count, inference_metadata = self.analyzer.calculate_pedestrian_flow( traffic_record, location_context ) - return { - "date": current_time, - "traffic_volume": 100, - "pedestrian_count": pedestrian_count, - "congestion_level": CongestionLevel.MEDIUM.value, - "average_speed": 25, - "occupation_percentage": 30, - "load_percentage": 40, - "measurement_point_id": "default", - "measurement_point_name": "Default Madrid location", - "road_type": "URB", - "source": "default_enhanced", - "district": "Centro", - "pedestrian_inference": inference_metadata - } - - # Utility methods (keeping essential ones from original implementation) - - def _validate_date_range(self, start_date: datetime, end_date: datetime) -> bool: - """Validate date range for historical data requests""" - days_diff = (end_date - start_date).days - - if days_diff < 0: - self.logger.warning("End date before start date", start=start_date, end=end_date) - return False - - if days_diff > self.MAX_HISTORICAL_DAYS: - self.logger.warning("Date range too large", days=days_diff) - return False - - return True - - def _calculate_distance(self, lat1: float, lon1: float, lat2: float, lon2: float) -> float: - """Calculate distance between two coordinates using Haversine formula""" - R = 6371 # Earth's radius in km - - dlat = math.radians(lat2 - lat1) - dlon = math.radians(lon2 - lon1) - - a = (math.sin(dlat/2) * math.sin(dlat/2) + - math.cos(math.radians(lat1)) * math.cos(math.radians(lat2)) * - math.sin(dlon/2) * math.sin(dlon/2)) - - c = 2 * math.atan2(math.sqrt(a), math.sqrt(1-a)) - return R * c - - def 
_parse_madrid_traffic_xml(self, xml_content: str) -> List[Dict[str, Any]]: - """Parse Madrid traffic XML with correct structure - improved from madrid_opendata.py""" - traffic_points = [] - - try: - cleaned_xml = self._clean_madrid_xml(xml_content) - root = ET.fromstring(cleaned_xml) - - self.logger.debug("Madrid XML structure", root_tag=root.tag, children_count=len(list(root))) - - if root.tag == 'pms': - pm_elements = root.findall('pm') - self.logger.debug("Found PM elements", count=len(pm_elements)) - - for pm in pm_elements: - try: - traffic_point = self._extract_madrid_pm_element(pm) - - if self._is_valid_traffic_point(traffic_point): - traffic_points.append(traffic_point) - - # Log first few points for debugging - if len(traffic_points) <= 3: - self.logger.debug("Sample traffic point", - id=traffic_point['idelem'], - lat=traffic_point['latitude'], - lon=traffic_point['longitude'], - intensity=traffic_point.get('intensidad')) - - except Exception as e: - self.logger.debug("Error parsing PM element", error=str(e)) - continue - else: - self.logger.warning("Unexpected XML root tag", root_tag=root.tag) - - self.logger.debug("Madrid traffic XML parsing completed", valid_points=len(traffic_points)) - return traffic_points - - except ET.ParseError as e: - self.logger.warning("Failed to parse Madrid XML", error=str(e)) - return self._extract_traffic_data_regex(xml_content) - except Exception as e: - self.logger.error("Error in Madrid traffic XML parsing", error=str(e)) - return [] - - def _extract_madrid_pm_element(self, pm_element) -> Dict[str, Any]: - """Extract traffic data from Madrid element with coordinate conversion - improved from madrid_opendata.py""" - try: - point_data = {} - utm_x = utm_y = None - - # Extract all child elements - for child in pm_element: - tag, text = child.tag, child.text.strip() if child.text else '' - - if tag == 'idelem': - point_data['idelem'] = text - elif tag == 'descripcion': - point_data['descripcion'] = text - elif tag == 'intensidad': - point_data['intensidad'] = self._safe_int(text) - elif tag == 'ocupacion': - point_data['ocupacion'] = self._safe_float(text) - elif tag == 'carga': - point_data['carga'] = self._safe_int(text) - elif tag == 'nivelServicio': - point_data['nivelServicio'] = self._safe_int(text) - elif tag == 'st_x': # Correct tag name for UTM X coordinate - utm_x = text - point_data['utm_x'] = text - elif tag == 'st_y': # Correct tag name for UTM Y coordinate - utm_y = text - point_data['utm_y'] = text - elif tag == 'error': - point_data['error'] = text - elif tag in ['subarea', 'accesoAsociado', 'intensidadSat']: - point_data[tag] = text - - # Convert coordinates - if utm_x and utm_y: - latitude, longitude = self._convert_utm_to_latlon(utm_x, utm_y) - - if latitude and longitude and self._validate_madrid_coordinates(latitude, longitude): - point_data.update({ - 'latitude': latitude, - 'longitude': longitude, - 'measurement_point_id': point_data.get('idelem'), - 'measurement_point_name': point_data.get('descripcion'), - 'timestamp': datetime.now(timezone.utc), - 'source': 'madrid_opendata_xml' - }) - - # Log successful conversions (limited) - self._log_coordinate_conversion(point_data, utm_x, utm_y, latitude, longitude) - return point_data - else: - self.logger.debug("Invalid coordinates after conversion", - idelem=point_data.get('idelem'), utm_x=utm_x, utm_y=utm_y) - return {} - else: - self.logger.debug("Missing UTM coordinates", idelem=point_data.get('idelem')) - return {} - - except Exception as e: - self.logger.debug("Error 
extracting Madrid PM element", error=str(e))
-            return {}
-
-    def _convert_utm_to_latlon(self, utm_x_str: str, utm_y_str: str) -> Tuple[Optional[float], Optional[float]]:
-        """Convert UTM coordinates to lat/lon using pyproj - improved from madrid_opendata.py"""
-        try:
-            utm_x = float(utm_x_str.replace(',', '.'))
-            utm_y = float(utm_y_str.replace(',', '.'))
-
-            longitude, latitude = self.utm_proj(utm_x, utm_y, inverse=True)
-            return round(latitude, 6), round(longitude, 6)
-        except (ValueError, TypeError, Exception):
-            return None, None
-
-    def _validate_madrid_coordinates(self, latitude: float, longitude: float) -> bool:
-        """Validate coordinates are in Madrid area"""
-        return (self.MADRID_BOUNDS['lat_min'] <= latitude <= self.MADRID_BOUNDS['lat_max'] and
-                self.MADRID_BOUNDS['lon_min'] <= longitude <= self.MADRID_BOUNDS['lon_max'])
-
-    def _is_valid_traffic_point(self, traffic_point: Dict[str, Any]) -> bool:
-        """Check if traffic point has valid essential data"""
-        return (traffic_point.get('latitude') and
-                traffic_point.get('longitude') and
-                traffic_point.get('idelem'))
-
-    def _log_coordinate_conversion(self, point_data: Dict, utm_x: str, utm_y: str,
-                                   latitude: float, longitude: float) -> None:
-        """Log coordinate conversion (limited to first few for debugging)"""
-        if len(self._conversion_log_count) < 3:
-            self._conversion_log_count.append(1)
-            self.logger.debug("Successful UTM conversion",
-                              idelem=point_data.get('idelem'),
-                              utm_x=utm_x, utm_y=utm_y,
-                              latitude=latitude, longitude=longitude,
-                              descripcion=point_data.get('descripcion'))
-
-    def _clean_madrid_xml(self, xml_content: str) -> str:
-        """Clean Madrid XML to handle undefined entities and encoding issues - from madrid_opendata.py"""
-        try:
-            import re
-            # Remove BOM if present
-            xml_content = xml_content.lstrip('\ufeff')
-
-            # Replace undefined entities
-            entity_replacements = {
-                '&nbsp;': ' ', '&copy;': '©', '&reg;': '®', '&trade;': '™'
-            }
-
-            for entity, replacement in entity_replacements.items():
-                xml_content = xml_content.replace(entity, replacement)
-
-            # Fix unescaped ampersands
-            xml_content = re.sub(r'&(?![a-zA-Z0-9#]{1,10};)', '&amp;', xml_content)
-
-            # Remove invalid control characters
-            xml_content = re.sub(r'[\x00-\x08\x0B\x0C\x0E-\x1F\x7F]', '', xml_content)
-
-            # Handle Spanish characters (convert to safe equivalents)
-            spanish_chars = {
-                'ñ': 'n', 'Ñ': 'N', 'á': 'a', 'é': 'e', 'í': 'i', 'ó': 'o', 'ú': 'u',
-                'Á': 'A', 'É': 'E', 'Í': 'I', 'Ó': 'O', 'Ú': 'U', 'ü': 'u', 'Ü': 'U'
-            }
-
-            for spanish_char, replacement in spanish_chars.items():
-                xml_content = xml_content.replace(spanish_char, replacement)
-
-            return xml_content
-
-        except Exception as e:
-            self.logger.warning("Error cleaning Madrid XML", error=str(e))
-            return xml_content
-
-    def _extract_traffic_data_regex(self, xml_content: str) -> List[Dict[str, Any]]:
-        """Extract traffic data using regex when XML parsing fails - from madrid_opendata.py"""
-        import re
-        traffic_points = []
-
-        try:
-            pm_pattern = r'<pm>(.*?)</pm>'
-            pm_matches = re.findall(pm_pattern, xml_content, re.DOTALL)
-
-            for pm_content in pm_matches:
-                try:
-                    extracted_data = self._extract_pm_data_regex(pm_content)
-                    if extracted_data and self._is_valid_traffic_point(extracted_data):
-                        traffic_points.append(extracted_data)
-
-                except Exception as e:
-                    self.logger.debug("Error parsing regex PM match", error=str(e))
-                    continue
-
-            self.logger.debug("Regex extraction results", count=len(traffic_points))
-            return traffic_points
-
-        except Exception as e:
-            self.logger.error("Error in regex extraction", error=str(e))
-
return [] - - def _extract_pm_data_regex(self, pm_content: str) -> Dict[str, Any]: - """Extract individual PM data using regex - from madrid_opendata.py""" - import re - patterns = { - 'idelem': r'(.*?)', - 'intensidad': r'(.*?)', - 'st_x': r'(.*?)', - 'st_y': r'(.*?)', - 'descripcion': r'(.*?)' + # Build enhanced response + enhanced_data = { + 'timestamp': datetime.now(timezone.utc), + 'latitude': traffic_point.get('latitude'), + 'longitude': traffic_point.get('longitude'), + 'measurement_point_id': traffic_point.get('measurement_point_id'), + 'measurement_point_name': traffic_point.get('measurement_point_name'), + 'traffic_volume': traffic_point.get('intensidad', 0), + 'occupation_percentage': int(traffic_point.get('ocupacion', 0)), + 'load_percentage': traffic_point.get('carga', 0), + 'congestion_level': congestion_level, + 'pedestrian_count': pedestrian_count, + 'road_type': road_type, + 'distance_km': distance_km, + 'source': 'madrid_current_xml', + 'city': 'madrid', + 'inference_metadata': inference_metadata, + 'raw_data': traffic_point } - extracted = {} - for field, pattern in patterns.items(): - match = re.search(pattern, pm_content) - extracted[field] = match.group(1) if match else '' - - if extracted['idelem'] and extracted['st_x'] and extracted['st_y']: - # Convert coordinates - latitude, longitude = self._convert_utm_to_latlon(extracted['st_x'], extracted['st_y']) - - if latitude and longitude: - return { - 'idelem': extracted['idelem'], - 'descripcion': extracted['descripcion'] or f"Point {extracted['idelem']}", - 'intensidad': self._safe_int(extracted['intensidad']), - 'latitude': latitude, - 'longitude': longitude, - 'ocupacion': 0, - 'carga': 0, - 'nivelServicio': 0, - 'error': 'N', - 'measurement_point_id': extracted['idelem'], - 'measurement_point_name': extracted['descripcion'] or f"Point {extracted['idelem']}", - 'timestamp': datetime.now(timezone.utc), - 'source': 'madrid_opendata_xml_regex' - } - - return {} + return enhanced_data - def _decode_response_content(self, response) -> Optional[str]: - """Decode response content with multiple encoding attempts - from madrid_opendata.py""" + async def _fetch_historical_data_enhanced(self, latitude: float, longitude: float, + start_date: datetime, end_date: datetime, + nearest_points: List[Tuple[str, Dict[str, Any], float]]) -> List[Dict[str, Any]]: + """Fetch and process historical traffic data""" + historical_records = [] + try: - return response.text - except UnicodeDecodeError: - # Try manual encoding for Spanish content - for encoding in ['utf-8', 'latin-1', 'windows-1252', 'iso-8859-1']: - try: - content = response.content.decode(encoding) - if content and len(content) > 100: - self.logger.debug("Successfully decoded with encoding", encoding=encoding) - return content - except UnicodeDecodeError: + # Process by year and month to avoid memory issues + current_date = start_date.replace(day=1) # Start from beginning of month + + while current_date <= end_date: + year = current_date.year + month = current_date.month + + # Build historical URL + zip_url = self.api_client._build_historical_url(year, month) + + self.logger.info("Processing historical ZIP file", + year=year, month=month, zip_url=zip_url) + + # Fetch ZIP content + zip_content = await self.api_client.fetch_historical_zip(zip_url) + if not zip_content: + self.logger.warning("Failed to fetch historical ZIP", url=zip_url) + current_date = current_date.replace(month=current_date.month + 1) if current_date.month < 12 else current_date.replace(year=current_date.year + 
1, month=1) continue - return None - - def _safe_float(self, value_str: str) -> float: - """Safely convert string to float""" - try: - return float(value_str.replace(',', '.')) - except (ValueError, TypeError): - return 0.0 - - async def _fetch_measurement_points_registry(self) -> Dict[str, Dict[str, Any]]: - """ - Fetch Madrid measurement points registry with coordinates - Returns dict mapping point_id to {latitude, longitude, name, ...} - """ - try: - async with httpx.AsyncClient( - timeout=30.0, - headers={ - 'User-Agent': 'MadridTrafficClient/2.0', - 'Accept': 'text/csv,application/csv,*/*' - }, - follow_redirects=True - ) as client: - self.logger.debug("Fetching measurement points registry", url=self.MEASUREMENT_POINTS_URL) - response = await client.get(self.MEASUREMENT_POINTS_URL) + # Process ZIP content with enhanced parsing + month_records = await self._process_historical_zip_enhanced( + zip_content, zip_url, latitude, longitude, nearest_points + ) - if response.status_code == 200: - csv_content = response.text - return await self._parse_measurement_points_csv(csv_content) - else: - self.logger.warning("Failed to fetch measurement points", - status=response.status_code, url=self.MEASUREMENT_POINTS_URL) - return {} - - except Exception as e: - self.logger.error("Error fetching measurement points registry", - url=self.MEASUREMENT_POINTS_URL, error=str(e)) - return {} - - async def _parse_measurement_points_csv(self, csv_content: str) -> Dict[str, Dict[str, Any]]: - """Parse measurement points CSV into lookup dictionary - MEMORY OPTIMIZED""" - measurement_points = {} - - try: - import csv - import io - - # Parse CSV with semicolon delimiter - csv_reader = csv.DictReader(io.StringIO(csv_content), delimiter=';') - - processed_count = 0 - for row in csv_reader: - try: - - # Extract point ID and coordinates - point_id = row.get('id', '').strip() - if not point_id: - continue - - processed_count += 1 - - # Try different coordinate field names - lat_str = '' - lon_str = '' - - # Common coordinate field patterns - lat_fields = ['lat', 'latitude', 'latitud', 'y', 'utm_y'] - lon_fields = ['lon', 'lng', 'longitude', 'longitud', 'x', 'utm_x'] - - for field in lat_fields: - if field in row and row[field].strip(): - lat_str = row[field].strip() - break - - for field in lon_fields: - if field in row and row[field].strip(): - lon_str = row[field].strip() - break - - if lat_str and lon_str: - try: - # Try parsing as decimal degrees first - lat = float(lat_str) - lon = float(lon_str) - - # If coordinates look like UTM (large values), convert them - if abs(lat) > 180 or abs(lon) > 180: - # Convert from UTM Zone 30N to WGS84 - utm_proj = pyproj.Proj(proj='utm', zone=30, ellps='WGS84', preserve_units=False) - wgs84_proj = pyproj.Proj(proj='latlong', datum='WGS84') - transformer = pyproj.Transformer.from_proj(utm_proj, wgs84_proj, always_xy=True) - lon, lat = transformer.transform(lon, lat) - - measurement_points[point_id] = { - 'latitude': lat, - 'longitude': lon, - 'name': row.get('name', row.get('descripcion', f'Point {point_id}')), - 'district': row.get('district', row.get('distrito', '')), - 'road_type': row.get('tipo_elem', row.get('type', '')), - 'raw_data': dict(row) - } - - except (ValueError, Exception): - continue - - except Exception: - continue - - self.logger.info("Parsed measurement points registry", - total_points=len(measurement_points)) - return measurement_points - - except Exception as e: - self.logger.error("Error parsing measurement points CSV", error=str(e)) - return {} - - def 
_get_next_month(self, current_date: datetime) -> datetime: - """Get next month date""" - if current_date.month == 12: - return current_date.replace(year=current_date.year + 1, month=1) - else: - return current_date.replace(month=current_date.month + 1) - - # Async methods for data fetching (simplified versions) - - async def _fetch_traffic_xml_data(self, endpoint: str) -> Optional[List[Dict[str, Any]]]: - """Fetch and parse Madrid traffic XML data with improved parsing from madrid_opendata.py""" - try: - headers = { - 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36', - 'Accept': 'application/xml,text/xml,*/*', - 'Accept-Language': 'es-ES,es;q=0.9,en;q=0.8', - 'Accept-Encoding': 'gzip, deflate, br', - 'Cache-Control': 'no-cache', - 'Referer': 'https://datos.madrid.es/' - } - - response = await self.get(endpoint, headers=headers, timeout=30) - - if not response or response.status_code != 200: - self.logger.warning("Failed to fetch XML data", - endpoint=endpoint, - status=response.status_code if response else None) - return None - - # Get XML content with encoding handling - xml_content = self._decode_response_content(response) - if not xml_content: - self.logger.debug("No XML content received", endpoint=endpoint) - return None - - self.logger.debug("Madrid XML content preview", - length=len(xml_content), - first_500=xml_content[:500] if len(xml_content) > 500 else xml_content) - - # Parse with improved method - traffic_points = self._parse_madrid_traffic_xml(xml_content) - - if traffic_points: - self.logger.info("Successfully parsed Madrid traffic XML", points=len(traffic_points)) - return traffic_points - else: - self.logger.warning("No traffic points found in XML", endpoint=endpoint) - return None + # Filter by date range + filtered_records = [ + record for record in month_records + if start_date <= record.get('date', datetime.min.replace(tzinfo=timezone.utc)) <= end_date + ] - except Exception as e: - self.logger.error("Error fetching traffic XML data", - endpoint=endpoint, - error=str(e)) - return None - - async def _fetch_real_historical_traffic_enhanced(self, latitude: float, longitude: float, - start_date: datetime, end_date: datetime) -> List[Dict[str, Any]]: - """Fetch real historical traffic data with pedestrian enhancement""" - try: - self.logger.info("Fetching historical traffic data", - lat=latitude, lon=longitude, - start=start_date, end=end_date) - - # Madrid historical data is available through ZIP files - # Each month has a specific URL pattern - historical_data = [] - - current_date = start_date.replace(day=1) # Start of month - months_processed = 0 - max_months_per_request = 24 # Limit to prevent memory exhaustion - - while current_date <= end_date and months_processed < max_months_per_request: - try: - # Calculate the month code for Madrid's ZIP files - # This follows Madrid's naming convention - year = current_date.year - month = current_date.month - - # Madrid uses a specific coding system for historical files - # Calculate month code based on 2025/June = 145 reference point - reference_year, reference_month, reference_code = 2025, 6, 145 - months_diff = (year - reference_year) * 12 + (month - reference_month) - month_code = reference_code + months_diff - - # Validate month code is within reasonable range - if not (100 <= month_code <= 300): - self.logger.warning("Month code out of expected range", - year=year, month=month, code=month_code) - current_date = self._get_next_month(current_date) - continue - - # Use the correct Madrid URL 
pattern: 208627-{month_code} - zip_url = f"https://datos.madrid.es/egob/catalogo/208627-{month_code}-transporte-ptomedida-historico.zip" - - # Fetch and process the ZIP file - month_data = await self._process_historical_zip_file(zip_url, latitude, longitude) - - if month_data: - historical_data.extend(month_data) - self.logger.debug("Processed historical data for month", - year=year, month=month, records=len(month_data)) - - months_processed += 1 - - except Exception as month_error: - self.logger.warning("Failed to process month", - year=current_date.year, - month=current_date.month, - error=str(month_error)) + historical_records.extend(filtered_records) + + self.logger.info("Month processing completed", + year=year, month=month, + month_records=len(month_records), + filtered_records=len(filtered_records), + total_records=len(historical_records)) # Move to next month if current_date.month == 12: @@ -1217,473 +291,59 @@ class MadridTrafficClient(BaseTrafficClient, BaseAPIClient): else: current_date = current_date.replace(month=current_date.month + 1) - # Filter data to exact date range - filtered_data = [ - record for record in historical_data - if start_date <= record.get('date', datetime.min.replace(tzinfo=timezone.utc)) <= end_date - ] - - self.logger.info("Historical traffic data fetched", - total_records=len(filtered_data), - months_processed=(end_date.year - start_date.year) * 12 + end_date.month - start_date.month + 1) - - return filtered_data + return historical_records except Exception as e: - self.logger.error("Error fetching historical traffic data", error=str(e)) - return [] + self.logger.error("Error fetching historical data", error=str(e)) + return historical_records # Return partial results - async def _process_historical_zip_file(self, zip_url: str, latitude: float, longitude: float) -> List[Dict[str, Any]]: - """Process a single historical ZIP file containing Madrid traffic data""" - import zipfile - import io - + async def _process_historical_zip_enhanced(self, zip_content: bytes, zip_url: str, + latitude: float, longitude: float, + nearest_points: List[Tuple[str, Dict[str, Any], float]]) -> List[Dict[str, Any]]: + """Process historical ZIP file with enhanced parsing""" try: - self.logger.info("Processing historical ZIP file", zip_url=zip_url) + import zipfile + import io + import csv + import gc - # Download the ZIP file - headers = { - 'User-Agent': 'Bakery-IA Historical Traffic Processor/2.0', - 'Accept': 'application/zip, application/octet-stream', - 'Accept-Encoding': 'gzip, deflate', - 'Connection': 'keep-alive', - 'Referer': 'https://datos.madrid.es/' - } - - response = await self.get(zip_url, headers=headers, timeout=120) # Longer timeout for large files - - if not response or response.status_code != 200: - self.logger.warning("Failed to download ZIP file", - zip_url=zip_url, - status=response.status_code if response else None) - return [] - - # Process ZIP content in memory historical_records = [] + nearest_ids = {p[0] for p in nearest_points} - # Conditionally fetch measurement points registry - measurement_points = {} - - # Fetch measurement points registry for coordinate lookup (limited for memory efficiency) - measurement_points = await self._fetch_measurement_points_registry() - self.logger.info("Fetched measurement points registry", - total_points=len(measurement_points) if measurement_points else 0) - - - # Find nearest 3 (instead of filtering by radius) - nearest_points = self._find_nearest_measurement_points(measurement_points, latitude, longitude, 
num_points=3) - nearest_ids = {p[0] for p in nearest_points} # Set for fast lookup - - if not nearest_points: - self.logger.warning("No nearby measurement points found") - return [] - - with zipfile.ZipFile(io.BytesIO(response.content)) as zip_file: - # List all files in the ZIP - file_list = zip_file.namelist() - - # Process CSV files containing traffic data - csv_files = [f for f in file_list if f.lower().endswith('.csv')] + with zipfile.ZipFile(io.BytesIO(zip_content)) as zip_file: + csv_files = [f for f in zip_file.namelist() if f.lower().endswith('.csv')] for csv_filename in csv_files: try: - # Read CSV content with zip_file.open(csv_filename) as csv_file: - # Decode content (Madrid files are typically in UTF-8 or ISO-8859-1) - content = csv_file.read() - - # Try different encodings - try: - text_content = content.decode('utf-8') - except UnicodeDecodeError: - try: - text_content = content.decode('iso-8859-1') - except UnicodeDecodeError: - text_content = content.decode('utf-8', errors='ignore') - - # Parse CSV with chunked processing to save memory - csv_records = await self._process_csv_content_chunked( - text_content, csv_filename, latitude, longitude, nearest_ids, nearest_points - ) - historical_records.extend(csv_records) - - # Clean up text_content immediately to free memory - del text_content - import gc - gc.collect() - + text_content = csv_file.read().decode('utf-8', errors='ignore') + + # Process CSV in chunks using processor + csv_records = await self.processor.process_csv_content_chunked( + text_content, csv_filename, nearest_ids, nearest_points + ) + + historical_records.extend(csv_records) + + # Force garbage collection + gc.collect() + except Exception as csv_error: self.logger.warning("Error processing CSV file", filename=csv_filename, error=str(csv_error)) continue - # Skip sorting to save memory - database can sort if needed - # historical_records.sort(key=lambda x: x.get('date', datetime.min.replace(tzinfo=timezone.utc))) - self.logger.info("Historical ZIP processing completed", zip_url=zip_url, total_records=len(historical_records)) return historical_records - except zipfile.BadZipFile: - self.logger.error("Invalid ZIP file", zip_url=zip_url) - return [] except Exception as e: self.logger.error("Error processing historical ZIP file", zip_url=zip_url, error=str(e)) return [] - async def _process_csv_content_chunked( - self, - text_content: str, - csv_filename: str, - latitude: float, - longitude: float, - nearest_ids: Set[str], - nearest_points: List[Tuple[str, Dict, float]]) -> List[Dict[str, Any]]: - """Process CSV content in chunks to prevent memory issues""" - import csv - import io - import gc - - try: - # Process CSV with chunked streaming - csv_reader = csv.DictReader(io.StringIO(text_content), delimiter=';') - - chunk_size = 10000 # Process 10k rows at a time to reduce memory pressure - chunk_records = [] - all_records = [] - row_count = 0 - processed_count = 0 - - # Debug: Log first few CSV IDs and nearest IDs - total_rows_seen = 0 - debug_logged = False - - # Debug: Check text_content size - self.logger.debug("CSV content info", - filename=csv_filename, - content_size=len(text_content), - first_100_chars=text_content[:100]) - - for row in csv_reader: - total_rows_seen += 1 - measurement_point_id = row.get('id', '').strip() - - # Debug logging for first few records - if not debug_logged and total_rows_seen <= 5: - self.logger.debug("CSV vs Nearest ID comparison", - row_num=total_rows_seen, - csv_id=measurement_point_id, - nearest_ids=list(nearest_ids)[:5], 
- total_nearest=len(nearest_ids)) - if total_rows_seen == 5: - debug_logged = True - - if measurement_point_id not in nearest_ids: # Early skip! - continue - - row_count += 1 - - # Hard limit to prevent memory issues - if row_count > self.MAX_CSV_PROCESSING_ROWS: - self.logger.warning("Row limit reached for CSV", - filename=csv_filename, - city="madrid") - break - - try: - # Extract and validate data - record_data = await self._parse_historical_csv_row(row, latitude, longitude, nearest_points) - - if record_data: - chunk_records.append(record_data) - processed_count += 1 - - # Process chunk when it reaches size limit - if len(chunk_records) >= chunk_size: - all_records.extend(chunk_records) - - # Clear chunk and force garbage collection - chunk_records = [] - gc.collect() - elif processed_count < 5: # Debug first few failures - self.logger.debug("Row parsing returned None", - row_num=total_rows_seen, - measurement_point_id=measurement_point_id) - - except Exception as e: - # Log first few parsing exceptions - if processed_count < 5: - self.logger.error("Row parsing exception", - row_num=total_rows_seen, - measurement_point_id=measurement_point_id, - error=str(e)) - continue - - # Process remaining records - if chunk_records: - all_records.extend(chunk_records) - chunk_records = [] - gc.collect() - - self.logger.info("Processed CSV file", - filename=csv_filename, - total_rows_read=total_rows_seen, - rows_passed_filter=row_count, - processed_records=processed_count) - - return all_records - - except Exception as e: - self.logger.error("Error processing CSV content", - filename=csv_filename, error=str(e)) - return [] - async def _parse_historical_csv_row(self, row: Dict[str, str], query_lat: float, query_lon: float, - nearest_points: List[Tuple[str, Dict, float]]) -> Optional[Dict[str, Any]]: - """Parse a single row from Madrid's historical traffic CSV with actual structure""" - try: - # Actual Madrid CSV structure (2025): - # id, fecha, tipo_elem, intensidad, ocupacion, carga, vmed, error, periodo_integracion - - # Extract date and time - fecha_str = row.get('fecha', '').strip() - if not fecha_str: - self.logger.info("No fecha data") - return None - - # Parse Madrid's date format (YYYY-MM-DD HH:MM:SS) - try: - date_obj = datetime.strptime(fecha_str, '%Y-%m-%d %H:%M:%S') - date_obj = date_obj.replace(tzinfo=timezone.utc) - except Exception as e: - self.logger.error("Parse data error", error=str(e)) - return None - - measurement_point_id = row.get('id', '').strip() - - # Lookup point_data from nearest_points - point_match = next((p for p in nearest_points if p[0] == measurement_point_id), None) - if not point_match: - return None - - point_data = point_match[1] - distance_km = point_match[2] - - lat = point_data.get('latitude') - lon = point_data.get('longitude') - measurement_point_name = point_data.get('name', f"Madrid Point {measurement_point_id}") - - # Extract traffic data - intensidad = self._safe_int(row.get('intensidad', '0')) - ocupacion = self._safe_int(row.get('ocupacion', '0')) - carga = self._safe_int(row.get('carga', '0')) - vmed = self._safe_int(row.get('vmed', '0')) # Average speed - error_status = row.get('error', '').strip() - - # Calculate congestion level from ocupacion (occupation percentage) - if ocupacion >= 80: - congestion_level = CongestionLevel.BLOCKED.value - elif ocupacion >= 50: - congestion_level = CongestionLevel.HIGH.value - elif ocupacion >= 25: - congestion_level = CongestionLevel.MEDIUM.value - else: - congestion_level = CongestionLevel.LOW.value - - # Apply 
pedestrian inference for historical data - location_context = { - 'latitude': lat, - 'longitude': lon, - 'measurement_point_name': measurement_point_name, - 'district': MadridPedestrianInference._infer_district_from_location({'latitude': lat, 'longitude': lon}) - } - - # Create traffic record for pedestrian inference - traffic_record = TrafficRecord( - date=date_obj, - traffic_volume=intensidad, - occupation_percentage=ocupacion, - load_percentage=carga, - average_speed=max(vmed, 5), # Ensure minimum speed - congestion_level=congestion_level, - pedestrian_count=0, # Will be calculated - measurement_point_id=measurement_point_id, - measurement_point_name=measurement_point_name, - road_type=self._classify_road_type(measurement_point_name), - source='madrid_historical_zip' - ) - - # Calculate pedestrian count - pedestrian_count, inference_metadata = self.pedestrian_inference.calculate_pedestrian_flow( - traffic_record, location_context - ) - - # Build result dictionary - result = { - 'date': date_obj, - 'measurement_point_id': measurement_point_id, - 'measurement_point_name': measurement_point_name, - 'latitude': lat, - 'longitude': lon, - 'traffic_volume': intensidad, - 'occupation_percentage': ocupacion, - 'load_percentage': carga, - 'average_speed': max(vmed, 5), - 'congestion_level': congestion_level, - 'pedestrian_count': pedestrian_count, - 'source': 'madrid_historical_zip', - 'city': 'madrid', - 'district': location_context.get('district'), - 'road_type': self._classify_road_type(measurement_point_name), - 'has_pedestrian_inference': True, - 'data_quality_score': self._calculate_data_quality_score(row), - 'distance_from_query_km': distance_km, - 'inference_metadata': inference_metadata, - 'raw_data': { - 'error_status': error_status, - 'periodo_integracion': row.get('periodo_integracion', ''), - 'tipo_elem': row.get('tipo_elem', ''), - 'measurement_point_id': measurement_point_id - }, - 'error_status': error_status if error_status else None - } - - return result - - except Exception as e: - self.logger.error("Error cvs row", error=str(e)) - return None - - def _safe_int(self, value_str: str) -> int: - """Safely convert string to int - improved version""" - try: - return int(float(value_str.replace(',', '.'))) - except (ValueError, TypeError): - return 0 - - def _calculate_data_quality_score(self, row: Dict[str, str]) -> float: - """Calculate data quality score for historical record""" - score = 100.0 - - # Check for missing data - if not row.get('intensidad', '').strip(): - score -= 20 - if not row.get('ocupacion', '').strip(): - score -= 15 - if not row.get('vmed', '').strip(): - score -= 15 - if not row.get('descripcion', '').strip(): - score -= 10 - - # Check for error status - error_status = row.get('error', '').strip() - if error_status and error_status.lower() not in ['n', 'no', '0', '']: - score -= 30 - - return max(0.0, score) - - def _classify_road_type(self, measurement_point_name: str) -> str: - """Classify road type based on measurement point name""" - if not measurement_point_name: - return 'unknown' - - name_lower = measurement_point_name.lower() - - if any(keyword in name_lower for keyword in ['m-30', 'm30', 'circunvalacion']): - return 'ring_road' - elif any(keyword in name_lower for keyword in ['a-', 'autopista', 'autovia']): - return 'highway' - elif any(keyword in name_lower for keyword in ['calle', 'avenida', 'paseo', 'plaza']): - return 'urban' - elif any(keyword in name_lower for keyword in ['acceso', 'enlace', 'intercambiador']): - return 'access_road' - else: - 
return 'urban' # Default to urban for Madrid - - def _find_nearest_traffic_point(self, latitude: float, longitude: float, - traffic_data: List[Dict[str, Any]]) -> Optional[Dict[str, Any]]: - """Find the nearest traffic measurement point""" - try: - if not traffic_data: - return None - - min_distance = float('inf') - nearest_point = None - - for point in traffic_data: - point_lat = point.get('latitude', 0) - point_lon = point.get('longitude', 0) - - if point_lat and point_lon: - distance = self._calculate_distance(latitude, longitude, point_lat, point_lon) - - if distance < min_distance: - min_distance = distance - nearest_point = point - - if nearest_point: - self.logger.debug("Found nearest traffic point", - distance_km=min_distance, - point_id=nearest_point.get('measurement_point_id')) - - return nearest_point - - except Exception as e: - self.logger.error("Error finding nearest traffic point", error=str(e)) - return None - - def _get_closest_distance(self, latitude: float, longitude: float, traffic_data: List[Dict[str, Any]]) -> float: - """Get distance to closest traffic point""" - try: - if not traffic_data: - return float('inf') - - min_distance = float('inf') - - for point in traffic_data: - point_lat = point.get('latitude', 0) - point_lon = point.get('longitude', 0) - - if point_lat and point_lon: - distance = self._calculate_distance(latitude, longitude, point_lat, point_lon) - min_distance = min(min_distance, distance) - - return min_distance - - except Exception as e: - self.logger.error("Error calculating closest distance", error=str(e)) - return float('inf') - - def _find_nearest_measurement_points(self, measurement_points: Dict[str, Dict[str, Any]], - latitude: float, longitude: float, - num_points: int = 3, max_distance_km: Optional[float] = 5.0) -> List[Tuple[str, Dict[str, Any], float]]: - """ - Find the nearest num_points measurement points, sorted by distance. - Returns list of (point_id, point_data, distance_km) tuples. 
- """ - if not measurement_points: - return [] - - distances = [] - for point_id, point_data in measurement_points.items(): - point_lat = point_data.get('latitude') - point_lon = point_data.get('longitude') - if point_lat is not None and point_lon is not None: - distance = self._calculate_distance(latitude, longitude, point_lat, point_lon) - distances.append((distance, point_id, point_data)) - - # Sort by distance and take top N - distances.sort(key=lambda x: x[0]) - nearest = distances[:num_points] - - # Filter by max_distance if set - if max_distance_km is not None: - nearest = [p for p in nearest if p[0] <= max_distance_km] - - self.logger.info(f"Found {len(nearest)} nearest measurement points (out of {len(measurement_points)} total)") - return [(p[1], p[2], p[0]) for p in nearest] # (id, data, distance) \ No newline at end of file diff --git a/services/data/app/external/clients/__init__.py b/services/data/app/external/clients/__init__.py new file mode 100644 index 00000000..5e26bbbf --- /dev/null +++ b/services/data/app/external/clients/__init__.py @@ -0,0 +1,12 @@ +# ================================================================ +# services/data/app/external/clients/__init__.py +# ================================================================ +""" +HTTP clients package +""" + +from .madrid_client import MadridTrafficAPIClient + +__all__ = [ + 'MadridTrafficAPIClient' +] \ No newline at end of file diff --git a/services/data/app/external/clients/madrid_client.py b/services/data/app/external/clients/madrid_client.py new file mode 100644 index 00000000..2e099065 --- /dev/null +++ b/services/data/app/external/clients/madrid_client.py @@ -0,0 +1,155 @@ +# ================================================================ +# services/data/app/external/clients/madrid_client.py +# ================================================================ +""" +Pure HTTP client for Madrid traffic APIs +Handles only HTTP communication and response decoding +""" + +import httpx +import structlog +from datetime import datetime +from typing import Optional, Dict, Any + +from ..base_client import BaseAPIClient + + +class MadridTrafficAPIClient(BaseAPIClient): + """Pure HTTP client for Madrid traffic APIs""" + + TRAFFIC_ENDPOINT = "https://datos.madrid.es/egob/catalogo/202468-10-intensidad-trafico.xml" + MEASUREMENT_POINTS_URL = "https://datos.madrid.es/egob/catalogo/202468-263-intensidad-trafico.csv" + + def __init__(self): + super().__init__(base_url="https://datos.madrid.es") + self.logger = structlog.get_logger() + + def _decode_response_content(self, response) -> Optional[str]: + """Decode response content with multiple encoding attempts""" + try: + return response.text + except UnicodeDecodeError: + # Try manual encoding for Spanish content + for encoding in ['utf-8', 'latin-1', 'windows-1252', 'iso-8859-1']: + try: + content = response.content.decode(encoding) + if content and len(content) > 100: + self.logger.debug("Successfully decoded with encoding", encoding=encoding) + return content + except UnicodeDecodeError: + continue + return None + + def _build_historical_url(self, year: int, month: int) -> str: + """Build historical ZIP URL for given year and month""" + # Madrid historical data URL pattern + base_url = "https://datos.madrid.es/egob/catalogo/208627" + + # URL numbering pattern (this may need adjustment based on actual URLs) + if year == 2023: + url_number = 116 + (month - 1) # 116-127 for 2023 + elif year == 2024: + url_number = 128 + (month - 1) # 128-139 for 2024 + else: + url_number = 
116 # Fallback + + return f"{base_url}-{url_number}-transporte-ptomedida-historico.zip" + + async def fetch_current_traffic_xml(self, endpoint: Optional[str] = None) -> Optional[str]: + """Fetch current traffic XML data""" + endpoint = endpoint or self.TRAFFIC_ENDPOINT + + try: + headers = { + 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36', + 'Accept': 'application/xml,text/xml,*/*', + 'Accept-Language': 'es-ES,es;q=0.9,en;q=0.8', + 'Accept-Encoding': 'gzip, deflate, br', + 'Cache-Control': 'no-cache', + 'Referer': 'https://datos.madrid.es/' + } + + response = await self.get(endpoint, headers=headers, timeout=30) + + if not response or response.status_code != 200: + self.logger.warning("Failed to fetch XML data", + endpoint=endpoint, + status=response.status_code if response else None) + return None + + # Get XML content with encoding handling + xml_content = self._decode_response_content(response) + if not xml_content: + self.logger.debug("No XML content received", endpoint=endpoint) + return None + + self.logger.debug("Madrid XML content fetched", + length=len(xml_content), + endpoint=endpoint) + + return xml_content + + except Exception as e: + self.logger.error("Error fetching traffic XML data", + endpoint=endpoint, + error=str(e)) + return None + + async def fetch_measurement_points_csv(self, url: Optional[str] = None) -> Optional[str]: + """Fetch measurement points CSV data""" + url = url or self.MEASUREMENT_POINTS_URL + + try: + async with httpx.AsyncClient( + timeout=30.0, + headers={ + 'User-Agent': 'MadridTrafficClient/2.0', + 'Accept': 'text/csv,application/csv,*/*' + }, + follow_redirects=True + ) as client: + + self.logger.debug("Fetching measurement points registry", url=url) + response = await client.get(url) + + if response.status_code == 200: + return response.text + else: + self.logger.warning("Failed to fetch measurement points", + status=response.status_code, url=url) + return None + + except Exception as e: + self.logger.error("Error fetching measurement points registry", + url=url, error=str(e)) + return None + + async def fetch_historical_zip(self, zip_url: str) -> Optional[bytes]: + """Fetch historical traffic ZIP file""" + try: + async with httpx.AsyncClient( + timeout=120.0, # Longer timeout for large files + headers={ + 'User-Agent': 'MadridTrafficClient/2.0', + 'Accept': 'application/zip,*/*' + }, + follow_redirects=True + ) as client: + + self.logger.debug("Fetching historical ZIP", url=zip_url) + response = await client.get(zip_url) + + if response.status_code == 200: + self.logger.debug("Historical ZIP fetched", + url=zip_url, + size=len(response.content)) + return response.content + else: + self.logger.warning("Failed to fetch historical ZIP", + status=response.status_code, url=zip_url) + return None + + except Exception as e: + self.logger.error("Error fetching historical ZIP", + url=zip_url, error=str(e)) + return None \ No newline at end of file diff --git a/services/data/app/external/models/__init__.py b/services/data/app/external/models/__init__.py new file mode 100644 index 00000000..6914e5e0 --- /dev/null +++ b/services/data/app/external/models/__init__.py @@ -0,0 +1,20 @@ +# ================================================================ +# services/data/app/external/models/__init__.py +# ================================================================ +""" +Madrid traffic models package +""" + +from .madrid_models import ( + TrafficServiceLevel, + CongestionLevel, + MeasurementPoint, + TrafficRecord +) + +__all__ = [ + 
'TrafficServiceLevel', + 'CongestionLevel', + 'MeasurementPoint', + 'TrafficRecord' +] \ No newline at end of file diff --git a/services/data/app/external/models/madrid_models.py b/services/data/app/external/models/madrid_models.py new file mode 100644 index 00000000..ef4a342a --- /dev/null +++ b/services/data/app/external/models/madrid_models.py @@ -0,0 +1,66 @@ +# ================================================================ +# services/data/app/external/models/madrid_models.py +# ================================================================ +""" +Data structures, enums, and dataclasses for Madrid traffic system +""" + +from dataclasses import dataclass +from datetime import datetime +from enum import Enum +from typing import Optional + + +class TrafficServiceLevel(Enum): + """Madrid traffic service levels""" + FLUID = 0 + DENSE = 1 + CONGESTED = 2 + BLOCKED = 3 + + +class CongestionLevel(Enum): + """Standardized congestion levels""" + LOW = "low" + MEDIUM = "medium" + HIGH = "high" + BLOCKED = "blocked" + + +@dataclass +class MeasurementPoint: + """Madrid measurement point data structure""" + id: str + latitude: float + longitude: float + distance: float + name: str + type: str + + +@dataclass +class TrafficRecord: + """Standardized traffic record with pedestrian inference""" + date: datetime + traffic_volume: int + occupation_percentage: int + load_percentage: int + average_speed: int + congestion_level: str + pedestrian_count: int + measurement_point_id: str + measurement_point_name: str + road_type: str + source: str + district: Optional[str] = None + + # Madrid-specific data + intensidad_raw: Optional[int] = None + ocupacion_raw: Optional[int] = None + carga_raw: Optional[int] = None + vmed_raw: Optional[int] = None + + # Pedestrian inference metadata + pedestrian_multiplier: Optional[float] = None + time_pattern_factor: Optional[float] = None + district_factor: Optional[float] = None \ No newline at end of file diff --git a/services/data/app/external/processors/__init__.py b/services/data/app/external/processors/__init__.py new file mode 100644 index 00000000..52f5ca33 --- /dev/null +++ b/services/data/app/external/processors/__init__.py @@ -0,0 +1,14 @@ +# ================================================================ +# services/data/app/external/processors/__init__.py +# ================================================================ +""" +Data processors package +""" + +from .madrid_processor import MadridTrafficDataProcessor +from .madrid_business_logic import MadridTrafficAnalyzer + +__all__ = [ + 'MadridTrafficDataProcessor', + 'MadridTrafficAnalyzer' +] \ No newline at end of file diff --git a/services/data/app/external/processors/madrid_business_logic.py b/services/data/app/external/processors/madrid_business_logic.py new file mode 100644 index 00000000..feca5904 --- /dev/null +++ b/services/data/app/external/processors/madrid_business_logic.py @@ -0,0 +1,346 @@ +# ================================================================ +# services/data/app/external/processors/madrid_business_logic.py +# ================================================================ +""" +Business rules, inference, and domain logic for Madrid traffic data +Handles pedestrian inference, district mapping, road classification, and validation +""" + +import math +import re +from datetime import datetime +from typing import Dict, List, Any, Optional, Tuple +import structlog + +from ..models.madrid_models import TrafficRecord, CongestionLevel + + +class MadridTrafficAnalyzer: + """Handles 
business logic for Madrid traffic analysis""" + + # Madrid district characteristics for pedestrian patterns + DISTRICT_MULTIPLIERS = { + 'Centro': 2.5, # Historic center, high pedestrian activity + 'Salamanca': 2.0, # Shopping area, high foot traffic + 'Chamberí': 1.8, # Business district + 'Retiro': 2.2, # Near park, high leisure activity + 'Chamartín': 1.6, # Business/residential + 'Tetuán': 1.4, # Mixed residential/commercial + 'Fuencarral': 1.3, # Residential with commercial areas + 'Moncloa': 1.7, # University area + 'Latina': 1.5, # Residential area + 'Carabanchel': 1.2, # Residential periphery + 'Usera': 1.1, # Industrial/residential + 'Villaverde': 1.0, # Industrial area + 'Villa de Vallecas': 1.0, # Peripheral residential + 'Vicálvaro': 0.9, # Peripheral + 'San Blas': 1.1, # Residential + 'Barajas': 0.8, # Airport area, low pedestrian activity + 'Hortaleza': 1.2, # Mixed area + 'Ciudad Lineal': 1.3, # Linear development + 'Puente de Vallecas': 1.2, # Working class area + 'Moratalaz': 1.1, # Residential + 'Arganzuela': 1.6, # Near center, growing area + } + + # Time-based patterns (hour of day) + TIME_PATTERNS = { + 'morning_peak': {'hours': [7, 8, 9], 'multiplier': 2.0}, + 'lunch_peak': {'hours': [12, 13, 14], 'multiplier': 2.5}, + 'evening_peak': {'hours': [18, 19, 20], 'multiplier': 2.2}, + 'afternoon': {'hours': [15, 16, 17], 'multiplier': 1.8}, + 'late_evening': {'hours': [21, 22], 'multiplier': 1.5}, + 'night': {'hours': [23, 0, 1, 2, 3, 4, 5, 6], 'multiplier': 0.3}, + 'morning': {'hours': [10, 11], 'multiplier': 1.4} + } + + # Road type specific patterns + ROAD_TYPE_BASE = { + 'URB': 250, # Urban streets - high pedestrian activity + 'M30': 50, # Ring road - minimal pedestrians + 'C30': 75, # Secondary ring - some pedestrian access + 'A': 25, # Highways - very low pedestrians + 'R': 40 # Radial roads - low to moderate + } + + # Weather impact on pedestrian activity + WEATHER_IMPACT = { + 'rain': 0.6, # 40% reduction in rain + 'hot_weather': 0.8, # 20% reduction when very hot + 'cold_weather': 0.7, # 30% reduction when very cold + 'normal': 1.0 # No impact + } + + def __init__(self): + self.logger = structlog.get_logger() + + def calculate_pedestrian_flow( + self, + traffic_record: TrafficRecord, + location_context: Optional[Dict[str, Any]] = None + ) -> Tuple[int, Dict[str, float]]: + """ + Calculate pedestrian flow estimate with detailed metadata + + Returns: + Tuple of (pedestrian_count, inference_metadata) + """ + # Base calculation from road type + road_type = traffic_record.road_type or 'URB' + base_pedestrians = self.ROAD_TYPE_BASE.get(road_type, 200) + + # Time pattern adjustment + hour = traffic_record.date.hour + time_factor = self._get_time_pattern_factor(hour) + + # District adjustment (if available) + district_factor = 1.0 + district = traffic_record.district or self.infer_district_from_location(location_context) + if district: + district_factor = self.DISTRICT_MULTIPLIERS.get(district, 1.0) + + # Traffic correlation adjustment + traffic_factor = self._calculate_traffic_correlation(traffic_record) + + # Weather adjustment (if data available) + weather_factor = self._get_weather_factor(traffic_record.date, location_context) + + # Weekend adjustment + weekend_factor = self._get_weekend_factor(traffic_record.date) + + # Combined calculation + pedestrian_count = int( + base_pedestrians * + time_factor * + district_factor * + traffic_factor * + weather_factor * + weekend_factor + ) + + # Ensure reasonable bounds + pedestrian_count = max(10, min(2000, pedestrian_count)) 
+ + # Metadata for model training + inference_metadata = { + 'base_pedestrians': base_pedestrians, + 'time_factor': time_factor, + 'district_factor': district_factor, + 'traffic_factor': traffic_factor, + 'weather_factor': weather_factor, + 'weekend_factor': weekend_factor, + 'inferred_district': district, + 'hour': hour, + 'road_type': road_type + } + + return pedestrian_count, inference_metadata + + def _get_time_pattern_factor(self, hour: int) -> float: + """Get time-based pedestrian activity multiplier""" + for pattern, config in self.TIME_PATTERNS.items(): + if hour in config['hours']: + return config['multiplier'] + return 1.0 # Default multiplier + + def _calculate_traffic_correlation(self, traffic_record: TrafficRecord) -> float: + """ + Calculate pedestrian correlation with traffic patterns + Higher traffic in urban areas often correlates with more pedestrians + """ + if traffic_record.road_type == 'URB': + # Urban areas: moderate traffic indicates commercial activity + if 30 <= traffic_record.load_percentage <= 70: + return 1.3 # Sweet spot for pedestrian activity + elif traffic_record.load_percentage > 70: + return 0.9 # Too congested, pedestrians avoid + else: + return 1.0 # Normal correlation + else: + # Highway/ring roads: more traffic = fewer pedestrians + if traffic_record.load_percentage > 60: + return 0.5 + else: + return 0.8 + + def _get_weather_factor(self, date: datetime, location_context: Optional[Dict] = None) -> float: + """Estimate weather impact on pedestrian activity""" + # Simplified weather inference based on season and typical Madrid patterns + month = date.month + + # Madrid seasonal patterns + if month in [12, 1, 2]: # Winter - cold weather impact + return self.WEATHER_IMPACT['cold_weather'] + elif month in [7, 8]: # Summer - hot weather impact + return self.WEATHER_IMPACT['hot_weather'] + elif month in [10, 11, 3, 4]: # Rainy seasons - moderate impact + return 0.85 + else: # Spring/early summer - optimal weather + return 1.1 + + def _get_weekend_factor(self, date: datetime) -> float: + """Weekend vs weekday pedestrian patterns""" + weekday = date.weekday() + hour = date.hour + + if weekday >= 5: # Weekend + if 11 <= hour <= 16: # Weekend shopping/leisure hours + return 1.4 + elif 20 <= hour <= 23: # Weekend evening activity + return 1.3 + else: + return 0.9 + else: # Weekday + return 1.0 + + def infer_district_from_location(self, location_context: Optional[Dict] = None) -> Optional[str]: + """ + Infer Madrid district from location context or coordinates + """ + if not location_context: + return None + + lat = location_context.get('latitude') + lon = location_context.get('longitude') + + if not (lat and lon): + return None + + # Madrid district boundaries (simplified boundaries for inference) + districts = { + # Central districts + 'Centro': {'lat_min': 40.405, 'lat_max': 40.425, 'lon_min': -3.720, 'lon_max': -3.690}, + 'Arganzuela': {'lat_min': 40.385, 'lat_max': 40.410, 'lon_min': -3.720, 'lon_max': -3.680}, + 'Retiro': {'lat_min': 40.405, 'lat_max': 40.425, 'lon_min': -3.690, 'lon_max': -3.660}, + 'Salamanca': {'lat_min': 40.420, 'lat_max': 40.445, 'lon_min': -3.690, 'lon_max': -3.660}, + 'Chamartín': {'lat_min': 40.445, 'lat_max': 40.480, 'lon_min': -3.690, 'lon_max': -3.660}, + 'Tetuán': {'lat_min': 40.445, 'lat_max': 40.470, 'lon_min': -3.720, 'lon_max': -3.690}, + 'Chamberí': {'lat_min': 40.425, 'lat_max': 40.450, 'lon_min': -3.720, 'lon_max': -3.690}, + 'Fuencarral-El Pardo': {'lat_min': 40.470, 'lat_max': 40.540, 'lon_min': -3.750, 'lon_max': 
-3.650}, + 'Moncloa-Aravaca': {'lat_min': 40.430, 'lat_max': 40.480, 'lon_min': -3.750, 'lon_max': -3.720}, + 'Latina': {'lat_min': 40.380, 'lat_max': 40.420, 'lon_min': -3.750, 'lon_max': -3.720}, + 'Carabanchel': {'lat_min': 40.350, 'lat_max': 40.390, 'lon_min': -3.750, 'lon_max': -3.720}, + 'Usera': {'lat_min': 40.350, 'lat_max': 40.385, 'lon_min': -3.720, 'lon_max': -3.690}, + 'Puente de Vallecas': {'lat_min': 40.370, 'lat_max': 40.410, 'lon_min': -3.680, 'lon_max': -3.640}, + 'Moratalaz': {'lat_min': 40.400, 'lat_max': 40.430, 'lon_min': -3.650, 'lon_max': -3.620}, + 'Ciudad Lineal': {'lat_min': 40.430, 'lat_max': 40.460, 'lon_min': -3.650, 'lon_max': -3.620}, + 'Hortaleza': {'lat_min': 40.460, 'lat_max': 40.500, 'lon_min': -3.650, 'lon_max': -3.620}, + 'Villaverde': {'lat_min': 40.320, 'lat_max': 40.360, 'lon_min': -3.720, 'lon_max': -3.680}, + } + + # Find matching district + for district_name, bounds in districts.items(): + if (bounds['lat_min'] <= lat <= bounds['lat_max'] and + bounds['lon_min'] <= lon <= bounds['lon_max']): + return district_name + + # Default for coordinates in Madrid but not matching specific districts + if 40.3 <= lat <= 40.6 and -3.8 <= lon <= -3.5: + return 'Other Madrid' + + return None + + def classify_road_type(self, measurement_point_name: str) -> str: + """Classify road type based on measurement point name""" + if not measurement_point_name: + return 'URB' # Default to urban + + name_upper = measurement_point_name.upper() + + # Highway patterns + if any(pattern in name_upper for pattern in ['A-', 'AP-', 'AUTOPISTA', 'AUTOVIA']): + return 'A' + + # M-30 Ring road + if 'M-30' in name_upper or 'M30' in name_upper: + return 'M30' + + # Other M roads (ring roads) + if re.search(r'M-[0-9]', name_upper) or re.search(r'M[0-9]', name_upper): + return 'C30' + + # Radial roads (R-1, R-2, etc.) 
+ if re.search(r'R-[0-9]', name_upper) or 'RADIAL' in name_upper: + return 'R' + + # Default to urban street + return 'URB' + + def validate_madrid_coordinates(self, lat: float, lon: float) -> bool: + """Validate coordinates are within Madrid bounds""" + # Madrid metropolitan area bounds + return 40.3 <= lat <= 40.6 and -3.8 <= lon <= -3.5 + + def get_congestion_level(self, occupation_pct: float) -> str: + """Convert occupation percentage to congestion level""" + if occupation_pct >= 80: + return CongestionLevel.BLOCKED.value + elif occupation_pct >= 50: + return CongestionLevel.HIGH.value + elif occupation_pct >= 25: + return CongestionLevel.MEDIUM.value + else: + return CongestionLevel.LOW.value + + def calculate_distance(self, lat1: float, lon1: float, lat2: float, lon2: float) -> float: + """Calculate distance between two points in kilometers using Haversine formula""" + R = 6371 # Earth's radius in kilometers + + dlat = math.radians(lat2 - lat1) + dlon = math.radians(lon2 - lon1) + a = (math.sin(dlat/2) * math.sin(dlat/2) + + math.cos(math.radians(lat1)) * math.cos(math.radians(lat2)) * + math.sin(dlon/2) * math.sin(dlon/2)) + c = 2 * math.atan2(math.sqrt(a), math.sqrt(1-a)) + + return R * c + + def find_nearest_traffic_point(self, traffic_points: List[Dict[str, Any]], + latitude: float, longitude: float) -> Optional[Dict[str, Any]]: + """Find the nearest traffic point to given coordinates""" + if not traffic_points: + return None + + min_distance = float('inf') + nearest_point = None + + for point in traffic_points: + point_lat = point.get('latitude') + point_lon = point.get('longitude') + + if point_lat and point_lon: + distance = self.calculate_distance(latitude, longitude, point_lat, point_lon) + if distance < min_distance: + min_distance = distance + nearest_point = point + + return nearest_point + + def find_nearest_measurement_points(self, measurement_points: Dict[str, Dict[str, Any]], + latitude: float, longitude: float, + num_points: int = 3, max_distance_km: Optional[float] = 5.0) -> List[Tuple[str, Dict[str, Any], float]]: + """Find nearest measurement points for historical data""" + distances = [] + + for point_id, point_data in measurement_points.items(): + point_lat = point_data.get('latitude') + point_lon = point_data.get('longitude') + + if point_lat and point_lon: + distance_km = self.calculate_distance(latitude, longitude, point_lat, point_lon) + distances.append((point_id, point_data, distance_km)) + + # Sort by distance and take nearest points + distances.sort(key=lambda x: x[2]) + + # Apply distance filter if specified + if max_distance_km is not None: + distances = [p for p in distances if p[2] <= max_distance_km] + + nearest = distances[:num_points] + + self.logger.info("Found nearest measurement points", + count=len(nearest), + nearest_distance_km=nearest[0][2] if nearest else None) + + return nearest \ No newline at end of file diff --git a/services/data/app/external/processors/madrid_processor.py b/services/data/app/external/processors/madrid_processor.py new file mode 100644 index 00000000..d64907bb --- /dev/null +++ b/services/data/app/external/processors/madrid_processor.py @@ -0,0 +1,478 @@ +# ================================================================ +# services/data/app/external/processors/madrid_processor.py +# ================================================================ +""" +Data transformation and parsing for Madrid traffic data +Handles XML parsing, CSV processing, coordinate conversion, and data quality scoring +""" + +import csv +import 
io +import math +import re +import xml.etree.ElementTree as ET +import zipfile +from datetime import datetime, timezone +from typing import Dict, List, Any, Optional, Tuple +import structlog +import pyproj + +from ..models.madrid_models import TrafficRecord, MeasurementPoint, CongestionLevel + + +class MadridTrafficDataProcessor: + """Handles all data transformation and parsing for Madrid traffic data""" + + def __init__(self): + self.logger = structlog.get_logger() + # UTM Zone 30N (Madrid's coordinate system) + self.utm_proj = pyproj.Proj(proj='utm', zone=30, ellps='WGS84', datum='WGS84') + self.wgs84_proj = pyproj.Proj(proj='latlong', ellps='WGS84', datum='WGS84') + + def safe_int(self, value: str) -> int: + """Safely convert string to int""" + try: + return int(float(value.replace(',', '.'))) + except (ValueError, TypeError): + return 0 + + def _safe_float(self, value: str) -> float: + """Safely convert string to float""" + try: + return float(value.replace(',', '.')) + except (ValueError, TypeError): + return 0.0 + + def clean_madrid_xml(self, xml_content: str) -> str: + """Clean and prepare Madrid XML content for parsing""" + if not xml_content: + return "" + + # Remove BOM and extra whitespace + cleaned = xml_content.strip() + if cleaned.startswith('\ufeff'): + cleaned = cleaned[1:] + + # Fix common XML issues + cleaned = re.sub(r'&(?!amp;|lt;|gt;|quot;|apos;)', '&amp;', cleaned) + + # Ensure proper encoding declaration + if not cleaned.startswith('<?xml'): + cleaned = '<?xml version="1.0" encoding="UTF-8"?>\n' + cleaned + + return cleaned + + def convert_utm_to_latlon(self, utm_x: str, utm_y: str) -> Tuple[Optional[float], Optional[float]]: + """Convert UTM coordinates to latitude/longitude""" + try: + utm_x_float = float(utm_x.replace(',', '.')) + utm_y_float = float(utm_y.replace(',', '.')) + + # Convert from UTM Zone 30N to WGS84 + longitude, latitude = pyproj.transform(self.utm_proj, self.wgs84_proj, utm_x_float, utm_y_float) + + # Validate coordinates are in Madrid area + if 40.3 <= latitude <= 40.6 and -3.8 <= longitude <= -3.5: + return latitude, longitude + else: + self.logger.debug("Coordinates outside Madrid bounds", + lat=latitude, lon=longitude, utm_x=utm_x, utm_y=utm_y) + return None, None + + except Exception as e: + self.logger.debug("UTM conversion error", + utm_x=utm_x, utm_y=utm_y, error=str(e)) + return None, None + + def parse_traffic_xml(self, xml_content: str) -> List[Dict[str, Any]]: + """Parse Madrid traffic XML data""" + traffic_points = [] + + try: + cleaned_xml = self.clean_madrid_xml(xml_content) + root = ET.fromstring(cleaned_xml) + + self.logger.debug("Madrid XML structure", root_tag=root.tag, children_count=len(list(root))) + + if root.tag == 'pms': + pm_elements = root.findall('pm') + self.logger.debug("Found PM elements", count=len(pm_elements)) + + for pm in pm_elements: + try: + traffic_point = self._extract_madrid_pm_element(pm) + + if self._is_valid_traffic_point(traffic_point): + traffic_points.append(traffic_point) + + # Log first few points for debugging + if len(traffic_points) <= 3: + self.logger.debug("Sample traffic point", + id=traffic_point['idelem'], + lat=traffic_point['latitude'], + lon=traffic_point['longitude'], + intensity=traffic_point.get('intensidad')) + + except Exception as e: + self.logger.debug("Error parsing PM element", error=str(e)) + continue + else: + self.logger.warning("Unexpected XML root tag", root_tag=root.tag) + + self.logger.debug("Madrid traffic XML parsing completed", valid_points=len(traffic_points)) + return traffic_points + + except ET.ParseError as e: +
self.logger.warning("Failed to parse Madrid XML", error=str(e)) + return self._extract_traffic_data_regex(xml_content) + except Exception as e: + self.logger.error("Error in Madrid traffic XML parsing", error=str(e)) + return [] + + def _extract_madrid_pm_element(self, pm_element) -> Dict[str, Any]: + """Extract traffic data from Madrid element with coordinate conversion""" + try: + point_data = {} + utm_x = utm_y = None + + # Extract all child elements + for child in pm_element: + tag, text = child.tag, child.text.strip() if child.text else '' + + if tag == 'idelem': + point_data['idelem'] = text + elif tag == 'descripcion': + point_data['descripcion'] = text + elif tag == 'intensidad': + point_data['intensidad'] = self.safe_int(text) + elif tag == 'ocupacion': + point_data['ocupacion'] = self._safe_float(text) + elif tag == 'carga': + point_data['carga'] = self.safe_int(text) + elif tag == 'nivelServicio': + point_data['nivelServicio'] = self.safe_int(text) + elif tag == 'st_x': # UTM X coordinate + utm_x = text + point_data['utm_x'] = text + elif tag == 'st_y': # UTM Y coordinate + utm_y = text + point_data['utm_y'] = text + elif tag == 'error': + point_data['error'] = text + elif tag in ['subarea', 'accesoAsociado', 'intensidadSat']: + point_data[tag] = text + + # Convert coordinates + if utm_x and utm_y: + latitude, longitude = self.convert_utm_to_latlon(utm_x, utm_y) + + if latitude and longitude: + point_data.update({ + 'latitude': latitude, + 'longitude': longitude, + 'measurement_point_id': point_data.get('idelem'), + 'measurement_point_name': point_data.get('descripcion'), + 'timestamp': datetime.now(timezone.utc), + 'source': 'madrid_opendata_xml' + }) + + return point_data + else: + self.logger.debug("Invalid coordinates after conversion", + idelem=point_data.get('idelem'), utm_x=utm_x, utm_y=utm_y) + return {} + else: + self.logger.debug("Missing UTM coordinates", idelem=point_data.get('idelem')) + return {} + + except Exception as e: + self.logger.debug("Error extracting PM element", error=str(e)) + return {} + + def _is_valid_traffic_point(self, traffic_point: Dict[str, Any]) -> bool: + """Validate traffic point data""" + required_fields = ['idelem', 'latitude', 'longitude'] + return all(field in traffic_point and traffic_point[field] for field in required_fields) + + def _extract_traffic_data_regex(self, xml_content: str) -> List[Dict[str, Any]]: + """Fallback regex-based extraction if XML parsing fails""" + traffic_points = [] + + try: + # Pattern to match PM elements + pm_pattern = r'(.*?)' + pm_matches = re.findall(pm_pattern, xml_content, re.DOTALL) + + for pm_content in pm_matches: + traffic_point = {} + + # Extract key fields + patterns = { + 'idelem': r'(.*?)', + 'descripcion': r'(.*?)', + 'intensidad': r'(.*?)', + 'ocupacion': r'(.*?)', + 'st_x': r'(.*?)', + 'st_y': r'(.*?)' + } + + for field, pattern in patterns.items(): + match = re.search(pattern, pm_content) + if match: + traffic_point[field] = match.group(1).strip() + + # Convert coordinates + if 'st_x' in traffic_point and 'st_y' in traffic_point: + latitude, longitude = self.convert_utm_to_latlon( + traffic_point['st_x'], traffic_point['st_y'] + ) + + if latitude and longitude: + traffic_point.update({ + 'latitude': latitude, + 'longitude': longitude, + 'intensidad': self.safe_int(traffic_point.get('intensidad', '0')), + 'ocupacion': self._safe_float(traffic_point.get('ocupacion', '0')), + 'measurement_point_id': traffic_point.get('idelem'), + 'measurement_point_name': traffic_point.get('descripcion'), + 
'timestamp': datetime.now(timezone.utc), + 'source': 'madrid_opendata_xml_regex' + }) + + traffic_points.append(traffic_point) + + self.logger.debug("Regex extraction completed", points=len(traffic_points)) + return traffic_points + + except Exception as e: + self.logger.error("Error in regex extraction", error=str(e)) + return [] + + def parse_measurement_points_csv(self, csv_content: str) -> Dict[str, Dict[str, Any]]: + """Parse measurement points CSV into lookup dictionary""" + measurement_points = {} + + try: + # Parse CSV with semicolon delimiter + csv_reader = csv.DictReader(io.StringIO(csv_content), delimiter=';') + + processed_count = 0 + for row in csv_reader: + try: + # Extract point ID and coordinates + point_id = row.get('id', '').strip() + if not point_id: + continue + + processed_count += 1 + + # Try different coordinate field names + lat_str = '' + lon_str = '' + + # Common coordinate field patterns + lat_fields = ['lat', 'latitude', 'latitud', 'y', 'utm_y'] + lon_fields = ['lon', 'lng', 'longitude', 'longitud', 'x', 'utm_x'] + + for field in lat_fields: + if field in row and row[field].strip(): + lat_str = row[field].strip() + break + + for field in lon_fields: + if field in row and row[field].strip(): + lon_str = row[field].strip() + break + + if lat_str and lon_str: + try: + # Try direct lat/lon first + latitude = self._safe_float(lat_str) + longitude = self._safe_float(lon_str) + + # If values look like UTM coordinates, convert them + if latitude > 1000 or longitude > 1000: + latitude, longitude = self.convert_utm_to_latlon(lon_str, lat_str) + if not latitude or not longitude: + continue + + # Validate Madrid area + if not (40.3 <= latitude <= 40.6 and -3.8 <= longitude <= -3.5): + continue + + measurement_points[point_id] = { + 'id': point_id, + 'latitude': latitude, + 'longitude': longitude, + 'name': row.get('nombre', row.get('descripcion', f"Point {point_id}")), + 'type': row.get('tipo', 'traffic'), + 'raw_data': dict(row) # Keep original data + } + + except Exception as e: + self.logger.debug("Error processing point coordinates", + point_id=point_id, error=str(e)) + continue + + except Exception as e: + self.logger.debug("Error processing CSV row", error=str(e)) + continue + + self.logger.info("Parsed measurement points registry", + total_points=len(measurement_points)) + return measurement_points + + except Exception as e: + self.logger.error("Error parsing measurement points CSV", error=str(e)) + return {} + + def calculate_data_quality_score(self, row: Dict[str, str]) -> float: + """Calculate data quality score for a traffic record""" + try: + score = 1.0 + + # Check for missing or invalid values + intensidad = row.get('intensidad', '').strip() + if not intensidad or intensidad in ['N', '', '0']: + score *= 0.7 + + ocupacion = row.get('ocupacion', '').strip() + if not ocupacion or ocupacion in ['N', '', '0']: + score *= 0.8 + + error_status = row.get('error', '').strip() + if error_status and error_status != 'N': + score *= 0.6 + + # Check for reasonable value ranges + try: + intensidad_val = self.safe_int(intensidad) + if intensidad_val < 0 or intensidad_val > 5000: # Unrealistic traffic volume + score *= 0.7 + + ocupacion_val = self.safe_int(ocupacion) + if ocupacion_val < 0 or ocupacion_val > 100: # Invalid percentage + score *= 0.5 + + except: + score *= 0.6 + + return max(0.1, score) # Minimum quality score + + except Exception as e: + self.logger.debug("Error calculating quality score", error=str(e)) + return 0.5 # Default medium quality + + async def 
process_csv_content_chunked(self, text_content: str, csv_filename: str, + nearest_ids: set, nearest_points: list) -> list: + """Process CSV content in chunks to prevent memory issues""" + import csv + import io + import gc + + try: + csv_reader = csv.DictReader(io.StringIO(text_content), delimiter=';') + + chunk_size = 10000 + chunk_records = [] + all_records = [] + processed_count = 0 + total_rows_seen = 0 + + for row in csv_reader: + total_rows_seen += 1 + measurement_point_id = row.get('id', '').strip() + + if measurement_point_id not in nearest_ids: + continue + + try: + record_data = await self.parse_historical_csv_row(row, nearest_points) + + if record_data: + chunk_records.append(record_data) + processed_count += 1 + + if len(chunk_records) >= chunk_size: + all_records.extend(chunk_records) + chunk_records = [] + gc.collect() + + except Exception as e: + if processed_count < 5: + self.logger.error("Row parsing exception", + row_num=total_rows_seen, + measurement_point_id=measurement_point_id, + error=str(e)) + continue + + # Process remaining records + if chunk_records: + all_records.extend(chunk_records) + chunk_records = [] + gc.collect() + + self.logger.info("Processed CSV file", + filename=csv_filename, + total_rows_read=total_rows_seen, + processed_records=processed_count) + + return all_records + + except Exception as e: + self.logger.error("Error processing CSV content", + filename=csv_filename, error=str(e)) + return [] + + async def parse_historical_csv_row(self, row: dict, nearest_points: list) -> dict: + """Parse a single row from Madrid's historical traffic CSV""" + try: + # Extract date + fecha_str = row.get('fecha', '').strip() + if not fecha_str: + return None + + try: + from datetime import datetime, timezone + date_obj = datetime.strptime(fecha_str, '%Y-%m-%d %H:%M:%S') + date_obj = date_obj.replace(tzinfo=timezone.utc) + except Exception: + return None + + measurement_point_id = row.get('id', '').strip() + + # Find point data + point_match = next((p for p in nearest_points if p[0] == measurement_point_id), None) + if not point_match: + return None + + point_data = point_match[1] + distance_km = point_match[2] + + # Extract traffic data + intensidad = self.safe_int(row.get('intensidad', '0')) + ocupacion = self.safe_int(row.get('ocupacion', '0')) + carga = self.safe_int(row.get('carga', '0')) + vmed = self.safe_int(row.get('vmed', '0')) + + # Build basic result (business logic will be applied elsewhere) + result = { + 'date': date_obj, + 'measurement_point_id': measurement_point_id, + 'point_data': point_data, + 'distance_km': distance_km, + 'traffic_data': { + 'intensidad': intensidad, + 'ocupacion': ocupacion, + 'carga': carga, + 'vmed': vmed + }, + 'data_quality_score': self.calculate_data_quality_score(row), + 'raw_row': row + } + + return result + + except Exception as e: + self.logger.debug("Error parsing historical CSV row", error=str(e)) + return None \ No newline at end of file diff --git a/services/forecasting/app/services/forecasting_service.py b/services/forecasting/app/services/forecasting_service.py index d9266683..923272be 100644 --- a/services/forecasting/app/services/forecasting_service.py +++ b/services/forecasting/app/services/forecasting_service.py @@ -639,10 +639,17 @@ class EnhancedForecastingService: if precipitation > 2.0: adjustment_factor *= 0.7 - # Apply adjustments + # Apply adjustments to prediction adjusted_prediction = max(0, base_prediction * adjustment_factor) - adjusted_lower = max(0, lower_bound * adjustment_factor) - 
adjusted_upper = max(0, upper_bound * adjustment_factor) + + # For confidence bounds, preserve relative interval width while respecting minimum bounds + original_interval = upper_bound - lower_bound + adjusted_interval = original_interval * adjustment_factor + + # Ensure minimum reasonable lower bound (at least 20% of prediction or 5, whichever is larger) + min_lower_bound = max(adjusted_prediction * 0.2, 5.0) + adjusted_lower = max(min_lower_bound, adjusted_prediction - (adjusted_interval / 2)) + adjusted_upper = max(adjusted_lower + 10, adjusted_prediction + (adjusted_interval / 2)) return { "prediction": adjusted_prediction, diff --git a/services/training/app/ml/prophet_manager.py b/services/training/app/ml/prophet_manager.py index 86e3134d..c446e445 100644 --- a/services/training/app/ml/prophet_manager.py +++ b/services/training/app/ml/prophet_manager.py @@ -162,7 +162,8 @@ class BakeryProphetManager: 'seasonality_mode': 'additive', 'daily_seasonality': False, 'weekly_seasonality': True, - 'yearly_seasonality': False + 'yearly_seasonality': False, + 'uncertainty_samples': 100 # ✅ FIX: Minimal uncertainty sampling for very sparse data } elif zero_ratio > 0.6: logger.info(f"Moderate sparsity for {product_name}, using conservative optimization") @@ -174,7 +175,8 @@ class BakeryProphetManager: 'seasonality_mode': 'additive', 'daily_seasonality': False, 'weekly_seasonality': True, - 'yearly_seasonality': len(df) > 365 # Only if we have enough data + 'yearly_seasonality': len(df) > 365, # Only if we have enough data + 'uncertainty_samples': 200 # ✅ FIX: Conservative uncertainty sampling for moderately sparse data } # Use unique seed for each product to avoid identical results @@ -196,6 +198,16 @@ class BakeryProphetManager: changepoint_scale_range = (0.001, 0.5) seasonality_scale_range = (0.01, 10.0) + # ✅ FIX: Determine appropriate uncertainty samples range based on product category + if product_category == 'high_volume': + uncertainty_range = (300, 800) # More samples for stable high-volume products + elif product_category == 'medium_volume': + uncertainty_range = (200, 500) # Moderate samples for medium volume + elif product_category == 'low_volume': + uncertainty_range = (150, 300) # Fewer samples for low volume + else: # intermittent + uncertainty_range = (100, 200) # Minimal samples for intermittent demand + params = { 'changepoint_prior_scale': trial.suggest_float( 'changepoint_prior_scale', @@ -214,7 +226,8 @@ class BakeryProphetManager: 'seasonality_mode': 'additive' if product_category == 'high_volume' else trial.suggest_categorical('seasonality_mode', ['additive', 'multiplicative']), 'daily_seasonality': trial.suggest_categorical('daily_seasonality', [True, False]), 'weekly_seasonality': True, # Always keep weekly - 'yearly_seasonality': trial.suggest_categorical('yearly_seasonality', [True, False]) + 'yearly_seasonality': trial.suggest_categorical('yearly_seasonality', [True, False]), + 'uncertainty_samples': trial.suggest_int('uncertainty_samples', uncertainty_range[0], uncertainty_range[1]) # ✅ FIX: Adaptive uncertainty sampling } # Simple 2-fold cross-validation for speed @@ -229,8 +242,10 @@ class BakeryProphetManager: continue try: - # Create and train model - model = Prophet(**params, interval_width=0.8, uncertainty_samples=100) + # Create and train model with adaptive uncertainty sampling + uncertainty_samples = params.get('uncertainty_samples', 200) # ✅ FIX: Use adaptive uncertainty samples + model = Prophet(**{k: v for k, v in params.items() if k != 
'uncertainty_samples'}, + interval_width=0.8, uncertainty_samples=uncertainty_samples) for regressor in regressor_columns: if regressor in train_data.columns: @@ -291,6 +306,12 @@ class BakeryProphetManager: logger.info(f"Optimization completed for {product_name}. Best score: {best_score:.2f}%. " f"Parameters: {best_params}") + + # ✅ FIX: Log uncertainty sampling configuration for debugging confidence intervals + uncertainty_samples = best_params.get('uncertainty_samples', 500) + logger.info(f"Prophet model will use {uncertainty_samples} uncertainty samples for {product_name} " + f"(category: {product_category}, zero_ratio: {zero_ratio:.2f})") + return best_params def _classify_product(self, product_name: str, sales_data: pd.DataFrame) -> str: @@ -329,9 +350,12 @@ class BakeryProphetManager: return 'intermittent' def _create_optimized_prophet_model(self, optimized_params: Dict[str, Any], regressor_columns: List[str]) -> Prophet: - """Create Prophet model with optimized parameters""" + """Create Prophet model with optimized parameters and adaptive uncertainty sampling""" holidays = self._get_spanish_holidays() + # Determine uncertainty samples based on data characteristics + uncertainty_samples = optimized_params.get('uncertainty_samples', 500) + model = Prophet( holidays=holidays if not holidays.empty else None, daily_seasonality=optimized_params.get('daily_seasonality', True), @@ -344,7 +368,7 @@ class BakeryProphetManager: changepoint_range=optimized_params.get('changepoint_range', 0.8), interval_width=0.8, mcmc_samples=0, - uncertainty_samples=1000 + uncertainty_samples=uncertainty_samples ) return model
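
The confidence-interval hunk in services/forecasting/app/services/forecasting_service.py no longer scales lower_bound and upper_bound directly by the weather adjustment factor (which collapsed the interval toward zero for small predictions); it scales the interval width and re-centres it on the adjusted prediction, with a floor on the lower bound. A minimal standalone sketch of that arithmetic follows; the helper name and example values are hypothetical and not part of the diff:

def adjust_forecast_bounds(base_prediction: float, lower_bound: float,
                           upper_bound: float, adjustment_factor: float) -> dict:
    """Scale the prediction, then rebuild the interval around it instead of scaling the raw bounds."""
    adjusted_prediction = max(0, base_prediction * adjustment_factor)

    # Preserve the relative interval width under the adjustment factor
    adjusted_interval = (upper_bound - lower_bound) * adjustment_factor

    # Floor the lower bound at 20% of the prediction, never below 5
    min_lower_bound = max(adjusted_prediction * 0.2, 5.0)
    adjusted_lower = max(min_lower_bound, adjusted_prediction - adjusted_interval / 2)
    adjusted_upper = max(adjusted_lower + 10, adjusted_prediction + adjusted_interval / 2)

    return {"prediction": adjusted_prediction, "lower_bound": adjusted_lower, "upper_bound": adjusted_upper}


# Example: the precipitation penalty of 0.7 applied to a prediction of 40 with an 80% interval of [25, 55]
print(adjust_forecast_bounds(40.0, 25.0, 55.0, 0.7))
# {'prediction': 28.0, 'lower_bound': 17.5, 'upper_bound': 38.5}

The max(adjusted_lower + 10, ...) guard keeps the interval at least 10 units wide even when the adjustment factor shrinks it aggressively.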