Files
bakery-ia/services/external/app/models/traffic.py

295 lines
13 KiB
Python
Raw Normal View History

2025-07-18 11:51:43 +02:00
# ================================================================
2025-08-10 17:31:38 +02:00
# services/external/app/models/traffic.py - Enhanced for Multiple Cities
2025-07-18 11:51:43 +02:00
# ================================================================
2025-08-10 17:31:38 +02:00
"""
Flexible traffic data models supporting multiple cities and extensible schemas
"""
2025-07-18 11:51:43 +02:00
2025-08-10 17:31:38 +02:00
from sqlalchemy import Column, String, DateTime, Float, Integer, Text, Index, Boolean, JSON
2025-07-18 11:51:43 +02:00
from sqlalchemy.dialects.postgresql import UUID
import uuid
2025-08-08 09:08:41 +02:00
from datetime import datetime, timezone
2025-08-10 17:31:38 +02:00
from typing import Dict, Any, Optional
2025-07-18 11:51:43 +02:00
2025-08-08 09:08:41 +02:00
from shared.database.base import Base
2025-07-18 11:51:43 +02:00
2025-08-10 17:31:38 +02:00
2025-07-18 11:51:43 +02:00
class TrafficData(Base):
    """
    Flexible traffic data model supporting multiple cities.

    Designed to accommodate varying data structures across different cities:
    metrics shared by every city are stored in dedicated columns, while
    anything city-specific goes into the ``city_specific_data`` JSON column.
    """
    __tablename__ = "traffic_data"

    # Primary identification
    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)

    # Location and temporal data
    location_id = Column(String(100), nullable=False, index=True)  # "lat,lon" or city-specific ID
    city = Column(String(50), nullable=False, index=True)  # madrid, barcelona, valencia, etc.
    date = Column(DateTime(timezone=True), nullable=False, index=True)

    # Core standardized traffic metrics (common across all cities)
    traffic_volume = Column(Integer, nullable=True)  # Vehicle count or intensity
    congestion_level = Column(String(20), nullable=True)  # low, medium, high, blocked
    average_speed = Column(Float, nullable=True)  # Average speed in km/h

    # Enhanced metrics (may not be available for all cities)
    occupation_percentage = Column(Float, nullable=True)  # Road occupation %
    load_percentage = Column(Float, nullable=True)  # Traffic load %
    pedestrian_count = Column(Integer, nullable=True)  # Estimated pedestrian count

    # Measurement point information
    measurement_point_id = Column(String(100), nullable=True, index=True)
    measurement_point_name = Column(String(500), nullable=True)
    measurement_point_type = Column(String(50), nullable=True)  # URB, M30, A, etc.

    # Geographic data
    latitude = Column(Float, nullable=True)
    longitude = Column(Float, nullable=True)
    district = Column(String(100), nullable=True)  # City district/area
    zone = Column(String(100), nullable=True)  # Traffic zone or sector

    # Data source and quality
    source = Column(String(50), nullable=False, default="unknown")  # madrid_opendata, synthetic, etc.
    data_quality_score = Column(Float, nullable=True)  # Quality score 0-100
    is_synthetic = Column(Boolean, default=False)
    has_pedestrian_inference = Column(Boolean, default=False)

    # City-specific data (flexible JSON storage)
    city_specific_data = Column(JSON, nullable=True)  # Store city-specific fields

    # Raw data backup
    raw_data = Column(Text, nullable=True)  # Original data for debugging

    # Audit fields
    tenant_id = Column(UUID(as_uuid=True), nullable=True, index=True)  # For multi-tenancy
    created_at = Column(DateTime(timezone=True), default=lambda: datetime.now(timezone.utc))
    updated_at = Column(
        DateTime(timezone=True),
        default=lambda: datetime.now(timezone.utc),
        onupdate=lambda: datetime.now(timezone.utc),
    )

    # Performance-optimized indexes
    __table_args__ = (
        # Core query patterns
        Index('idx_traffic_location_date', 'location_id', 'date'),
        Index('idx_traffic_city_date', 'city', 'date'),
        Index('idx_traffic_tenant_date', 'tenant_id', 'date'),
        # Advanced query patterns
        Index('idx_traffic_city_location', 'city', 'location_id'),
        Index('idx_traffic_measurement_point', 'city', 'measurement_point_id'),
        Index('idx_traffic_district_date', 'city', 'district', 'date'),
        # Training data queries
        Index('idx_traffic_training', 'tenant_id', 'city', 'date', 'is_synthetic'),
        Index('idx_traffic_quality', 'city', 'data_quality_score', 'date'),
    )

    def to_dict(self) -> Dict[str, Any]:
        """Convert model to dictionary for API responses.

        Datetimes are rendered as ISO-8601 strings (``None`` when unset) and
        the primary key as a string. ``raw_data``, ``tenant_id`` and
        ``updated_at`` are not part of the output. ``city_specific_data`` is
        only included when it is non-empty.
        """
        result = {
            # The id is None until the column default has been applied;
            # return None in that case instead of the literal string 'None'.
            'id': str(self.id) if self.id is not None else None,
            'location_id': self.location_id,
            'city': self.city,
            'date': self.date.isoformat() if self.date else None,
            'traffic_volume': self.traffic_volume,
            'congestion_level': self.congestion_level,
            'average_speed': self.average_speed,
            'occupation_percentage': self.occupation_percentage,
            'load_percentage': self.load_percentage,
            'pedestrian_count': self.pedestrian_count,
            'measurement_point_id': self.measurement_point_id,
            'measurement_point_name': self.measurement_point_name,
            'measurement_point_type': self.measurement_point_type,
            'latitude': self.latitude,
            'longitude': self.longitude,
            'district': self.district,
            'zone': self.zone,
            'source': self.source,
            'data_quality_score': self.data_quality_score,
            'is_synthetic': self.is_synthetic,
            'has_pedestrian_inference': self.has_pedestrian_inference,
            'created_at': self.created_at.isoformat() if self.created_at else None,
        }

        # Add city-specific data if present
        if self.city_specific_data:
            result['city_specific_data'] = self.city_specific_data

        return result

    def get_city_specific_field(self, field_name: str, default: Any = None) -> Any:
        """Safely get a city-specific field value.

        Returns ``default`` when ``city_specific_data`` is empty, is not a
        dict, or does not contain ``field_name``.
        """
        if self.city_specific_data and isinstance(self.city_specific_data, dict):
            return self.city_specific_data.get(field_name, default)
        return default

    def set_city_specific_field(self, field_name: str, value: Any) -> None:
        """Set a city-specific field value.

        Any existing non-dict (or missing) ``city_specific_data`` is replaced
        by a fresh dict containing only the new field.
        """
        current = self.city_specific_data
        if not isinstance(current, dict):
            current = {}

        # Assign a *new* dict rather than mutating the existing one: the
        # column is a plain JSON type (no mutation-tracking wrapper), so the
        # ORM only notices the change when the attribute itself is reassigned.
        self.city_specific_data = {**current, field_name: value}
class TrafficMeasurementPoint(Base):
    """
    Registry of traffic measurement points across all cities.

    Supports different city-specific measurement point schemas through the
    ``city_specific_metadata`` JSON column. A point is unique per
    ``(city, measurement_point_id)`` pair (enforced by a unique index).
    """
    __tablename__ = "traffic_measurement_points"

    # Primary identification
    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)

    # Location and identification
    city = Column(String(50), nullable=False, index=True)
    measurement_point_id = Column(String(100), nullable=False, index=True)  # City-specific ID
    name = Column(String(500), nullable=True)
    description = Column(Text, nullable=True)

    # Geographic information
    latitude = Column(Float, nullable=False)
    longitude = Column(Float, nullable=False)
    district = Column(String(100), nullable=True)
    zone = Column(String(100), nullable=True)

    # Classification
    road_type = Column(String(50), nullable=True)  # URB, M30, A, etc.
    measurement_type = Column(String(50), nullable=True)  # intensity, speed, etc.
    point_category = Column(String(50), nullable=True)  # urban, highway, ring_road

    # Status and metadata
    is_active = Column(Boolean, default=True)
    installation_date = Column(DateTime(timezone=True), nullable=True)
    last_data_received = Column(DateTime(timezone=True), nullable=True)
    data_quality_rating = Column(Float, nullable=True)  # Average quality 0-100

    # City-specific point data
    city_specific_metadata = Column(JSON, nullable=True)

    # Audit fields
    created_at = Column(DateTime(timezone=True), default=lambda: datetime.now(timezone.utc))
    updated_at = Column(
        DateTime(timezone=True),
        default=lambda: datetime.now(timezone.utc),
        onupdate=lambda: datetime.now(timezone.utc),
    )

    __table_args__ = (
        # Ensure unique measurement points per city
        Index('idx_unique_city_point', 'city', 'measurement_point_id', unique=True),
        # Geographic queries
        Index('idx_points_city_location', 'city', 'latitude', 'longitude'),
        Index('idx_points_district', 'city', 'district'),
        Index('idx_points_road_type', 'city', 'road_type'),
        # Status queries
        Index('idx_points_active', 'city', 'is_active', 'last_data_received'),
    )

    def to_dict(self) -> Dict[str, Any]:
        """Convert measurement point to dictionary.

        Datetimes are rendered as ISO-8601 strings (``None`` when unset) and
        the primary key as a string. ``updated_at`` is not part of the output.
        """
        return {
            # The id is None until the column default has been applied;
            # return None in that case instead of the literal string 'None'.
            'id': str(self.id) if self.id is not None else None,
            'city': self.city,
            'measurement_point_id': self.measurement_point_id,
            'name': self.name,
            'description': self.description,
            'latitude': self.latitude,
            'longitude': self.longitude,
            'district': self.district,
            'zone': self.zone,
            'road_type': self.road_type,
            'measurement_type': self.measurement_type,
            'point_category': self.point_category,
            'is_active': self.is_active,
            'installation_date': self.installation_date.isoformat() if self.installation_date else None,
            'last_data_received': self.last_data_received.isoformat() if self.last_data_received else None,
            'data_quality_rating': self.data_quality_rating,
            'city_specific_metadata': self.city_specific_metadata,
            'created_at': self.created_at.isoformat() if self.created_at else None,
        }
class TrafficDataBackgroundJob(Base):
    """
    Track background data collection jobs for multiple cities.

    Supports scheduling and monitoring of data fetching processes: each row
    records what a job should do, when it was scheduled/started/completed,
    its current status and progress counters, and any error it reported.
    """
    __tablename__ = "traffic_background_jobs"

    # Primary identification
    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)

    # Job configuration
    job_type = Column(String(50), nullable=False)  # historical_fetch, cleanup, etc.
    city = Column(String(50), nullable=False, index=True)
    location_pattern = Column(String(200), nullable=True)  # Location pattern or specific coords

    # Scheduling
    scheduled_at = Column(DateTime(timezone=True), nullable=False)
    started_at = Column(DateTime(timezone=True), nullable=True)
    completed_at = Column(DateTime(timezone=True), nullable=True)

    # Status tracking
    status = Column(String(20), nullable=False, default='pending')  # pending, running, completed, failed
    progress_percentage = Column(Float, default=0.0)
    records_processed = Column(Integer, default=0)
    records_stored = Column(Integer, default=0)

    # Date range for data jobs
    data_start_date = Column(DateTime(timezone=True), nullable=True)
    data_end_date = Column(DateTime(timezone=True), nullable=True)

    # Results and error handling
    success_count = Column(Integer, default=0)
    error_count = Column(Integer, default=0)
    error_message = Column(Text, nullable=True)
    job_metadata = Column(JSON, nullable=True)  # Additional job-specific data

    # Tenant association
    tenant_id = Column(UUID(as_uuid=True), nullable=True, index=True)

    # Audit fields
    created_at = Column(DateTime(timezone=True), default=lambda: datetime.now(timezone.utc))
    updated_at = Column(
        DateTime(timezone=True),
        default=lambda: datetime.now(timezone.utc),
        onupdate=lambda: datetime.now(timezone.utc),
    )

    __table_args__ = (
        # Job monitoring
        Index('idx_jobs_city_status', 'city', 'status', 'scheduled_at'),
        Index('idx_jobs_tenant_status', 'tenant_id', 'status', 'scheduled_at'),
        Index('idx_jobs_type_city', 'job_type', 'city', 'scheduled_at'),
        # Cleanup queries
        Index('idx_jobs_completed', 'status', 'completed_at'),
    )

    def to_dict(self) -> Dict[str, Any]:
        """Convert job to dictionary.

        Datetimes are rendered as ISO-8601 strings and UUIDs as strings;
        any of them that is unset is rendered as ``None``.
        """
        return {
            # Guard the id the same way tenant_id is guarded below, so an
            # unset id serializes as None rather than the string 'None'.
            'id': str(self.id) if self.id is not None else None,
            'job_type': self.job_type,
            'city': self.city,
            'location_pattern': self.location_pattern,
            'scheduled_at': self.scheduled_at.isoformat() if self.scheduled_at else None,
            'started_at': self.started_at.isoformat() if self.started_at else None,
            'completed_at': self.completed_at.isoformat() if self.completed_at else None,
            'status': self.status,
            'progress_percentage': self.progress_percentage,
            'records_processed': self.records_processed,
            'records_stored': self.records_stored,
            'data_start_date': self.data_start_date.isoformat() if self.data_start_date else None,
            'data_end_date': self.data_end_date.isoformat() if self.data_end_date else None,
            'success_count': self.success_count,
            'error_count': self.error_count,
            'error_message': self.error_message,
            'job_metadata': self.job_metadata,
            'tenant_id': str(self.tenant_id) if self.tenant_id else None,
            'created_at': self.created_at.isoformat() if self.created_at else None,
            'updated_at': self.updated_at.isoformat() if self.updated_at else None,
        }