346 lines
15 KiB
Python
346 lines
15 KiB
Python
# ================================================================
# services/data/app/external/processors/madrid_business_logic.py
# ================================================================
"""
Business rules, inference, and domain logic for Madrid traffic data.

Handles pedestrian inference, district mapping, road classification, and validation.
"""
|
|
|
|
import math
|
|
import re
|
|
from datetime import datetime
|
|
from typing import Dict, List, Any, Optional, Tuple
|
|
import structlog
|
|
|
|
from ..models.madrid_models import TrafficRecord, CongestionLevel
|
|
|
|
|
|
class MadridTrafficAnalyzer:
    """Business logic for Madrid traffic analysis.

    Provides heuristic pedestrian-flow inference, bounding-box district
    lookup, road-type classification from measurement point names,
    coordinate validation, and nearest-point searches.
    """

    # Madrid district characteristics for pedestrian patterns
    DISTRICT_MULTIPLIERS = {
        'Centro': 2.5,  # Historic center, high pedestrian activity
        'Salamanca': 2.0,  # Shopping area, high foot traffic
        'Chamberí': 1.8,  # Business district
        'Retiro': 2.2,  # Near park, high leisure activity
        'Chamartín': 1.6,  # Business/residential
        'Tetuán': 1.4,  # Mixed residential/commercial
        'Fuencarral': 1.3,  # Residential with commercial areas
        'Moncloa': 1.7,  # University area
        'Latina': 1.5,  # Residential area
        'Carabanchel': 1.2,  # Residential periphery
        'Usera': 1.1,  # Industrial/residential
        'Villaverde': 1.0,  # Industrial area
        'Villa de Vallecas': 1.0,  # Peripheral residential
        'Vicálvaro': 0.9,  # Peripheral
        'San Blas': 1.1,  # Residential
        'Barajas': 0.8,  # Airport area, low pedestrian activity
        'Hortaleza': 1.2,  # Mixed area
        'Ciudad Lineal': 1.3,  # Linear development
        'Puente de Vallecas': 1.2,  # Working class area
        'Moratalaz': 1.1,  # Residential
        'Arganzuela': 1.6,  # Near center, growing area
    }

    # Time-based patterns (hour of day)
    TIME_PATTERNS = {
        'morning_peak': {'hours': [7, 8, 9], 'multiplier': 2.0},
        'lunch_peak': {'hours': [12, 13, 14], 'multiplier': 2.5},
        'evening_peak': {'hours': [18, 19, 20], 'multiplier': 2.2},
        'afternoon': {'hours': [15, 16, 17], 'multiplier': 1.8},
        'late_evening': {'hours': [21, 22], 'multiplier': 1.5},
        'night': {'hours': [23, 0, 1, 2, 3, 4, 5, 6], 'multiplier': 0.3},
        'morning': {'hours': [10, 11], 'multiplier': 1.4},
    }

    # Road type specific patterns (base pedestrian count per road type)
    ROAD_TYPE_BASE = {
        'URB': 250,  # Urban streets - high pedestrian activity
        'M30': 50,  # Ring road - minimal pedestrians
        'C30': 75,  # Secondary ring - some pedestrian access
        'A': 25,  # Highways - very low pedestrians
        'R': 40,  # Radial roads - low to moderate
    }

    # Weather impact on pedestrian activity
    WEATHER_IMPACT = {
        'rain': 0.6,  # 40% reduction in rain
        'hot_weather': 0.8,  # 20% reduction when very hot
        'cold_weather': 0.7,  # 30% reduction when very cold
        'normal': 1.0,  # No impact
    }

    # Madrid metropolitan area bounding box, shared by district inference
    # and coordinate validation so both always agree.
    MADRID_BOUNDS = {'lat_min': 40.3, 'lat_max': 40.6, 'lon_min': -3.8, 'lon_max': -3.5}

    # Simplified district bounding boxes used for inference. Boxes overlap at
    # their edges; the first match in insertion order wins.
    DISTRICT_BOUNDS = {
        # Central districts
        'Centro': {'lat_min': 40.405, 'lat_max': 40.425, 'lon_min': -3.720, 'lon_max': -3.690},
        'Arganzuela': {'lat_min': 40.385, 'lat_max': 40.410, 'lon_min': -3.720, 'lon_max': -3.680},
        'Retiro': {'lat_min': 40.405, 'lat_max': 40.425, 'lon_min': -3.690, 'lon_max': -3.660},
        'Salamanca': {'lat_min': 40.420, 'lat_max': 40.445, 'lon_min': -3.690, 'lon_max': -3.660},
        'Chamartín': {'lat_min': 40.445, 'lat_max': 40.480, 'lon_min': -3.690, 'lon_max': -3.660},
        'Tetuán': {'lat_min': 40.445, 'lat_max': 40.470, 'lon_min': -3.720, 'lon_max': -3.690},
        'Chamberí': {'lat_min': 40.425, 'lat_max': 40.450, 'lon_min': -3.720, 'lon_max': -3.690},
        'Fuencarral-El Pardo': {'lat_min': 40.470, 'lat_max': 40.540, 'lon_min': -3.750, 'lon_max': -3.650},
        'Moncloa-Aravaca': {'lat_min': 40.430, 'lat_max': 40.480, 'lon_min': -3.750, 'lon_max': -3.720},
        'Latina': {'lat_min': 40.380, 'lat_max': 40.420, 'lon_min': -3.750, 'lon_max': -3.720},
        'Carabanchel': {'lat_min': 40.350, 'lat_max': 40.390, 'lon_min': -3.750, 'lon_max': -3.720},
        'Usera': {'lat_min': 40.350, 'lat_max': 40.385, 'lon_min': -3.720, 'lon_max': -3.690},
        'Puente de Vallecas': {'lat_min': 40.370, 'lat_max': 40.410, 'lon_min': -3.680, 'lon_max': -3.640},
        'Moratalaz': {'lat_min': 40.400, 'lat_max': 40.430, 'lon_min': -3.650, 'lon_max': -3.620},
        'Ciudad Lineal': {'lat_min': 40.430, 'lat_max': 40.460, 'lon_min': -3.650, 'lon_max': -3.620},
        'Hortaleza': {'lat_min': 40.460, 'lat_max': 40.500, 'lon_min': -3.650, 'lon_max': -3.620},
        'Villaverde': {'lat_min': 40.320, 'lat_max': 40.360, 'lon_min': -3.720, 'lon_max': -3.680},
    }

    # Highway codes such as "A-5" or "AP-6". The word boundary and the digit
    # are required so that hyphenated street names whose first part ends in
    # "A" (e.g. "GRAN VIA-ALCALA") are not mistaken for highways.
    _HIGHWAY_CODE_RE = re.compile(r'\bAP?-\d')
    _HIGHWAY_KEYWORDS = ('AUTOPISTA', 'AUTOVIA', 'AUTOVÍA')

    def __init__(self):
        self.logger = structlog.get_logger()

    def calculate_pedestrian_flow(
        self,
        traffic_record: "TrafficRecord",
        location_context: Optional[Dict[str, Any]] = None
    ) -> Tuple[int, Dict[str, Any]]:
        """
        Calculate pedestrian flow estimate with detailed metadata

        The estimate is the road-type base count multiplied by the time,
        district, traffic, weather and weekend factors, truncated to an int
        and clamped to the range [10, 2000].

        Args:
            traffic_record: Record providing ``road_type``, ``date``,
                ``district`` and ``load_percentage``.
            location_context: Optional dict with ``latitude``/``longitude``,
                used to infer the district when the record has none.

        Returns:
            Tuple of (pedestrian_count, inference_metadata)
        """
        # Base calculation from road type (missing road type counts as urban)
        road_type = traffic_record.road_type or 'URB'
        base_pedestrians = self.ROAD_TYPE_BASE.get(road_type, 200)

        # Time pattern adjustment
        hour = traffic_record.date.hour
        time_factor = self._get_time_pattern_factor(hour)

        # District adjustment (explicit district wins over inferred one)
        district = traffic_record.district or self.infer_district_from_location(location_context)
        district_factor = self._get_district_factor(district)

        # Traffic correlation adjustment
        traffic_factor = self._calculate_traffic_correlation(traffic_record)

        # Weather adjustment (seasonal approximation)
        weather_factor = self._get_weather_factor(traffic_record.date, location_context)

        # Weekend adjustment
        weekend_factor = self._get_weekend_factor(traffic_record.date)

        # Combined calculation
        pedestrian_count = int(
            base_pedestrians *
            time_factor *
            district_factor *
            traffic_factor *
            weather_factor *
            weekend_factor
        )

        # Ensure reasonable bounds
        pedestrian_count = max(10, min(2000, pedestrian_count))

        # Metadata for model training
        inference_metadata = {
            'base_pedestrians': base_pedestrians,
            'time_factor': time_factor,
            'district_factor': district_factor,
            'traffic_factor': traffic_factor,
            'weather_factor': weather_factor,
            'weekend_factor': weekend_factor,
            'inferred_district': district,
            'hour': hour,
            'road_type': road_type
        }

        return pedestrian_count, inference_metadata

    def _get_district_factor(self, district: Optional[str]) -> float:
        """Resolve the pedestrian multiplier for a district name.

        Tries an exact match first. Compound names produced by
        ``infer_district_from_location`` (e.g. "Moncloa-Aravaca") are then
        matched by the part before the hyphen ("Moncloa"), because
        ``DISTRICT_MULTIPLIERS`` is keyed by short names. Unknown or missing
        districts get the neutral factor 1.0.
        """
        if not district:
            return 1.0

        multipliers = self.DISTRICT_MULTIPLIERS
        if district in multipliers:
            return multipliers[district]

        if isinstance(district, str):
            short_name = district.split('-', 1)[0].strip()
            return multipliers.get(short_name, 1.0)

        return 1.0

    def _get_time_pattern_factor(self, hour: int) -> float:
        """Get time-based pedestrian activity multiplier"""
        for config in self.TIME_PATTERNS.values():
            if hour in config['hours']:
                return config['multiplier']
        return 1.0  # Default multiplier

    def _calculate_traffic_correlation(self, traffic_record: "TrafficRecord") -> float:
        """
        Calculate pedestrian correlation with traffic patterns

        Higher traffic in urban areas often correlates with more pedestrians.
        A missing road type is treated as 'URB', matching
        ``calculate_pedestrian_flow``; a missing load is treated as 0.
        """
        road_type = traffic_record.road_type or 'URB'
        load = traffic_record.load_percentage
        if load is None:
            load = 0  # No load information: fall into the low-load branch

        if road_type == 'URB':
            # Urban areas: moderate traffic indicates commercial activity
            if 30 <= load <= 70:
                return 1.3  # Sweet spot for pedestrian activity
            if load > 70:
                return 0.9  # Too congested, pedestrians avoid
            return 1.0  # Normal correlation

        # Highway/ring roads: more traffic = fewer pedestrians
        return 0.5 if load > 60 else 0.8

    def _get_weather_factor(self, date: datetime, location_context: Optional[Dict] = None) -> float:
        """Estimate weather impact on pedestrian activity

        Simplified inference based only on the month; ``location_context``
        is accepted but not used.
        """
        month = date.month

        # Madrid seasonal patterns
        if month in (12, 1, 2):  # Winter - cold weather impact
            return self.WEATHER_IMPACT['cold_weather']
        if month in (7, 8):  # Summer - hot weather impact
            return self.WEATHER_IMPACT['hot_weather']
        if month in (10, 11, 3, 4):  # Rainy seasons - moderate impact
            return 0.85
        # Remaining months (May, June, September) - optimal weather
        return 1.1

    def _get_weekend_factor(self, date: datetime) -> float:
        """Weekend vs weekday pedestrian patterns"""
        if date.weekday() < 5:  # Weekday
            return 1.0

        hour = date.hour
        if 11 <= hour <= 16:  # Weekend shopping/leisure hours
            return 1.4
        if 20 <= hour <= 23:  # Weekend evening activity
            return 1.3
        return 0.9

    def infer_district_from_location(self, location_context: Optional[Dict] = None) -> Optional[str]:
        """
        Infer Madrid district from location context or coordinates

        Args:
            location_context: Dict with ``latitude`` and ``longitude`` keys.

        Returns:
            The first district in ``DISTRICT_BOUNDS`` whose box contains the
            point, 'Other Madrid' for points inside the Madrid bounding box
            that match no district, or None when coordinates are missing,
            non-numeric, or outside Madrid.
        """
        if not location_context:
            return None

        lat = location_context.get('latitude')
        lon = location_context.get('longitude')

        # Explicit None check: 0.0 is a valid coordinate value, not "missing"
        if lat is None or lon is None:
            return None

        try:
            lat = float(lat)
            lon = float(lon)
        except (TypeError, ValueError):
            return None

        # Find matching district
        for district_name, bounds in self.DISTRICT_BOUNDS.items():
            if (bounds['lat_min'] <= lat <= bounds['lat_max'] and
                    bounds['lon_min'] <= lon <= bounds['lon_max']):
                return district_name

        # Default for coordinates in Madrid but not matching specific districts
        if self.validate_madrid_coordinates(lat, lon):
            return 'Other Madrid'

        return None

    def classify_road_type(self, measurement_point_name: str) -> str:
        """Classify road type based on measurement point name

        Returns one of 'A' (highway), 'M30', 'C30' (other M roads),
        'R' (radial) or 'URB' (default, also used for empty names).
        """
        if not measurement_point_name:
            return 'URB'  # Default to urban

        name_upper = measurement_point_name.upper()

        # Highway patterns: explicit codes (A-5, AP-6) or highway keywords
        if (self._HIGHWAY_CODE_RE.search(name_upper) or
                any(keyword in name_upper for keyword in self._HIGHWAY_KEYWORDS)):
            return 'A'

        # M-30 Ring road
        if 'M-30' in name_upper or 'M30' in name_upper:
            return 'M30'

        # Other M roads (ring roads)
        if re.search(r'M-?[0-9]', name_upper):
            return 'C30'

        # Radial roads (R-1, R-2, etc.)
        if re.search(r'R-[0-9]', name_upper) or 'RADIAL' in name_upper:
            return 'R'

        # Default to urban street
        return 'URB'

    def validate_madrid_coordinates(self, lat: float, lon: float) -> bool:
        """Validate coordinates are within Madrid bounds"""
        bounds = self.MADRID_BOUNDS
        return (bounds['lat_min'] <= lat <= bounds['lat_max'] and
                bounds['lon_min'] <= lon <= bounds['lon_max'])

    def get_congestion_level(self, occupation_pct: float) -> str:
        """Convert occupation percentage to congestion level"""
        if occupation_pct >= 80:
            return CongestionLevel.BLOCKED.value
        if occupation_pct >= 50:
            return CongestionLevel.HIGH.value
        if occupation_pct >= 25:
            return CongestionLevel.MEDIUM.value
        return CongestionLevel.LOW.value

    def calculate_distance(self, lat1: float, lon1: float, lat2: float, lon2: float) -> float:
        """Calculate distance between two points in kilometers using Haversine formula"""
        R = 6371  # Earth's radius in kilometers

        dlat = math.radians(lat2 - lat1)
        dlon = math.radians(lon2 - lon1)
        a = (math.sin(dlat / 2) ** 2 +
             math.cos(math.radians(lat1)) * math.cos(math.radians(lat2)) *
             math.sin(dlon / 2) ** 2)
        # Rounding can push `a` marginally outside [0, 1] (near-antipodal
        # points), which would make sqrt(1 - a) raise a domain error.
        a = min(1.0, max(0.0, a))
        c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))

        return R * c

    def find_nearest_traffic_point(self, traffic_points: List[Dict[str, Any]],
                                   latitude: float, longitude: float) -> Optional[Dict[str, Any]]:
        """Find the nearest traffic point to given coordinates

        Points without ``latitude``/``longitude`` are ignored. Returns None
        when no point has coordinates. On ties the earliest point wins.
        """
        if not traffic_points:
            return None

        # `is not None` rather than truthiness: 0.0 is a valid coordinate
        candidates = [
            point for point in traffic_points
            if point.get('latitude') is not None and point.get('longitude') is not None
        ]

        return min(
            candidates,
            key=lambda point: self.calculate_distance(
                latitude, longitude, point['latitude'], point['longitude']
            ),
            default=None,
        )

    def find_nearest_measurement_points(self, measurement_points: Dict[str, Dict[str, Any]],
                                        latitude: float, longitude: float,
                                        num_points: int = 3, max_distance_km: Optional[float] = 5.0) -> List[Tuple[str, Dict[str, Any], float]]:
        """Find nearest measurement points for historical data

        Args:
            measurement_points: Mapping of point id to point data containing
                ``latitude`` and ``longitude``.
            latitude: Latitude of the reference location.
            longitude: Longitude of the reference location.
            num_points: Maximum number of points to return.
            max_distance_km: Discard points farther than this; None disables
                the distance filter.

        Returns:
            Up to ``num_points`` tuples of (point_id, point_data, distance_km),
            sorted by ascending distance.
        """
        distances = []

        for point_id, point_data in measurement_points.items():
            point_lat = point_data.get('latitude')
            point_lon = point_data.get('longitude')

            # `is not None` rather than truthiness: 0.0 is a valid coordinate
            if point_lat is None or point_lon is None:
                continue

            distance_km = self.calculate_distance(latitude, longitude, point_lat, point_lon)

            # Apply distance filter if specified (before sorting, less work)
            if max_distance_km is not None and distance_km > max_distance_km:
                continue

            distances.append((point_id, point_data, distance_km))

        # Sort by distance and take nearest points
        distances.sort(key=lambda entry: entry[2])

        # A negative count would otherwise slice from the end of the list
        nearest = distances[:max(0, num_points)]

        self.logger.info("Found nearest measurement points",
                         count=len(nearest),
                         nearest_distance_km=nearest[0][2] if nearest else None)

        return nearest