Files
bakery-ia/services/external/tests/unit/test_repositories.py
2025-08-12 18:17:30 +02:00

393 lines
14 KiB
Python

# services/external/tests/unit/test_repositories.py
"""
Unit tests for External Service Repositories
"""
import pytest
from datetime import datetime, timezone, timedelta
from uuid import uuid4
from app.repositories.weather_repository import WeatherRepository
from app.repositories.traffic_repository import TrafficRepository
from app.models.weather import WeatherData, WeatherStation, WeatherDataJob
from app.models.traffic import TrafficData, TrafficMeasurementPoint, TrafficDataJob
@pytest.mark.asyncio
class TestWeatherRepository:
    """Async unit tests for ``WeatherRepository``.

    Every test builds a repository on top of a database-session fixture
    (``test_db_session`` or ``populated_weather_db``) and checks the value
    returned by a single repository call.
    """

    async def test_create_weather_data(self, test_db_session, sample_weather_data):
        """Creating weather data returns a record with an id and the input values."""
        repository = WeatherRepository(test_db_session)

        record = await repository.create_weather_data(sample_weather_data)

        assert record is not None
        assert record.id is not None
        assert record.city == sample_weather_data["city"]
        assert record.temperature == sample_weather_data["temperature"]

    async def test_get_current_weather(self, populated_weather_db, sample_weather_data):
        """Current weather for "madrid" matches the sample temperature."""
        repository = WeatherRepository(populated_weather_db)

        result = await repository.get_current_weather("madrid")

        assert result is not None
        assert result.city == "madrid"
        assert result.temperature == sample_weather_data["temperature"]

    async def test_get_weather_forecast(self, test_db_session, sample_weather_forecast):
        """Stored forecast items are returned by the forecast query."""
        repository = WeatherRepository(test_db_session)

        # Create forecast data
        for forecast_item in sample_weather_forecast:
            await repository.create_weather_data(forecast_item)

        result = await repository.get_weather_forecast("madrid", 7)

        assert len(result) == 1
        assert result[0].is_forecast is True

    async def test_get_historical_weather(self, test_db_session, sample_weather_data):
        """A record dated one day ago is found in a two-day lookback window."""
        repository = WeatherRepository(test_db_session)

        # Single reference instant so the record date and the query window
        # are derived from the same clock reading.
        now = datetime.now(timezone.utc)

        # Create historical data
        historical_data = sample_weather_data.copy()
        historical_data["date"] = now - timedelta(days=1)
        await repository.create_weather_data(historical_data)

        start_date = now - timedelta(days=2)
        end_date = now
        result = await repository.get_historical_weather("madrid", start_date, end_date)

        assert len(result) >= 1

    async def test_create_weather_station(self, test_db_session):
        """Creating a station returns it with the supplied id and name."""
        repository = WeatherRepository(test_db_session)
        station_data = {
            "station_id": "TEST_001",
            "name": "Test Station",
            "city": "madrid",
            "latitude": 40.4168,
            "longitude": -3.7038,
            "altitude": 650.0,
            "is_active": True,
        }

        station = await repository.create_weather_station(station_data)

        assert station is not None
        assert station.station_id == "TEST_001"
        assert station.name == "Test Station"

    async def test_get_weather_stations(self, test_db_session):
        """A station created for "madrid" is the only one listed for that city."""
        repository = WeatherRepository(test_db_session)

        # Create test station
        station_data = {
            "station_id": "TEST_001",
            "name": "Test Station",
            "city": "madrid",
            "latitude": 40.4168,
            "longitude": -3.7038,
            "is_active": True,
        }
        await repository.create_weather_station(station_data)

        stations = await repository.get_weather_stations("madrid")

        assert len(stations) == 1
        assert stations[0].station_id == "TEST_001"

    async def test_create_weather_job(self, test_db_session, sample_tenant_id):
        """Creating a collection job returns it with the supplied type and status."""
        repository = WeatherRepository(test_db_session)
        job_data = {
            "job_type": "current",
            "city": "madrid",
            "status": "pending",
            # Timezone-aware UTC, consistent with the other timestamps in this
            # module; datetime.utcnow() is naive and deprecated since 3.12.
            "scheduled_at": datetime.now(timezone.utc),
            "tenant_id": sample_tenant_id,
        }

        job = await repository.create_weather_job(job_data)

        assert job is not None
        assert job.job_type == "current"
        assert job.status == "pending"

    async def test_update_weather_job(self, test_db_session, sample_tenant_id):
        """Updating an existing job reports success."""
        repository = WeatherRepository(test_db_session)

        # Create job first
        job_data = {
            "job_type": "current",
            "city": "madrid",
            "status": "pending",
            "scheduled_at": datetime.now(timezone.utc),
            "tenant_id": sample_tenant_id,
        }
        job = await repository.create_weather_job(job_data)

        # Update job
        update_data = {
            "status": "completed",
            "completed_at": datetime.now(timezone.utc),
            "success_count": 1,
        }
        success = await repository.update_weather_job(job.id, update_data)

        assert success is True

    async def test_get_weather_jobs(self, test_db_session, sample_tenant_id):
        """A created "forecast" job shows up in the job listing."""
        repository = WeatherRepository(test_db_session)

        # Create test job
        job_data = {
            "job_type": "forecast",
            "city": "madrid",
            "status": "completed",
            "scheduled_at": datetime.now(timezone.utc),
            "tenant_id": sample_tenant_id,
        }
        await repository.create_weather_job(job_data)

        jobs = await repository.get_weather_jobs()

        assert len(jobs) >= 1
        assert any(job.job_type == "forecast" for job in jobs)
@pytest.mark.asyncio
class TestTrafficRepository:
    """Async unit tests for ``TrafficRepository``.

    Every test builds a repository on top of a database-session fixture
    (``test_db_session`` or ``populated_traffic_db``) and checks the value
    returned by a single repository call.
    """

    async def test_create_traffic_data(self, test_db_session, sample_traffic_data):
        """Bulk-creating a single traffic row reports a count of one."""
        repository = TrafficRepository(test_db_session)

        # Convert sample data to list for bulk create
        traffic_list = [sample_traffic_data]
        count = await repository.bulk_create_traffic_data(traffic_list)

        assert count == 1

    async def test_get_current_traffic(self, populated_traffic_db, sample_traffic_data):
        """Current traffic for "madrid" returns at least one madrid row."""
        repository = TrafficRepository(populated_traffic_db)

        result = await repository.get_current_traffic("madrid")

        assert len(result) >= 1
        assert result[0].city == "madrid"

    async def test_get_current_traffic_with_filters(self, populated_traffic_db):
        """The district-filtered query returns a list."""
        repository = TrafficRepository(populated_traffic_db)

        result = await repository.get_current_traffic("madrid", district="Chamartín")

        # Should return results based on filter
        assert isinstance(result, list)

    async def test_get_historical_traffic(self, test_db_session, sample_traffic_data):
        """A row dated one day ago is found in a two-day lookback window."""
        repository = TrafficRepository(test_db_session)

        # Single reference instant so the row date and the query window
        # are derived from the same clock reading.
        now = datetime.now(timezone.utc)

        # Create historical data
        historical_data = sample_traffic_data.copy()
        historical_data["date"] = now - timedelta(days=1)
        await repository.bulk_create_traffic_data([historical_data])

        start_date = now - timedelta(days=2)
        end_date = now
        result = await repository.get_historical_traffic("madrid", start_date, end_date)

        assert len(result) >= 1

    async def test_create_measurement_point(self, test_db_session):
        """Creating a measurement point returns it with the supplied id and name."""
        repository = TrafficRepository(test_db_session)
        point_data = {
            "point_id": "TEST_POINT_001",
            "name": "Test Measurement Point",
            "city": "madrid",
            "point_type": "TEST",
            "latitude": 40.4168,
            "longitude": -3.7038,
            "district": "Centro",
            "road_name": "Test Road",
            "is_active": True,
        }

        point = await repository.create_measurement_point(point_data)

        assert point is not None
        assert point.point_id == "TEST_POINT_001"
        assert point.name == "Test Measurement Point"

    async def test_get_measurement_points(self, test_db_session):
        """A point created for "madrid" is the only one listed for that city."""
        repository = TrafficRepository(test_db_session)

        # Create test point
        point_data = {
            "point_id": "TEST_POINT_001",
            "name": "Test Point",
            "city": "madrid",
            "point_type": "TEST",
            "latitude": 40.4168,
            "longitude": -3.7038,
            "is_active": True,
        }
        await repository.create_measurement_point(point_data)

        points = await repository.get_measurement_points("madrid")

        assert len(points) == 1
        assert points[0].point_id == "TEST_POINT_001"

    async def test_get_measurement_points_with_filters(self, test_db_session):
        """Filtering by ``road_type="M30"`` returns only the M30 point."""
        repository = TrafficRepository(test_db_session)

        # Create test points with different types
        for i, point_type in enumerate(["M30", "URB", "TEST"]):
            point_data = {
                "point_id": f"TEST_POINT_{i:03d}",
                "name": f"Test Point {i}",
                "city": "madrid",
                "point_type": point_type,
                "latitude": 40.4168,
                "longitude": -3.7038,
                "is_active": True,
            }
            await repository.create_measurement_point(point_data)

        # Filter by type
        points = await repository.get_measurement_points("madrid", road_type="M30")

        assert len(points) == 1
        assert points[0].point_type == "M30"

    async def test_get_traffic_analytics(self, populated_traffic_db):
        """Analytics come back as a dict with the expected summary keys."""
        repository = TrafficRepository(populated_traffic_db)

        analytics = await repository.get_traffic_analytics("madrid")

        assert isinstance(analytics, dict)
        assert "total_measurements" in analytics
        assert "average_volume" in analytics

    async def test_create_traffic_job(self, test_db_session, sample_tenant_id):
        """Creating a collection job returns it with the supplied type and status."""
        repository = TrafficRepository(test_db_session)
        job_data = {
            "job_type": "current",
            "city": "madrid",
            "status": "pending",
            # Timezone-aware UTC, consistent with the other timestamps in this
            # module; datetime.utcnow() is naive and deprecated since 3.12.
            "scheduled_at": datetime.now(timezone.utc),
            "tenant_id": sample_tenant_id,
        }

        job = await repository.create_traffic_job(job_data)

        assert job is not None
        assert job.job_type == "current"
        assert job.status == "pending"

    async def test_update_traffic_job(self, test_db_session, sample_tenant_id):
        """Updating an existing job reports success."""
        repository = TrafficRepository(test_db_session)

        # Create job first
        job_data = {
            "job_type": "current",
            "city": "madrid",
            "status": "pending",
            "scheduled_at": datetime.now(timezone.utc),
            "tenant_id": sample_tenant_id,
        }
        job = await repository.create_traffic_job(job_data)

        # Update job
        update_data = {
            "status": "completed",
            "completed_at": datetime.now(timezone.utc),
            "success_count": 10,
        }
        success = await repository.update_traffic_job(job.id, update_data)

        assert success is True

    async def test_get_traffic_jobs(self, test_db_session, sample_tenant_id):
        """A created "historical" job shows up in the job listing."""
        repository = TrafficRepository(test_db_session)

        # Create test job
        job_data = {
            "job_type": "historical",
            "city": "madrid",
            "status": "completed",
            "scheduled_at": datetime.now(timezone.utc),
            "tenant_id": sample_tenant_id,
        }
        await repository.create_traffic_job(job_data)

        jobs = await repository.get_traffic_jobs()

        assert len(jobs) >= 1
        assert any(job.job_type == "historical" for job in jobs)

    async def test_bulk_create_performance(self, test_db_session):
        """Bulk-creating 100 rows reports 100 and finishes within 3 seconds."""
        import time

        repository = TrafficRepository(test_db_session)

        # Create large dataset
        bulk_data = [
            {
                "city": "madrid",
                "location_id": f"PM_TEST_{i:03d}",
                "date": datetime.now(timezone.utc),
                "measurement_point_id": f"PM_TEST_{i:03d}",
                "measurement_point_name": f"Test Point {i}",
                "measurement_point_type": "TEST",
                "traffic_volume": 100 + i,
                "average_speed": 50.0,
                "congestion_level": "medium",
                "occupation_percentage": 50.0,
                "latitude": 40.4168,
                "longitude": -3.7038,
                "source": "test",
            }
            for i in range(100)
        ]

        # perf_counter is monotonic, so the measured interval cannot be
        # skewed by wall-clock adjustments the way time.time() can.
        start_time = time.perf_counter()
        count = await repository.bulk_create_traffic_data(bulk_data)
        execution_time = time.perf_counter() - start_time

        assert count == 100
        assert execution_time < 3.0  # Should complete in under 3 seconds