130 lines
5.8 KiB
Python
130 lines
5.8 KiB
Python
# ================================================================
|
|
# services/data/app/services/traffic_service.py - FIXED VERSION
|
|
# ================================================================
|
|
"""Traffic data service with improved error handling"""
|
|
|
|
from typing import List, Dict, Any, Optional
|
|
from datetime import datetime, timedelta
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
from sqlalchemy import select, and_
|
|
import structlog
|
|
|
|
from app.models.traffic import TrafficData
|
|
from app.external.madrid_opendata import MadridOpenDataClient
|
|
from app.schemas.external import TrafficDataResponse
|
|
|
|
# Module-level structlog logger used by every TrafficService method below.
logger = structlog.get_logger()
|
|
|
|
class TrafficService:
    """Fetch current traffic readings and read/write stored traffic records.

    Current readings come from ``MadridOpenDataClient``. Historical readings
    are read from, and new readings are written to, the ``TrafficData`` model
    through a caller-supplied ``AsyncSession``.

    Every public method handles its own failures: the error is logged and a
    neutral value (``None``, ``[]`` or ``False``) is returned instead of
    raising.
    """

    def __init__(self):
        """Create the service with its own ``MadridOpenDataClient``."""
        self.madrid_client = MadridOpenDataClient()

    @staticmethod
    def _coalesce(data: Dict[str, Any], key: str, default: Any) -> Any:
        """Return ``data[key]``, or ``default`` if the key is missing or None.

        ``dict.get(key, default)`` only falls back when the key is absent. A
        key that is present with a ``None`` value would otherwise reach
        ``int()``/``float()`` (raising ``TypeError``) or ``str()`` (producing
        the literal text ``"None"``). Falsy but valid values such as ``0`` are
        kept as-is.
        """
        value = data.get(key)
        return default if value is None else value

    @staticmethod
    def _location_id(latitude: float, longitude: float) -> str:
        """Build the ``"lat,lon"`` key (4 decimal places) used for stored rows."""
        return f"{latitude:.4f},{longitude:.4f}"

    async def get_current_traffic(
        self, latitude: float, longitude: float
    ) -> Optional[TrafficDataResponse]:
        """Get current traffic data for a location.

        Args:
            latitude: Latitude passed through to the Madrid client.
            longitude: Longitude passed through to the Madrid client.

        Returns:
            A ``TrafficDataResponse`` built from the client's payload, with
            fallback values substituted for fields that are missing or
            ``None``. ``None`` when the client returns nothing or when any
            exception occurs (the exception is logged, not raised).
        """
        try:
            logger.debug("Getting current traffic", lat=latitude, lon=longitude)
            traffic_data = await self.madrid_client.get_current_traffic(latitude, longitude)

            if not traffic_data:
                logger.warning("No traffic data received from Madrid client")
                return None

            logger.debug("Traffic data received", source=traffic_data.get('source'))

            # Build the response field by field (no ** unpacking) so that
            # unexpected keys in the payload are ignored and each value is
            # coerced to the expected primitive type.
            # NOTE(review): datetime.now() is timezone-naive - confirm that
            # matches what the response schema and stored rows expect.
            pick = self._coalesce
            response = TrafficDataResponse(
                date=pick(traffic_data, "date", datetime.now()),
                traffic_volume=int(pick(traffic_data, "traffic_volume", 100)),
                pedestrian_count=int(pick(traffic_data, "pedestrian_count", 150)),
                congestion_level=str(pick(traffic_data, "congestion_level", "medium")),
                # Speed is fractional: coerce with float(), not int().
                average_speed=float(pick(traffic_data, "average_speed", 25.0)),
                source=str(pick(traffic_data, "source", "unknown")),
            )

            logger.debug("Successfully created traffic response",
                         traffic_volume=response.traffic_volume,
                         congestion_level=response.congestion_level)
            return response

        except Exception as e:
            logger.error("Failed to get current traffic", error=str(e), lat=latitude, lon=longitude)
            # Log the full traceback for debugging
            import traceback
            logger.error("Traffic service traceback", traceback=traceback.format_exc())
            return None

    async def get_historical_traffic(self,
                                     latitude: float,
                                     longitude: float,
                                     start_date: datetime,
                                     end_date: datetime,
                                     db: AsyncSession) -> List[TrafficDataResponse]:
        """Get historical traffic data stored for a location and date range.

        Only the database is queried; when no rows match, an empty list is
        returned.

        Args:
            latitude: Latitude of the location (matched at 4 decimal places).
            longitude: Longitude of the location (matched at 4 decimal places).
            start_date: Inclusive lower bound on the record date.
            end_date: Inclusive upper bound on the record date.
            db: Async SQLAlchemy session used to run the query.

        Returns:
            Matching records as ``TrafficDataResponse`` objects ordered by
            date, or ``[]`` when nothing matches or the query fails (the
            failure is logged, not raised).
        """
        try:
            logger.debug("Getting historical traffic",
                         lat=latitude, lon=longitude,
                         start=start_date, end=end_date)

            # Rows are keyed by the rounded "lat,lon" string, so the lookup is
            # an exact match on that key.
            location_id = self._location_id(latitude, longitude)
            stmt = select(TrafficData).where(
                and_(
                    TrafficData.location_id == location_id,
                    TrafficData.date >= start_date,
                    TrafficData.date <= end_date
                )
            ).order_by(TrafficData.date)

            result = await db.execute(stmt)
            db_records = result.scalars().all()

            if not db_records:
                logger.debug("No historical traffic data found in database")
                return []

            logger.debug("Historical traffic data found in database", count=len(db_records))
            return [
                TrafficDataResponse(
                    date=record.date,
                    traffic_volume=record.traffic_volume,
                    pedestrian_count=record.pedestrian_count,
                    congestion_level=record.congestion_level,
                    average_speed=record.average_speed,
                    source=record.source,
                )
                for record in db_records
            ]

        except Exception as e:
            # Include the location, as get_current_traffic does, so failures
            # can be traced back to a specific request.
            logger.error("Failed to get historical traffic", error=str(e),
                         lat=latitude, lon=longitude)
            return []

    async def store_traffic_data(self,
                                 latitude: float,
                                 longitude: float,
                                 traffic_data: Dict[str, Any],
                                 db: AsyncSession) -> bool:
        """Store one traffic reading in the database.

        Args:
            latitude: Latitude of the location (stored at 4 decimal places).
            longitude: Longitude of the location (stored at 4 decimal places).
            traffic_data: Reading to persist. ``date`` falls back to the
                current time and ``source`` to ``"madrid_opendata"`` when
                missing or ``None``; other fields are stored as given.
            db: Async SQLAlchemy session used to add and commit the record.

        Returns:
            ``True`` once the record is committed. ``False`` on any failure,
            after logging the error and attempting a rollback.
        """
        try:
            location_id = self._location_id(latitude, longitude)

            traffic_record = TrafficData(
                location_id=location_id,
                date=self._coalesce(traffic_data, "date", datetime.now()),
                traffic_volume=traffic_data.get("traffic_volume"),
                pedestrian_count=traffic_data.get("pedestrian_count"),
                congestion_level=traffic_data.get("congestion_level"),
                average_speed=traffic_data.get("average_speed"),
                source=self._coalesce(traffic_data, "source", "madrid_opendata"),
                # The payload is kept as its Python repr, not JSON.
                raw_data=str(traffic_data) if traffic_data else None
            )

            db.add(traffic_record)
            await db.commit()

            logger.debug("Traffic data stored successfully", location_id=location_id)
            return True

        except Exception as e:
            logger.error("Failed to store traffic data", error=str(e))
            # A failing rollback must not escape: this method reports failure
            # through its boolean return value.
            try:
                await db.rollback()
            except Exception as rollback_error:
                logger.error("Failed to roll back traffic data transaction",
                             error=str(rollback_error))
            return False