REFACTOR AEMET weather file
This commit is contained in:
594
services/data/tests/test_aemet_edge_cases.py
Normal file
594
services/data/tests/test_aemet_edge_cases.py
Normal file
@@ -0,0 +1,594 @@
|
||||
# ================================================================
|
||||
# services/data/tests/test_aemet_edge_cases.py
|
||||
# ================================================================
|
||||
"""
|
||||
Edge cases and integration tests for AEMET weather API client
|
||||
Covers boundary conditions, error scenarios, and complex integrations
|
||||
"""
|
||||
|
||||
import pytest
|
||||
import asyncio
|
||||
from datetime import datetime, timedelta
|
||||
from unittest.mock import Mock, patch, AsyncMock
|
||||
import json
|
||||
from typing import Dict, List, Any
|
||||
|
||||
from app.external.aemet import (
|
||||
AEMETClient,
|
||||
WeatherDataParser,
|
||||
SyntheticWeatherGenerator,
|
||||
LocationService,
|
||||
AEMETConstants,
|
||||
WeatherSource
|
||||
)
|
||||
|
||||
# Configure pytest-asyncio
|
||||
pytestmark = pytest.mark.asyncio
|
||||
|
||||
|
||||
class TestAEMETEdgeCases:
|
||||
"""Test edge cases and boundary conditions"""
|
||||
|
||||
async def test_extreme_coordinates(self, aemet_client):
|
||||
"""Test handling of extreme coordinate values"""
|
||||
extreme_coords = [
|
||||
(90, 180), # North pole, antimeridian
|
||||
(-90, -180), # South pole, antimeridian
|
||||
(0, 0), # Null island
|
||||
(40.5, -180), # Valid latitude, extreme longitude
|
||||
(90, -3.7), # Extreme latitude, Madrid longitude
|
||||
]
|
||||
|
||||
for lat, lon in extreme_coords:
|
||||
result = await aemet_client.get_current_weather(lat, lon)
|
||||
|
||||
assert result is not None, f"Should handle extreme coords ({lat}, {lon})"
|
||||
assert result['source'] == WeatherSource.SYNTHETIC.value, "Should fallback to synthetic for extreme coords"
|
||||
assert isinstance(result['temperature'], (int, float)), "Should have valid temperature"
|
||||
|
||||
async def test_boundary_date_ranges(self, aemet_client, madrid_coords):
|
||||
"""Test boundary conditions for date ranges"""
|
||||
lat, lon = madrid_coords
|
||||
now = datetime.now()
|
||||
|
||||
# Test same start and end date
|
||||
result = await aemet_client.get_historical_weather(lat, lon, now, now)
|
||||
assert isinstance(result, list), "Should return list for same-day request"
|
||||
|
||||
# Test reverse date range (end before start)
|
||||
start_date = now
|
||||
end_date = now - timedelta(days=1)
|
||||
result = await aemet_client.get_historical_weather(lat, lon, start_date, end_date)
|
||||
assert isinstance(result, list), "Should handle reverse date range gracefully"
|
||||
|
||||
# Test extremely large date range
|
||||
start_date = now - timedelta(days=1000)
|
||||
end_date = now
|
||||
result = await aemet_client.get_historical_weather(lat, lon, start_date, end_date)
|
||||
assert isinstance(result, list), "Should handle very large date ranges"
|
||||
|
||||
async def test_forecast_edge_durations(self, aemet_client, madrid_coords):
|
||||
"""Test forecast with edge case durations"""
|
||||
lat, lon = madrid_coords
|
||||
|
||||
edge_durations = [0, 1, 30, 365, -1, 1000]
|
||||
|
||||
for days in edge_durations:
|
||||
try:
|
||||
result = await aemet_client.get_forecast(lat, lon, days)
|
||||
|
||||
if days <= 0:
|
||||
assert len(result) == 0 or result is None, f"Should handle non-positive days ({days})"
|
||||
elif days > 100:
|
||||
# Should handle gracefully, possibly with synthetic data
|
||||
assert isinstance(result, list), f"Should handle large day count ({days})"
|
||||
else:
|
||||
assert len(result) == days, f"Should return {days} forecast days"
|
||||
|
||||
except Exception as e:
|
||||
# Some edge cases might raise exceptions, which is acceptable
|
||||
print(f"ℹ️ Days={days} raised exception: {e}")
|
||||
|
||||
def test_parser_edge_cases(self, weather_parser):
|
||||
"""Test weather data parser with edge case inputs"""
|
||||
# Test with None values
|
||||
result = weather_parser.safe_float(None, 10.0)
|
||||
assert result == 10.0, "Should return default for None"
|
||||
|
||||
# Test with empty strings
|
||||
result = weather_parser.safe_float("", 5.0)
|
||||
assert result == 5.0, "Should return default for empty string"
|
||||
|
||||
# Test with extreme values
|
||||
result = weather_parser.safe_float("999999.99", 0.0)
|
||||
assert result == 999999.99, "Should handle large numbers"
|
||||
|
||||
result = weather_parser.safe_float("-999.99", 0.0)
|
||||
assert result == -999.99, "Should handle negative numbers"
|
||||
|
||||
# Test temperature extraction edge cases
|
||||
assert weather_parser.extract_temperature_value([]) is None, "Should handle empty list"
|
||||
assert weather_parser.extract_temperature_value({}) is None, "Should handle empty dict"
|
||||
assert weather_parser.extract_temperature_value("invalid") is None, "Should handle invalid string"
|
||||
|
||||
def test_synthetic_generator_edge_cases(self, synthetic_generator):
|
||||
"""Test synthetic weather generator edge cases"""
|
||||
# Test with extreme date ranges
|
||||
end_date = datetime.now()
|
||||
start_date = end_date - timedelta(days=1000)
|
||||
|
||||
result = synthetic_generator.generate_historical_data(start_date, end_date)
|
||||
assert isinstance(result, list), "Should handle large date ranges"
|
||||
assert len(result) == 1001, "Should generate correct number of days"
|
||||
|
||||
# Test forecast with zero days
|
||||
result = synthetic_generator.generate_forecast_sync(0)
|
||||
assert result == [], "Should return empty list for zero days"
|
||||
|
||||
# Test forecast with large number of days
|
||||
result = synthetic_generator.generate_forecast_sync(1000)
|
||||
assert len(result) == 1000, "Should handle large forecast ranges"
|
||||
|
||||
def test_location_service_edge_cases(self):
|
||||
"""Test location service edge cases"""
|
||||
# Test distance calculation with same points
|
||||
distance = LocationService.calculate_distance(40.4, -3.7, 40.4, -3.7)
|
||||
assert distance == 0.0, "Distance between same points should be zero"
|
||||
|
||||
# Test distance calculation with antipodal points
|
||||
distance = LocationService.calculate_distance(40.4, -3.7, -40.4, 176.3)
|
||||
assert distance > 15000, "Antipodal points should be far apart"
|
||||
|
||||
# Test station finding with no stations (if list were empty)
|
||||
with patch.object(AEMETConstants, 'MADRID_STATIONS', []):
|
||||
station = LocationService.find_nearest_station(40.4, -3.7)
|
||||
assert station is None, "Should return None when no stations available"
|
||||
|
||||
|
||||
class TestAEMETDataIntegrity:
|
||||
"""Test data integrity and consistency"""
|
||||
|
||||
async def test_data_type_consistency(self, aemet_client, madrid_coords):
|
||||
"""Test that data types are consistent across calls"""
|
||||
lat, lon = madrid_coords
|
||||
|
||||
# Get current weather multiple times
|
||||
results = []
|
||||
for _ in range(3):
|
||||
result = await aemet_client.get_current_weather(lat, lon)
|
||||
results.append(result)
|
||||
|
||||
# Check that field types are consistent
|
||||
if all(r is not None for r in results):
|
||||
for field in ['temperature', 'precipitation', 'humidity', 'wind_speed', 'pressure']:
|
||||
types = [type(r[field]) for r in results if field in r]
|
||||
if types:
|
||||
first_type = types[0]
|
||||
assert all(t == first_type for t in types), f"Inconsistent types for {field}: {types}"
|
||||
|
||||
async def test_temperature_consistency(self, aemet_client, madrid_coords):
|
||||
"""Test temperature consistency between different data sources"""
|
||||
lat, lon = madrid_coords
|
||||
|
||||
# Get current weather and today's forecast
|
||||
current = await aemet_client.get_current_weather(lat, lon)
|
||||
forecast = await aemet_client.get_forecast(lat, lon, 1)
|
||||
|
||||
if current and forecast and len(forecast) > 0:
|
||||
current_temp = current['temperature']
|
||||
forecast_temp = forecast[0]['temperature']
|
||||
|
||||
# Temperatures should be reasonably close (within 15°C)
|
||||
temp_diff = abs(current_temp - forecast_temp)
|
||||
assert temp_diff < 15, f"Temperature difference too large: current={current_temp}°C, forecast={forecast_temp}°C"
|
||||
|
||||
async def test_source_consistency(self, aemet_client, madrid_coords):
|
||||
"""Test that data source is consistent within same time period"""
|
||||
lat, lon = madrid_coords
|
||||
|
||||
# Get multiple current weather readings
|
||||
current1 = await aemet_client.get_current_weather(lat, lon)
|
||||
current2 = await aemet_client.get_current_weather(lat, lon)
|
||||
|
||||
if current1 and current2:
|
||||
# Should use same source type (both real or both synthetic)
|
||||
assert current1['source'] == current2['source'], "Should use consistent data source"
|
||||
|
||||
def test_historical_data_ordering(self, weather_parser, mock_historical_data):
|
||||
"""Test that historical data is properly ordered"""
|
||||
parsed_data = weather_parser.parse_historical_data(mock_historical_data)
|
||||
|
||||
if len(parsed_data) > 1:
|
||||
dates = [record['date'] for record in parsed_data]
|
||||
assert dates == sorted(dates), "Historical data should be chronologically ordered"
|
||||
|
||||
def test_forecast_date_progression(self, weather_parser, mock_forecast_data):
|
||||
"""Test that forecast dates progress correctly"""
|
||||
parsed_forecast = weather_parser.parse_forecast_data(mock_forecast_data, 7)
|
||||
|
||||
if len(parsed_forecast) > 1:
|
||||
for i in range(1, len(parsed_forecast)):
|
||||
prev_date = parsed_forecast[i-1]['forecast_date']
|
||||
curr_date = parsed_forecast[i]['forecast_date']
|
||||
diff = (curr_date - prev_date).days
|
||||
assert diff == 1, f"Forecast dates should be consecutive days, got {diff} day difference"
|
||||
|
||||
|
||||
class TestAEMETErrorRecovery:
|
||||
"""Test error recovery and resilience"""
|
||||
|
||||
async def test_network_interruption_recovery(self, aemet_client, madrid_coords):
|
||||
"""Test recovery from network interruptions"""
|
||||
lat, lon = madrid_coords
|
||||
|
||||
# Mock intermittent network failures
|
||||
call_count = 0
|
||||
|
||||
async def mock_get_with_failures(*args, **kwargs):
|
||||
nonlocal call_count
|
||||
call_count += 1
|
||||
if call_count <= 2: # Fail first two calls
|
||||
raise Exception("Network timeout")
|
||||
else:
|
||||
return {"datos": "http://example.com/data"}
|
||||
|
||||
with patch.object(aemet_client, '_get', side_effect=mock_get_with_failures):
|
||||
result = await aemet_client.get_current_weather(lat, lon)
|
||||
|
||||
# Should eventually succeed or fallback to synthetic
|
||||
assert result is not None, "Should recover from network failures"
|
||||
assert result['source'] in [WeatherSource.AEMET.value, WeatherSource.SYNTHETIC.value]
|
||||
|
||||
async def test_partial_data_recovery(self, aemet_client, madrid_coords, weather_parser):
|
||||
"""Test recovery from partial/corrupted data"""
|
||||
lat, lon = madrid_coords
|
||||
|
||||
# Mock corrupted historical data (some records missing fields)
|
||||
corrupted_data = [
|
||||
{"fecha": "2025-07-20", "tmax": 25.2}, # Missing tmin and other fields
|
||||
{"fecha": "2025-07-21"}, # Only has date
|
||||
{"tmax": 27.0, "tmin": 15.0}, # Missing date
|
||||
{"fecha": "2025-07-22", "tmax": 23.0, "tmin": 14.0, "prec": 0.0} # Complete record
|
||||
]
|
||||
|
||||
parsed_data = weather_parser.parse_historical_data(corrupted_data)
|
||||
|
||||
# Should only return valid records and handle corrupted ones gracefully
|
||||
assert isinstance(parsed_data, list), "Should return list even with corrupted data"
|
||||
valid_records = [r for r in parsed_data if 'date' in r and r['date'] is not None]
|
||||
assert len(valid_records) >= 1, "Should salvage at least some valid records"
|
||||
|
||||
async def test_malformed_json_recovery(self, aemet_client, madrid_coords):
|
||||
"""Test recovery from malformed JSON responses"""
|
||||
lat, lon = madrid_coords
|
||||
|
||||
# Mock malformed responses
|
||||
malformed_responses = [
|
||||
None,
|
||||
"",
|
||||
"invalid json",
|
||||
{"incomplete": "response"},
|
||||
{"datos": None},
|
||||
{"datos": ""},
|
||||
]
|
||||
|
||||
for response in malformed_responses:
|
||||
with patch.object(aemet_client, '_get', new_callable=AsyncMock, return_value=response):
|
||||
result = await aemet_client.get_current_weather(lat, lon)
|
||||
|
||||
assert result is not None, f"Should handle malformed response: {response}"
|
||||
assert result['source'] == WeatherSource.SYNTHETIC.value, "Should fallback to synthetic"
|
||||
|
||||
async def test_api_rate_limiting_recovery(self, aemet_client, madrid_coords):
|
||||
"""Test recovery from API rate limiting"""
|
||||
lat, lon = madrid_coords
|
||||
|
||||
# Mock rate limiting responses
|
||||
rate_limit_response = {
|
||||
"descripcion": "Demasiadas peticiones",
|
||||
"estado": 429
|
||||
}
|
||||
|
||||
with patch.object(aemet_client, '_get', new_callable=AsyncMock, return_value=rate_limit_response):
|
||||
result = await aemet_client.get_current_weather(lat, lon)
|
||||
|
||||
assert result is not None, "Should handle rate limiting"
|
||||
assert result['source'] == WeatherSource.SYNTHETIC.value, "Should fallback to synthetic on rate limit"
|
||||
|
||||
|
||||
class TestAEMETPerformanceAndScaling:
|
||||
"""Test performance characteristics and scaling behavior"""
|
||||
|
||||
async def test_concurrent_requests_performance(self, aemet_client, madrid_coords):
|
||||
"""Test performance with concurrent requests"""
|
||||
lat, lon = madrid_coords
|
||||
|
||||
# Create multiple concurrent requests
|
||||
tasks = []
|
||||
for i in range(10):
|
||||
task = aemet_client.get_current_weather(lat, lon)
|
||||
tasks.append(task)
|
||||
|
||||
start_time = datetime.now()
|
||||
results = await asyncio.gather(*tasks, return_exceptions=True)
|
||||
execution_time = (datetime.now() - start_time).total_seconds() * 1000
|
||||
|
||||
# Check that most requests succeeded
|
||||
successful_results = [r for r in results if isinstance(r, dict) and 'temperature' in r]
|
||||
assert len(successful_results) >= 8, "Most concurrent requests should succeed"
|
||||
|
||||
# Should complete in reasonable time (allowing for potential API rate limiting)
|
||||
assert execution_time < 15000, f"Concurrent requests took too long: {execution_time:.0f}ms"
|
||||
|
||||
print(f"✅ Concurrent requests test - {len(successful_results)}/10 succeeded in {execution_time:.0f}ms")
|
||||
|
||||
async def test_memory_usage_with_large_datasets(self, aemet_client, madrid_coords):
|
||||
"""Test memory usage with large historical datasets"""
|
||||
lat, lon = madrid_coords
|
||||
|
||||
# Request large historical dataset
|
||||
end_date = datetime.now()
|
||||
start_date = end_date - timedelta(days=90) # 3 months
|
||||
|
||||
import psutil
|
||||
import os
|
||||
|
||||
# Get initial memory usage
|
||||
process = psutil.Process(os.getpid())
|
||||
initial_memory = process.memory_info().rss / 1024 / 1024 # MB
|
||||
|
||||
result = await aemet_client.get_historical_weather(lat, lon, start_date, end_date)
|
||||
|
||||
# Get final memory usage
|
||||
final_memory = process.memory_info().rss / 1024 / 1024 # MB
|
||||
memory_increase = final_memory - initial_memory
|
||||
|
||||
assert isinstance(result, list), "Should return historical data"
|
||||
|
||||
# Memory increase should be reasonable (less than 100MB for 90 days)
|
||||
assert memory_increase < 100, f"Memory usage increased too much: {memory_increase:.1f}MB"
|
||||
|
||||
print(f"✅ Memory usage test - {len(result)} records, +{memory_increase:.1f}MB")
|
||||
|
||||
async def test_caching_behavior(self, aemet_client, madrid_coords):
|
||||
"""Test caching behavior and performance improvement"""
|
||||
lat, lon = madrid_coords
|
||||
|
||||
# First request (cold)
|
||||
start_time = datetime.now()
|
||||
result1 = await aemet_client.get_current_weather(lat, lon)
|
||||
first_call_time = (datetime.now() - start_time).total_seconds() * 1000
|
||||
|
||||
# Second request (potentially cached)
|
||||
start_time = datetime.now()
|
||||
result2 = await aemet_client.get_current_weather(lat, lon)
|
||||
second_call_time = (datetime.now() - start_time).total_seconds() * 1000
|
||||
|
||||
assert result1 is not None, "First call should succeed"
|
||||
assert result2 is not None, "Second call should succeed"
|
||||
|
||||
# Both should return valid data
|
||||
assert 'temperature' in result1, "First result should have temperature"
|
||||
assert 'temperature' in result2, "Second result should have temperature"
|
||||
|
||||
print(f"✅ Caching test - First call: {first_call_time:.0f}ms, Second call: {second_call_time:.0f}ms")
|
||||
|
||||
|
||||
class TestAEMETIntegrationScenarios:
|
||||
"""Test realistic integration scenarios"""
|
||||
|
||||
async def test_daily_weather_workflow(self, aemet_client, madrid_coords):
|
||||
"""Test a complete daily weather workflow"""
|
||||
lat, lon = madrid_coords
|
||||
|
||||
# Simulate a daily weather check workflow
|
||||
workflow_results = {}
|
||||
|
||||
# Step 1: Get current conditions
|
||||
current = await aemet_client.get_current_weather(lat, lon)
|
||||
workflow_results['current'] = current
|
||||
assert current is not None, "Should get current weather"
|
||||
|
||||
# Step 2: Get today's forecast
|
||||
forecast = await aemet_client.get_forecast(lat, lon, 1)
|
||||
workflow_results['forecast'] = forecast
|
||||
assert len(forecast) == 1, "Should get today's forecast"
|
||||
|
||||
# Step 3: Get week ahead forecast
|
||||
week_forecast = await aemet_client.get_forecast(lat, lon, 7)
|
||||
workflow_results['week_forecast'] = week_forecast
|
||||
assert len(week_forecast) == 7, "Should get 7-day forecast"
|
||||
|
||||
# Step 4: Get last week's actual weather for comparison
|
||||
end_date = datetime.now() - timedelta(days=1)
|
||||
start_date = end_date - timedelta(days=7)
|
||||
historical = await aemet_client.get_historical_weather(lat, lon, start_date, end_date)
|
||||
workflow_results['historical'] = historical
|
||||
assert isinstance(historical, list), "Should get historical data"
|
||||
|
||||
# Validate workflow consistency
|
||||
all_sources = set()
|
||||
if current: all_sources.add(current['source'])
|
||||
if forecast: all_sources.add(forecast[0]['source'])
|
||||
if week_forecast: all_sources.add(week_forecast[0]['source'])
|
||||
if historical: all_sources.update([h['source'] for h in historical])
|
||||
|
||||
print(f"✅ Daily workflow test - Sources used: {', '.join(all_sources)}")
|
||||
|
||||
return workflow_results
|
||||
|
||||
async def test_weather_alerting_scenario(self, aemet_client, madrid_coords):
|
||||
"""Test weather alerting scenario"""
|
||||
lat, lon = madrid_coords
|
||||
|
||||
# Get forecast for potential alerts
|
||||
forecast = await aemet_client.get_forecast(lat, lon, 3)
|
||||
|
||||
alerts = []
|
||||
for day in forecast:
|
||||
# Check for extreme temperatures
|
||||
if day['temperature'] > 35:
|
||||
alerts.append(f"High temperature alert: {day['temperature']}°C on {day['forecast_date'].date()}")
|
||||
elif day['temperature'] < -5:
|
||||
alerts.append(f"Low temperature alert: {day['temperature']}°C on {day['forecast_date'].date()}")
|
||||
|
||||
# Check for high precipitation
|
||||
if day['precipitation'] > 20:
|
||||
alerts.append(f"Heavy rain alert: {day['precipitation']}mm on {day['forecast_date'].date()}")
|
||||
|
||||
# Alerts should be properly formatted
|
||||
for alert in alerts:
|
||||
assert isinstance(alert, str), "Alert should be string"
|
||||
assert "alert" in alert.lower(), "Alert should contain 'alert'"
|
||||
|
||||
print(f"✅ Weather alerting test - {len(alerts)} alerts generated")
|
||||
|
||||
return alerts
|
||||
|
||||
async def test_historical_analysis_scenario(self, aemet_client, madrid_coords):
|
||||
"""Test historical weather analysis scenario"""
|
||||
lat, lon = madrid_coords
|
||||
|
||||
# Get historical data for analysis
|
||||
end_date = datetime.now()
|
||||
start_date = end_date - timedelta(days=30)
|
||||
|
||||
historical = await aemet_client.get_historical_weather(lat, lon, start_date, end_date)
|
||||
|
||||
if historical:
|
||||
# Calculate statistics
|
||||
temperatures = [h['temperature'] for h in historical if h['temperature'] is not None]
|
||||
precipitations = [h['precipitation'] for h in historical if h['precipitation'] is not None]
|
||||
|
||||
if temperatures:
|
||||
avg_temp = sum(temperatures) / len(temperatures)
|
||||
max_temp = max(temperatures)
|
||||
min_temp = min(temperatures)
|
||||
|
||||
# Validate statistics
|
||||
assert min_temp <= avg_temp <= max_temp, "Temperature statistics should be logical"
|
||||
assert -20 <= min_temp <= 50, "Min temperature should be reasonable"
|
||||
assert -20 <= max_temp <= 50, "Max temperature should be reasonable"
|
||||
|
||||
if precipitations:
|
||||
total_precip = sum(precipitations)
|
||||
rainy_days = len([p for p in precipitations if p > 0.1])
|
||||
|
||||
# Validate precipitation statistics
|
||||
assert total_precip >= 0, "Total precipitation should be non-negative"
|
||||
assert 0 <= rainy_days <= len(precipitations), "Rainy days should be reasonable"
|
||||
|
||||
print(f"✅ Historical analysis test - {len(historical)} records analyzed")
|
||||
|
||||
return {
|
||||
'record_count': len(historical),
|
||||
'avg_temp': avg_temp if temperatures else None,
|
||||
'temp_range': (min_temp, max_temp) if temperatures else None,
|
||||
'total_precip': total_precip if precipitations else None,
|
||||
'rainy_days': rainy_days if precipitations else None
|
||||
}
|
||||
|
||||
return {}
|
||||
|
||||
|
||||
class TestAEMETRegressionTests:
|
||||
"""Regression tests for previously fixed issues"""
|
||||
|
||||
async def test_timezone_handling_regression(self, aemet_client, madrid_coords):
|
||||
"""Regression test for timezone handling issues"""
|
||||
lat, lon = madrid_coords
|
||||
|
||||
# Get current weather and forecast
|
||||
current = await aemet_client.get_current_weather(lat, lon)
|
||||
forecast = await aemet_client.get_forecast(lat, lon, 2)
|
||||
|
||||
if current:
|
||||
# Current weather date should be recent (within last hour)
|
||||
now = datetime.now()
|
||||
time_diff = abs((now - current['date']).total_seconds())
|
||||
assert time_diff < 3600, "Current weather timestamp should be recent"
|
||||
|
||||
if forecast:
|
||||
# Forecast dates should be in the future
|
||||
now = datetime.now().date()
|
||||
for day in forecast:
|
||||
forecast_date = day['forecast_date'].date()
|
||||
assert forecast_date >= now, f"Forecast date {forecast_date} should be today or future"
|
||||
|
||||
async def test_data_type_conversion_regression(self, weather_parser):
|
||||
"""Regression test for data type conversion issues"""
|
||||
# Test cases that previously caused issues
|
||||
test_cases = [
|
||||
("25.5", 25.5), # String to float
|
||||
(25, 25.0), # Int to float
|
||||
("", None), # Empty string
|
||||
("invalid", None), # Invalid string
|
||||
(None, None), # None input
|
||||
]
|
||||
|
||||
for input_val, expected in test_cases:
|
||||
result = weather_parser.safe_float(input_val, None)
|
||||
if expected is None:
|
||||
assert result is None, f"Expected None for input {input_val}, got {result}"
|
||||
else:
|
||||
assert result == expected, f"Expected {expected} for input {input_val}, got {result}"
|
||||
|
||||
def test_empty_data_handling_regression(self, weather_parser):
|
||||
"""Regression test for empty data handling"""
|
||||
# Empty lists and dictionaries should be handled gracefully
|
||||
empty_data_cases = [
|
||||
[],
|
||||
[{}],
|
||||
[{"invalid": "data"}],
|
||||
None,
|
||||
]
|
||||
|
||||
for empty_data in empty_data_cases:
|
||||
result = weather_parser.parse_historical_data(empty_data if empty_data is not None else [])
|
||||
assert isinstance(result, list), f"Should return list for empty data: {empty_data}"
|
||||
# May be empty or have some synthetic data, but should not crash
|
||||
|
||||
|
||||
# ================================================================
|
||||
# STANDALONE TEST RUNNER FOR EDGE CASES
|
||||
# ================================================================
|
||||
|
||||
async def run_edge_case_tests():
|
||||
"""Run edge case tests manually"""
|
||||
print("="*60)
|
||||
print("AEMET EDGE CASE TESTS")
|
||||
print("="*60)
|
||||
|
||||
client = AEMETClient()
|
||||
parser = WeatherDataParser()
|
||||
generator = SyntheticWeatherGenerator()
|
||||
|
||||
madrid_coords = (40.4168, -3.7038)
|
||||
|
||||
print(f"\n1. Testing extreme coordinates...")
|
||||
extreme_result = await client.get_current_weather(90, 180)
|
||||
print(f" Extreme coords result: {extreme_result['source']} source")
|
||||
|
||||
print(f"\n2. Testing parser edge cases...")
|
||||
parser_tests = [
|
||||
parser.safe_float(None, 10.0),
|
||||
parser.safe_float("invalid", 5.0),
|
||||
parser.extract_temperature_value([]),
|
||||
]
|
||||
print(f" Parser edge cases passed: {len(parser_tests)}")
|
||||
|
||||
print(f"\n3. Testing synthetic generator extremes...")
|
||||
large_forecast = generator.generate_forecast_sync(100)
|
||||
print(f" Generated {len(large_forecast)} forecast days")
|
||||
|
||||
print(f"\n4. Testing concurrent requests...")
|
||||
tasks = [client.get_current_weather(*madrid_coords) for _ in range(5)]
|
||||
concurrent_results = await asyncio.gather(*tasks, return_exceptions=True)
|
||||
successful = len([r for r in concurrent_results if isinstance(r, dict)])
|
||||
print(f" Concurrent requests: {successful}/5 successful")
|
||||
|
||||
print(f"\n✅ Edge case tests completed!")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(run_edge_case_tests())
|
||||
Reference in New Issue
Block a user