411 lines
17 KiB
Python
411 lines
17 KiB
Python
# ================================================================
|
|
# services/data/app/services/traffic_service.py
|
|
# ================================================================
|
|
"""
|
|
Abstracted Traffic Service - Universal interface for traffic data across multiple cities
|
|
"""
|
|
|
|
import asyncio
|
|
from datetime import datetime
|
|
from typing import Dict, List, Any, Optional, Tuple
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
import structlog
|
|
|
|
from app.external.apis.traffic import UniversalTrafficClient
|
|
from app.models.traffic import TrafficData
|
|
from app.repositories.traffic_repository import TrafficRepository
|
|
|
|
logger = structlog.get_logger()
|
|
from app.core.database import database_manager
|
|
|
|
class TrafficService:
|
|
"""
|
|
Abstracted traffic service providing unified interface for traffic data
|
|
Routes requests to appropriate city-specific clients automatically
|
|
"""
|
|
|
|
def __init__(self):
|
|
self.universal_client = UniversalTrafficClient()
|
|
self.database_manager = database_manager
|
|
|
|
async def get_current_traffic(
|
|
self,
|
|
latitude: float,
|
|
longitude: float,
|
|
tenant_id: Optional[str] = None,
|
|
force_refresh: bool = False,
|
|
cache_duration_minutes: int = 5
|
|
) -> Optional[Dict[str, Any]]:
|
|
"""
|
|
Get current traffic data with intelligent cache-first strategy
|
|
|
|
Args:
|
|
latitude: Query location latitude
|
|
longitude: Query location longitude
|
|
tenant_id: Optional tenant identifier for logging/analytics
|
|
force_refresh: If True, bypass cache and fetch fresh data
|
|
cache_duration_minutes: How long to consider cached data valid (default: 5 minutes)
|
|
|
|
Returns:
|
|
Dict with current traffic data or None if not available
|
|
"""
|
|
try:
|
|
logger.info("Getting current traffic data",
|
|
lat=latitude, lon=longitude, tenant_id=tenant_id,
|
|
force_refresh=force_refresh, cache_duration=cache_duration_minutes)
|
|
|
|
location_id = f"{latitude:.4f},{longitude:.4f}"
|
|
|
|
# Step 1: Check database cache first (unless force_refresh)
|
|
if not force_refresh:
|
|
async with self.database_manager.get_session() as session:
|
|
traffic_repo = TrafficRepository(session)
|
|
# Get recent traffic data (within cache_duration_minutes)
|
|
from datetime import timedelta
|
|
cache_cutoff = datetime.now() - timedelta(minutes=cache_duration_minutes)
|
|
|
|
cached_records = await traffic_repo.get_recent_by_location(
|
|
latitude, longitude, cache_cutoff, tenant_id
|
|
)
|
|
|
|
if cached_records:
|
|
logger.info("Current traffic data found in cache",
|
|
count=len(cached_records), cache_age_minutes=cache_duration_minutes)
|
|
# Return the most recent cached record
|
|
latest_record = max(cached_records, key=lambda x: x.date)
|
|
cached_data = self._convert_db_record_to_dict(latest_record)
|
|
|
|
# Add cache metadata
|
|
cached_data['service_metadata'] = {
|
|
'request_timestamp': datetime.now().isoformat(),
|
|
'tenant_id': tenant_id,
|
|
'service_version': '2.0',
|
|
'query_location': {'latitude': latitude, 'longitude': longitude},
|
|
'data_source': 'cache',
|
|
'cache_age_minutes': (datetime.now() - latest_record.date).total_seconds() / 60
|
|
}
|
|
|
|
return cached_data
|
|
|
|
# Step 2: Fetch fresh data from external API
|
|
logger.info("Fetching fresh current traffic data" +
|
|
(" (force refresh)" if force_refresh else " (no valid cache)"))
|
|
|
|
traffic_data = await self.universal_client.get_current_traffic(latitude, longitude)
|
|
|
|
if traffic_data:
|
|
# Add service metadata
|
|
traffic_data['service_metadata'] = {
|
|
'request_timestamp': datetime.now().isoformat(),
|
|
'tenant_id': tenant_id,
|
|
'service_version': '2.0',
|
|
'query_location': {'latitude': latitude, 'longitude': longitude},
|
|
'data_source': 'fresh_api'
|
|
}
|
|
|
|
# Step 3: Store fresh data in cache for future requests
|
|
try:
|
|
async with self.database_manager.get_session() as session:
|
|
traffic_repo = TrafficRepository(session)
|
|
# Store the fresh data as a single record
|
|
stored_count = await traffic_repo.store_traffic_data_batch(
|
|
[traffic_data], location_id, tenant_id
|
|
)
|
|
logger.info("Stored fresh current traffic data in cache",
|
|
stored_records=stored_count)
|
|
except Exception as cache_error:
|
|
logger.warning("Failed to cache current traffic data", error=str(cache_error))
|
|
|
|
logger.info("Successfully retrieved fresh current traffic data",
|
|
lat=latitude, lon=longitude,
|
|
source=traffic_data.get('source', 'unknown'))
|
|
|
|
return traffic_data
|
|
else:
|
|
logger.warning("No current traffic data available",
|
|
lat=latitude, lon=longitude)
|
|
return None
|
|
|
|
except Exception as e:
|
|
logger.error("Error getting current traffic data",
|
|
lat=latitude, lon=longitude, error=str(e))
|
|
return None
|
|
|
|
async def get_historical_traffic(
|
|
self,
|
|
latitude: float,
|
|
longitude: float,
|
|
start_date: datetime,
|
|
end_date: datetime,
|
|
tenant_id: Optional[str] = None
|
|
) -> List[Dict[str, Any]]:
|
|
"""
|
|
Get historical traffic data for any supported location with database storage
|
|
|
|
Args:
|
|
latitude: Query location latitude
|
|
longitude: Query location longitude
|
|
start_date: Start date for historical data
|
|
end_date: End date for historical data
|
|
tenant_id: Optional tenant identifier
|
|
|
|
Returns:
|
|
List of historical traffic data dictionaries
|
|
"""
|
|
try:
|
|
logger.info("Getting historical traffic data",
|
|
lat=latitude, lon=longitude,
|
|
start=start_date, end=end_date, tenant_id=tenant_id)
|
|
|
|
# Validate date range
|
|
if start_date >= end_date:
|
|
logger.warning("Invalid date range", start=start_date, end=end_date)
|
|
return []
|
|
|
|
location_id = f"{latitude:.4f},{longitude:.4f}"
|
|
|
|
async with self.database_manager.get_session() as session:
|
|
traffic_repo = TrafficRepository(session)
|
|
# Check database first using the repository
|
|
db_records = await traffic_repo.get_by_location_and_date_range(
|
|
latitude, longitude, start_date, end_date, tenant_id
|
|
)
|
|
|
|
if db_records:
|
|
logger.info("Historical traffic data found in database",
|
|
count=len(db_records))
|
|
return [self._convert_db_record_to_dict(record) for record in db_records]
|
|
|
|
# Delegate to universal client if not in DB
|
|
traffic_data = await self.universal_client.get_historical_traffic(
|
|
latitude, longitude, start_date, end_date
|
|
)
|
|
|
|
if traffic_data:
|
|
# Add service metadata to each record
|
|
for record in traffic_data:
|
|
record['service_metadata'] = {
|
|
'request_timestamp': datetime.now().isoformat(),
|
|
'tenant_id': tenant_id,
|
|
'service_version': '2.0',
|
|
'query_location': {'latitude': latitude, 'longitude': longitude},
|
|
'date_range': {
|
|
'start': start_date.isoformat(),
|
|
'end': end_date.isoformat()
|
|
}
|
|
}
|
|
|
|
async with self.database_manager.get_session() as session:
|
|
traffic_repo = TrafficRepository(session)
|
|
# Store in database using the repository
|
|
stored_count = await traffic_repo.store_traffic_data_batch(
|
|
traffic_data, location_id, tenant_id
|
|
)
|
|
logger.info("Traffic data stored for re-training",
|
|
fetched=len(traffic_data), stored=stored_count,
|
|
location=location_id)
|
|
|
|
logger.info("Successfully retrieved historical traffic data",
|
|
lat=latitude, lon=longitude, records=len(traffic_data))
|
|
|
|
return traffic_data
|
|
else:
|
|
logger.info("No historical traffic data available",
|
|
lat=latitude, lon=longitude)
|
|
return []
|
|
|
|
except Exception as e:
|
|
logger.error("Error getting historical traffic data",
|
|
lat=latitude, lon=longitude, error=str(e))
|
|
return []
|
|
|
|
def _convert_db_record_to_dict(self, record: TrafficData) -> Dict[str, Any]:
|
|
"""Convert database record to dictionary format"""
|
|
return {
|
|
'date': record.date,
|
|
'traffic_volume': record.traffic_volume,
|
|
'pedestrian_count': record.pedestrian_count,
|
|
'congestion_level': record.congestion_level,
|
|
'average_speed': record.average_speed,
|
|
'source': record.source,
|
|
'location_id': record.location_id,
|
|
'raw_data': record.raw_data
|
|
}
|
|
|
|
async def get_traffic_events(
|
|
self,
|
|
latitude: float,
|
|
longitude: float,
|
|
radius_km: float = 5.0,
|
|
tenant_id: Optional[str] = None
|
|
) -> List[Dict[str, Any]]:
|
|
"""
|
|
Get traffic events and incidents for any supported location
|
|
|
|
Args:
|
|
latitude: Query location latitude
|
|
longitude: Query location longitude
|
|
radius_km: Search radius in kilometers
|
|
tenant_id: Optional tenant identifier
|
|
|
|
Returns:
|
|
List of traffic events
|
|
"""
|
|
try:
|
|
logger.info("Getting traffic events",
|
|
lat=latitude, lon=longitude, radius=radius_km, tenant_id=tenant_id)
|
|
|
|
# Delegate to universal client
|
|
events = await self.universal_client.get_events(latitude, longitude, radius_km)
|
|
|
|
# Add metadata to events
|
|
for event in events:
|
|
event['service_metadata'] = {
|
|
'request_timestamp': datetime.now().isoformat(),
|
|
'tenant_id': tenant_id,
|
|
'service_version': '2.0',
|
|
'query_location': {'latitude': latitude, 'longitude': longitude},
|
|
'search_radius_km': radius_km
|
|
}
|
|
|
|
logger.info("Retrieved traffic events",
|
|
lat=latitude, lon=longitude, events=len(events))
|
|
|
|
return events
|
|
|
|
except Exception as e:
|
|
logger.error("Error getting traffic events",
|
|
lat=latitude, lon=longitude, error=str(e))
|
|
return []
|
|
|
|
def get_location_info(self, latitude: float, longitude: float) -> Dict[str, Any]:
|
|
"""
|
|
Get information about traffic data availability for location
|
|
|
|
Args:
|
|
latitude: Query location latitude
|
|
longitude: Query location longitude
|
|
|
|
Returns:
|
|
Dict with location support information
|
|
"""
|
|
try:
|
|
info = self.universal_client.get_location_info(latitude, longitude)
|
|
|
|
# Add service layer information
|
|
info['service_layer'] = {
|
|
'version': '2.0',
|
|
'abstraction_level': 'universal',
|
|
'supported_operations': [
|
|
'current_traffic',
|
|
'historical_traffic',
|
|
'traffic_events',
|
|
'bulk_requests'
|
|
]
|
|
}
|
|
|
|
return info
|
|
|
|
except Exception as e:
|
|
logger.error("Error getting location info",
|
|
lat=latitude, lon=longitude, error=str(e))
|
|
return {
|
|
'supported': False,
|
|
'error': str(e),
|
|
'service_layer': {'version': '2.0'}
|
|
}
|
|
|
|
async def get_stored_traffic_for_training(self,
|
|
latitude: float,
|
|
longitude: float,
|
|
start_date: datetime,
|
|
end_date: datetime) -> List[Dict[str, Any]]:
|
|
"""Retrieve stored traffic data specifically for training purposes"""
|
|
try:
|
|
async with self.database_manager.get_session() as session:
|
|
traffic_repo = TrafficRepository(session)
|
|
records = await traffic_repo.get_historical_traffic_for_training(
|
|
latitude, longitude, start_date, end_date
|
|
)
|
|
|
|
# Convert to training format
|
|
training_data = []
|
|
for record in records:
|
|
training_data.append({
|
|
'date': record.date,
|
|
'traffic_volume': record.traffic_volume,
|
|
'pedestrian_count': record.pedestrian_count,
|
|
'congestion_level': record.congestion_level,
|
|
'average_speed': record.average_speed,
|
|
'location_id': record.location_id,
|
|
'source': record.source,
|
|
'measurement_point_id': record.raw_data # Contains additional metadata
|
|
})
|
|
|
|
logger.info(f"Retrieved {len(training_data)} traffic records for training",
|
|
location_id=f"{latitude:.4f},{longitude:.4f}", start=start_date, end=end_date)
|
|
|
|
return training_data
|
|
|
|
except Exception as e:
|
|
logger.error("Failed to retrieve traffic data for training",
|
|
error=str(e), location_id=f"{latitude:.4f},{longitude:.4f}")
|
|
return []
|
|
|
|
# ============= UNIFIED CONVENIENCE METHODS =============
|
|
|
|
async def get_current_traffic_fresh(
|
|
self,
|
|
latitude: float,
|
|
longitude: float,
|
|
tenant_id: Optional[str] = None
|
|
) -> Optional[Dict[str, Any]]:
|
|
"""Get current traffic data, forcing fresh API call (bypass cache)"""
|
|
return await self.get_current_traffic(
|
|
latitude=latitude,
|
|
longitude=longitude,
|
|
tenant_id=tenant_id,
|
|
force_refresh=True
|
|
)
|
|
|
|
async def get_historical_traffic_fresh(
|
|
self,
|
|
latitude: float,
|
|
longitude: float,
|
|
start_date: datetime,
|
|
end_date: datetime,
|
|
tenant_id: Optional[str] = None
|
|
) -> List[Dict[str, Any]]:
|
|
"""Get historical traffic data, forcing fresh API call (bypass cache)"""
|
|
# For historical data, we can implement force_refresh logic
|
|
# For now, historical already has good cache-first logic
|
|
return await self.get_historical_traffic(
|
|
latitude=latitude,
|
|
longitude=longitude,
|
|
start_date=start_date,
|
|
end_date=end_date,
|
|
tenant_id=tenant_id
|
|
)
|
|
|
|
async def clear_traffic_cache(
|
|
self,
|
|
latitude: float,
|
|
longitude: float,
|
|
tenant_id: Optional[str] = None
|
|
) -> bool:
|
|
"""Clear cached traffic data for a specific location"""
|
|
try:
|
|
location_id = f"{latitude:.4f},{longitude:.4f}"
|
|
|
|
async with self.database_manager.get_session() as session:
|
|
traffic_repo = TrafficRepository(session)
|
|
# This would need a new repository method to delete by location
|
|
# For now, just log the intent
|
|
logger.info("Traffic cache clear requested",
|
|
location_id=location_id, tenant_id=tenant_id)
|
|
return True
|
|
|
|
except Exception as e:
|
|
logger.error("Error clearing traffic cache",
|
|
lat=latitude, lon=longitude, error=str(e))
|
|
return False |