# ================================================================
# services/data/app/external/processors/madrid_business_logic.py
# ================================================================
"""
Business rules, inference, and domain logic for Madrid traffic data
Handles pedestrian inference, district mapping, road classification, and validation
"""

import math
import re
from datetime import datetime
from typing import Dict, List, Any, Optional, Tuple

import structlog

from ..models.madrid_models import TrafficRecord, CongestionLevel


class MadridTrafficAnalyzer:
    """Handles business logic for Madrid traffic analysis.

    Provides a heuristic pedestrian-flow estimate derived from traffic
    records, coarse district inference from coordinates, road-type
    classification from measurement point names, and distance helpers.
    """

    # Madrid district characteristics for pedestrian patterns
    DISTRICT_MULTIPLIERS = {
        'Centro': 2.5,              # Historic center, high pedestrian activity
        'Salamanca': 2.0,           # Shopping area, high foot traffic
        'Chamberí': 1.8,            # Business district
        'Retiro': 2.2,              # Near park, high leisure activity
        'Chamartín': 1.6,           # Business/residential
        'Tetuán': 1.4,              # Mixed residential/commercial
        'Fuencarral': 1.3,          # Residential with commercial areas
        'Moncloa': 1.7,             # University area
        'Latina': 1.5,              # Residential area
        'Carabanchel': 1.2,         # Residential periphery
        'Usera': 1.1,               # Industrial/residential
        'Villaverde': 1.0,          # Industrial area
        'Villa de Vallecas': 1.0,   # Peripheral residential
        'Vicálvaro': 0.9,           # Peripheral
        'San Blas': 1.1,            # Residential
        'Barajas': 0.8,             # Airport area, low pedestrian activity
        'Hortaleza': 1.2,           # Mixed area
        'Ciudad Lineal': 1.3,       # Linear development
        'Puente de Vallecas': 1.2,  # Working class area
        'Moratalaz': 1.1,           # Residential
        'Arganzuela': 1.6,          # Near center, growing area
    }

    # District names returned by infer_district_from_location() that differ
    # from the keys of DISTRICT_MULTIPLIERS. Without this mapping the
    # multiplier lookup for these districts silently falls back to 1.0.
    _DISTRICT_ALIASES = {
        'Fuencarral-El Pardo': 'Fuencarral',
        'Moncloa-Aravaca': 'Moncloa',
    }

    # Time-based patterns (hour of day)
    TIME_PATTERNS = {
        'morning_peak': {'hours': [7, 8, 9], 'multiplier': 2.0},
        'lunch_peak': {'hours': [12, 13, 14], 'multiplier': 2.5},
        'evening_peak': {'hours': [18, 19, 20], 'multiplier': 2.2},
        'afternoon': {'hours': [15, 16, 17], 'multiplier': 1.8},
        'late_evening': {'hours': [21, 22], 'multiplier': 1.5},
        'night': {'hours': [23, 0, 1, 2, 3, 4, 5, 6], 'multiplier': 0.3},
        'morning': {'hours': [10, 11], 'multiplier': 1.4}
    }

    # Road type specific patterns
    ROAD_TYPE_BASE = {
        'URB': 250,  # Urban streets - high pedestrian activity
        'M30': 50,   # Ring road - minimal pedestrians
        'C30': 75,   # Secondary ring - some pedestrian access
        'A': 25,     # Highways - very low pedestrians
        'R': 40      # Radial roads - low to moderate
    }

    # Weather impact on pedestrian activity
    WEATHER_IMPACT = {
        'rain': 0.6,          # 40% reduction in rain
        'hot_weather': 0.8,   # 20% reduction when very hot
        'cold_weather': 0.7,  # 30% reduction when very cold
        'normal': 1.0         # No impact
    }

    # Simplified rectangular district boundaries used for inference.
    # Some boxes overlap; insertion order matters because the first
    # matching box wins.
    _DISTRICT_BOUNDS = {
        # Central districts
        'Centro': {'lat_min': 40.405, 'lat_max': 40.425, 'lon_min': -3.720, 'lon_max': -3.690},
        'Arganzuela': {'lat_min': 40.385, 'lat_max': 40.410, 'lon_min': -3.720, 'lon_max': -3.680},
        'Retiro': {'lat_min': 40.405, 'lat_max': 40.425, 'lon_min': -3.690, 'lon_max': -3.660},
        'Salamanca': {'lat_min': 40.420, 'lat_max': 40.445, 'lon_min': -3.690, 'lon_max': -3.660},
        'Chamartín': {'lat_min': 40.445, 'lat_max': 40.480, 'lon_min': -3.690, 'lon_max': -3.660},
        'Tetuán': {'lat_min': 40.445, 'lat_max': 40.470, 'lon_min': -3.720, 'lon_max': -3.690},
        'Chamberí': {'lat_min': 40.425, 'lat_max': 40.450, 'lon_min': -3.720, 'lon_max': -3.690},
        'Fuencarral-El Pardo': {'lat_min': 40.470, 'lat_max': 40.540, 'lon_min': -3.750, 'lon_max': -3.650},
        'Moncloa-Aravaca': {'lat_min': 40.430, 'lat_max': 40.480, 'lon_min': -3.750, 'lon_max': -3.720},
        'Latina': {'lat_min': 40.380, 'lat_max': 40.420, 'lon_min': -3.750, 'lon_max': -3.720},
        'Carabanchel': {'lat_min': 40.350, 'lat_max': 40.390, 'lon_min': -3.750, 'lon_max': -3.720},
        'Usera': {'lat_min': 40.350, 'lat_max': 40.385, 'lon_min': -3.720, 'lon_max': -3.690},
        'Puente de Vallecas': {'lat_min': 40.370, 'lat_max': 40.410, 'lon_min': -3.680, 'lon_max': -3.640},
        'Moratalaz': {'lat_min': 40.400, 'lat_max': 40.430, 'lon_min': -3.650, 'lon_max': -3.620},
        'Ciudad Lineal': {'lat_min': 40.430, 'lat_max': 40.460, 'lon_min': -3.650, 'lon_max': -3.620},
        'Hortaleza': {'lat_min': 40.460, 'lat_max': 40.500, 'lon_min': -3.650, 'lon_max': -3.620},
        'Villaverde': {'lat_min': 40.320, 'lat_max': 40.360, 'lon_min': -3.720, 'lon_max': -3.680},
    }

    # Road-code patterns, applied to the upper-cased point name. They are
    # anchored with \b so that a code is only recognised as its own token:
    # a plain substring test for 'A-' would also hit "CASTELLANA-SERRANO".
    _HIGHWAY_CODE_RE = re.compile(r'\bAP?-\d')      # A-2, AP-6
    _HIGHWAY_WORDS = ('AUTOPISTA', 'AUTOVIA', 'AUTOVÍA')
    _M30_RE = re.compile(r'\bM-?30(?!\d)')          # M-30 / M30, not M-300
    _M_ROAD_RE = re.compile(r'\bM-?\d')             # M-40, M607, ...
    _RADIAL_CODE_RE = re.compile(r'\bR-\d')         # R-1, R-2, ...

    def __init__(self):
        """Create an analyzer with a structlog logger."""
        self.logger = structlog.get_logger()

    def calculate_pedestrian_flow(
        self,
        traffic_record: TrafficRecord,
        location_context: Optional[Dict[str, Any]] = None
    ) -> Tuple[int, Dict[str, Any]]:
        """
        Calculate pedestrian flow estimate with detailed metadata

        The estimate is the product of a road-type base count and the
        time, district, traffic, weather and weekend factors, clamped to
        the range [10, 2000].

        Args:
            traffic_record: Record providing ``road_type``, ``date``,
                ``district`` and ``load_percentage``.
            location_context: Optional dict; its ``latitude`` and
                ``longitude`` entries are used to infer the district when
                the record has none.

        Returns:
            Tuple of (pedestrian_count, inference_metadata)
        """
        # Base calculation from road type
        road_type = traffic_record.road_type or 'URB'
        base_pedestrians = self.ROAD_TYPE_BASE.get(road_type, 200)

        # Time pattern adjustment
        hour = traffic_record.date.hour
        time_factor = self._get_time_pattern_factor(hour)

        # District adjustment (if available)
        district_factor = 1.0
        district = traffic_record.district or self.infer_district_from_location(location_context)
        if district:
            # Map official compound names onto the multiplier table keys.
            multiplier_key = self._DISTRICT_ALIASES.get(district, district)
            district_factor = self.DISTRICT_MULTIPLIERS.get(multiplier_key, 1.0)

        # Traffic correlation adjustment
        traffic_factor = self._calculate_traffic_correlation(traffic_record)

        # Weather adjustment (if data available)
        weather_factor = self._get_weather_factor(traffic_record.date, location_context)

        # Weekend adjustment
        weekend_factor = self._get_weekend_factor(traffic_record.date)

        # Combined calculation
        pedestrian_count = int(
            base_pedestrians
            * time_factor
            * district_factor
            * traffic_factor
            * weather_factor
            * weekend_factor
        )

        # Ensure reasonable bounds
        pedestrian_count = max(10, min(2000, pedestrian_count))

        # Metadata for model training
        inference_metadata = {
            'base_pedestrians': base_pedestrians,
            'time_factor': time_factor,
            'district_factor': district_factor,
            'traffic_factor': traffic_factor,
            'weather_factor': weather_factor,
            'weekend_factor': weekend_factor,
            'inferred_district': district,
            'hour': hour,
            'road_type': road_type
        }

        return pedestrian_count, inference_metadata

    def _get_time_pattern_factor(self, hour: int) -> float:
        """Get time-based pedestrian activity multiplier.

        Returns the multiplier of the first TIME_PATTERNS entry whose
        ``hours`` list contains ``hour``, or 1.0 if none does.
        """
        for config in self.TIME_PATTERNS.values():
            if hour in config['hours']:
                return config['multiplier']
        return 1.0  # Default multiplier

    def _calculate_traffic_correlation(self, traffic_record: TrafficRecord) -> float:
        """
        Calculate pedestrian correlation with traffic patterns

        Higher traffic in urban areas often correlates with more pedestrians

        Returns:
            For urban roads: 1.3 when load is 30-70, 0.9 above 70, else 1.0.
            For all other road types: 0.5 when load exceeds 60, else 0.8.
        """
        # Apply the same 'URB' default as calculate_pedestrian_flow so a
        # record without a road type is not treated as a highway here.
        road_type = traffic_record.road_type or 'URB'
        load = traffic_record.load_percentage

        if road_type == 'URB':
            # Urban areas: moderate traffic indicates commercial activity
            if 30 <= load <= 70:
                return 1.3  # Sweet spot for pedestrian activity
            if load > 70:
                return 0.9  # Too congested, pedestrians avoid
            return 1.0  # Normal correlation

        # Highway/ring roads: more traffic = fewer pedestrians
        if load > 60:
            return 0.5
        return 0.8

    def _get_weather_factor(self, date: datetime, location_context: Optional[Dict] = None) -> float:
        """Estimate weather impact on pedestrian activity.

        Simplified inference based only on the month of ``date``;
        ``location_context`` is accepted but currently unused.
        """
        # Simplified weather inference based on season and typical Madrid patterns
        month = date.month

        # Madrid seasonal patterns
        if month in (12, 1, 2):
            # Winter - cold weather impact
            return self.WEATHER_IMPACT['cold_weather']
        if month in (7, 8):
            # Summer - hot weather impact
            return self.WEATHER_IMPACT['hot_weather']
        if month in (10, 11, 3, 4):
            # Rainy seasons - moderate impact (hard-coded, milder than
            # WEATHER_IMPACT['rain'])
            return 0.85
        # Spring/early summer - optimal weather
        return 1.1

    def _get_weekend_factor(self, date: datetime) -> float:
        """Weekend vs weekday pedestrian patterns.

        Weekdays always yield 1.0. On Saturday/Sunday the factor is 1.4
        for hours 11-16, 1.3 for hours 20-23 and 0.9 otherwise.
        """
        if date.weekday() < 5:
            # Weekday
            return 1.0

        hour = date.hour
        if 11 <= hour <= 16:
            # Weekend shopping/leisure hours
            return 1.4
        if 20 <= hour <= 23:
            # Weekend evening activity
            return 1.3
        return 0.9

    def infer_district_from_location(self, location_context: Optional[Dict] = None) -> Optional[str]:
        """
        Infer Madrid district from location context or coordinates

        Args:
            location_context: Dict expected to carry ``latitude`` and
                ``longitude`` entries.

        Returns:
            The name of the first matching district box, ``'Other Madrid'``
            for coordinates inside the overall Madrid bounds that match no
            box, or ``None`` when coordinates are missing or outside Madrid.
        """
        if not location_context:
            return None

        lat = location_context.get('latitude')
        lon = location_context.get('longitude')

        # Explicit None checks: 0.0 is a real coordinate value, not "missing".
        if lat is None or lon is None:
            return None

        # Find matching district
        for district_name, bounds in self._DISTRICT_BOUNDS.items():
            if (bounds['lat_min'] <= lat <= bounds['lat_max']
                    and bounds['lon_min'] <= lon <= bounds['lon_max']):
                return district_name

        # Default for coordinates in Madrid but not matching specific districts
        if self.validate_madrid_coordinates(lat, lon):
            return 'Other Madrid'

        return None

    def classify_road_type(self, measurement_point_name: str) -> str:
        """Classify road type based on measurement point name.

        Returns one of ``'A'`` (highway), ``'M30'``, ``'C30'`` (other M
        roads), ``'R'`` (radial) or ``'URB'`` (default, also used for an
        empty name). Checks are applied in that order.
        """
        if not measurement_point_name:
            return 'URB'  # Default to urban

        name_upper = measurement_point_name.upper()

        # Highway patterns
        if (self._HIGHWAY_CODE_RE.search(name_upper)
                or any(word in name_upper for word in self._HIGHWAY_WORDS)):
            return 'A'

        # M-30 Ring road
        if self._M30_RE.search(name_upper):
            return 'M30'

        # Other M roads (ring roads)
        if self._M_ROAD_RE.search(name_upper):
            return 'C30'

        # Radial roads (R-1, R-2, etc.)
        if self._RADIAL_CODE_RE.search(name_upper) or 'RADIAL' in name_upper:
            return 'R'

        # Default to urban street
        return 'URB'

    def validate_madrid_coordinates(self, lat: float, lon: float) -> bool:
        """Validate coordinates are within Madrid bounds.

        True when 40.3 <= lat <= 40.6 and -3.8 <= lon <= -3.5.
        """
        # Madrid metropolitan area bounds
        return 40.3 <= lat <= 40.6 and -3.8 <= lon <= -3.5

    def get_congestion_level(self, occupation_pct: float) -> str:
        """Convert occupation percentage to congestion level.

        Thresholds: >= 80 blocked, >= 50 high, >= 25 medium, else low.
        Returns the ``.value`` of the matching ``CongestionLevel`` member.
        """
        if occupation_pct >= 80:
            return CongestionLevel.BLOCKED.value
        if occupation_pct >= 50:
            return CongestionLevel.HIGH.value
        if occupation_pct >= 25:
            return CongestionLevel.MEDIUM.value
        return CongestionLevel.LOW.value

    def calculate_distance(self, lat1: float, lon1: float, lat2: float, lon2: float) -> float:
        """Calculate distance between two points in kilometers using Haversine formula.

        Coordinates are given in decimal degrees.
        """
        R = 6371  # Earth's radius in kilometers

        dlat = math.radians(lat2 - lat1)
        dlon = math.radians(lon2 - lon1)

        a = (math.sin(dlat / 2) ** 2
             + math.cos(math.radians(lat1)) * math.cos(math.radians(lat2))
             * math.sin(dlon / 2) ** 2)
        # Rounding can push `a` marginally outside [0, 1] for (near-)antipodal
        # points, which would make sqrt(1 - a) raise ValueError.
        a = min(1.0, max(0.0, a))

        c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))

        return R * c

    def find_nearest_traffic_point(self, traffic_points: List[Dict[str, Any]],
                                   latitude: float, longitude: float) -> Optional[Dict[str, Any]]:
        """Find the nearest traffic point to given coordinates.

        Points lacking a ``latitude`` or ``longitude`` entry are skipped.
        Returns ``None`` when there is no usable point; on equal distances
        the earliest point in the list wins.
        """
        if not traffic_points:
            return None

        min_distance = float('inf')
        nearest_point = None

        for point in traffic_points:
            point_lat = point.get('latitude')
            point_lon = point.get('longitude')

            # Skip only genuinely missing coordinates (0.0 is valid).
            if point_lat is None or point_lon is None:
                continue

            distance = self.calculate_distance(latitude, longitude, point_lat, point_lon)
            if distance < min_distance:
                min_distance = distance
                nearest_point = point

        return nearest_point

    def find_nearest_measurement_points(self, measurement_points: Dict[str, Dict[str, Any]],
                                        latitude: float, longitude: float,
                                        num_points: int = 3,
                                        max_distance_km: Optional[float] = 5.0
                                        ) -> List[Tuple[str, Dict[str, Any], float]]:
        """Find nearest measurement points for historical data.

        Args:
            measurement_points: Mapping of point id to point data; each
                value is read for ``latitude`` and ``longitude``.
            latitude: Latitude of the reference location.
            longitude: Longitude of the reference location.
            num_points: Maximum number of points to return.
            max_distance_km: Points farther than this are dropped;
                ``None`` disables the distance filter.

        Returns:
            Up to ``num_points`` tuples of (point_id, point_data,
            distance_km), sorted by ascending distance.
        """
        distances = []

        for point_id, point_data in measurement_points.items():
            point_lat = point_data.get('latitude')
            point_lon = point_data.get('longitude')

            # Skip only genuinely missing coordinates (0.0 is valid).
            if point_lat is None or point_lon is None:
                continue

            distance_km = self.calculate_distance(latitude, longitude, point_lat, point_lon)

            # Apply distance filter if specified (before sorting, so that
            # out-of-range points are never sorted).
            if max_distance_km is not None and distance_km > max_distance_km:
                continue

            distances.append((point_id, point_data, distance_km))

        # Sort by distance and take nearest points
        distances.sort(key=lambda x: x[2])
        nearest = distances[:num_points]

        self.logger.info("Found nearest measurement points",
                         count=len(nearest),
                         nearest_distance_km=nearest[0][2] if nearest else None)

        return nearest