Files
bakery-ia/services/data/tests/test_aemet_edge_cases.py

594 lines
26 KiB
Python
Raw Normal View History

2025-07-24 16:07:58 +02:00
# ================================================================
# services/data/tests/test_aemet_edge_cases.py
# ================================================================
"""
Edge cases and integration tests for AEMET weather API client
Covers boundary conditions, error scenarios, and complex integrations
"""
import pytest
import asyncio
from datetime import datetime, timedelta
from unittest.mock import Mock, patch, AsyncMock
import json
from typing import Dict, List, Any
from app.external.aemet import (
AEMETClient,
WeatherDataParser,
SyntheticWeatherGenerator,
LocationService,
AEMETConstants,
WeatherSource
)
# Configure pytest-asyncio: a module-level ``pytestmark`` applies the
# ``asyncio`` marker to every test collected from this module.
# NOTE(review): this module also contains plain (non-async) tests; some
# pytest-asyncio versions warn when the marker lands on a sync test —
# confirm against the installed version.
pytestmark = pytest.mark.asyncio
class TestAEMETEdgeCases:
    """Test edge cases and boundary conditions.

    The ``aemet_client``, ``madrid_coords``, ``weather_parser`` and
    ``synthetic_generator`` arguments are pytest fixtures that are not
    defined in this file.
    """

    async def test_extreme_coordinates(self, aemet_client):
        """Test handling of extreme coordinate values."""
        extreme_coords = [
            (90, 180),  # North pole, antimeridian
            (-90, -180),  # South pole, antimeridian
            (0, 0),  # Null island
            (40.5, -180),  # Valid latitude, extreme longitude
            (90, -3.7),  # Extreme latitude, Madrid longitude
        ]
        for lat, lon in extreme_coords:
            result = await aemet_client.get_current_weather(lat, lon)
            assert result is not None, f"Should handle extreme coords ({lat}, {lon})"
            assert result['source'] == WeatherSource.SYNTHETIC.value, "Should fallback to synthetic for extreme coords"
            assert isinstance(result['temperature'], (int, float)), "Should have valid temperature"

    async def test_boundary_date_ranges(self, aemet_client, madrid_coords):
        """Test boundary conditions for date ranges."""
        lat, lon = madrid_coords
        now = datetime.now()

        # Test same start and end date
        result = await aemet_client.get_historical_weather(lat, lon, now, now)
        assert isinstance(result, list), "Should return list for same-day request"

        # Test reverse date range (end before start)
        start_date = now
        end_date = now - timedelta(days=1)
        result = await aemet_client.get_historical_weather(lat, lon, start_date, end_date)
        assert isinstance(result, list), "Should handle reverse date range gracefully"

        # Test extremely large date range
        start_date = now - timedelta(days=1000)
        end_date = now
        result = await aemet_client.get_historical_weather(lat, lon, start_date, end_date)
        assert isinstance(result, list), "Should handle very large date ranges"

    async def test_forecast_edge_durations(self, aemet_client, madrid_coords):
        """Test forecast with edge case durations.

        The client call itself may raise for an edge-case duration; that is
        reported and the duration is skipped. The assertions run outside the
        ``try`` block so that a wrong result fails the test instead of being
        swallowed by the broad ``except`` (``AssertionError`` is a subclass
        of ``Exception``).
        """
        lat, lon = madrid_coords
        edge_durations = [0, 1, 30, 365, -1, 1000]
        for days in edge_durations:
            try:
                result = await aemet_client.get_forecast(lat, lon, days)
            except Exception as e:
                # Some edge cases might raise exceptions, which is acceptable
                print(f" Days={days} raised exception: {e}")
                continue

            if days <= 0:
                # Check for None first: len(None) would raise TypeError.
                assert result is None or len(result) == 0, f"Should handle non-positive days ({days})"
            elif days > 100:
                # Should handle gracefully, possibly with synthetic data
                assert isinstance(result, list), f"Should handle large day count ({days})"
            else:
                assert len(result) == days, f"Should return {days} forecast days"

    def test_parser_edge_cases(self, weather_parser):
        """Test weather data parser with edge case inputs."""
        # Test with None values
        result = weather_parser.safe_float(None, 10.0)
        assert result == 10.0, "Should return default for None"

        # Test with empty strings
        result = weather_parser.safe_float("", 5.0)
        assert result == 5.0, "Should return default for empty string"

        # Test with extreme values
        result = weather_parser.safe_float("999999.99", 0.0)
        assert result == 999999.99, "Should handle large numbers"
        result = weather_parser.safe_float("-999.99", 0.0)
        assert result == -999.99, "Should handle negative numbers"

        # Test temperature extraction edge cases
        assert weather_parser.extract_temperature_value([]) is None, "Should handle empty list"
        assert weather_parser.extract_temperature_value({}) is None, "Should handle empty dict"
        assert weather_parser.extract_temperature_value("invalid") is None, "Should handle invalid string"

    def test_synthetic_generator_edge_cases(self, synthetic_generator):
        """Test synthetic weather generator edge cases."""
        # Test with extreme date ranges
        end_date = datetime.now()
        start_date = end_date - timedelta(days=1000)
        result = synthetic_generator.generate_historical_data(start_date, end_date)
        assert isinstance(result, list), "Should handle large date ranges"
        # 1000 days apart with both endpoints included gives 1001 records.
        assert len(result) == 1001, "Should generate correct number of days"

        # Test forecast with zero days
        result = synthetic_generator.generate_forecast_sync(0)
        assert result == [], "Should return empty list for zero days"

        # Test forecast with large number of days
        result = synthetic_generator.generate_forecast_sync(1000)
        assert len(result) == 1000, "Should handle large forecast ranges"

    def test_location_service_edge_cases(self):
        """Test location service edge cases."""
        # Test distance calculation with same points
        distance = LocationService.calculate_distance(40.4, -3.7, 40.4, -3.7)
        assert distance == 0.0, "Distance between same points should be zero"

        # Test distance calculation with antipodal points
        distance = LocationService.calculate_distance(40.4, -3.7, -40.4, 176.3)
        assert distance > 15000, "Antipodal points should be far apart"

        # Test station finding with no stations (if list were empty)
        with patch.object(AEMETConstants, 'MADRID_STATIONS', []):
            station = LocationService.find_nearest_station(40.4, -3.7)
            assert station is None, "Should return None when no stations available"
class TestAEMETDataIntegrity:
    """Checks that returned weather data is internally consistent."""

    async def test_data_type_consistency(self, aemet_client, madrid_coords):
        """Repeated current-weather calls must yield the same type per field."""
        lat, lon = madrid_coords

        # Fetch the current weather three times, one call after another.
        readings = [
            await aemet_client.get_current_weather(lat, lon)
            for _ in range(3)
        ]

        # Nothing to compare unless every call produced a reading.
        if any(reading is None for reading in readings):
            return

        checked_fields = ('temperature', 'precipitation', 'humidity', 'wind_speed', 'pressure')
        for field in checked_fields:
            types = [type(reading[field]) for reading in readings if field in reading]
            if not types:
                continue
            reference_type = types[0]
            assert all(
                seen == reference_type for seen in types
            ), f"Inconsistent types for {field}: {types}"

    async def test_temperature_consistency(self, aemet_client, madrid_coords):
        """Current temperature and the first forecast day must roughly agree."""
        lat, lon = madrid_coords

        current = await aemet_client.get_current_weather(lat, lon)
        forecast = await aemet_client.get_forecast(lat, lon, 1)

        if not (current and forecast and len(forecast) > 0):
            return

        current_temp = current['temperature']
        forecast_temp = forecast[0]['temperature']

        # The two values are allowed to drift by less than 15°C.
        temp_diff = abs(current_temp - forecast_temp)
        assert temp_diff < 15, f"Temperature difference too large: current={current_temp}°C, forecast={forecast_temp}°C"

    async def test_source_consistency(self, aemet_client, madrid_coords):
        """Two back-to-back readings must report the same data source."""
        lat, lon = madrid_coords

        first = await aemet_client.get_current_weather(lat, lon)
        second = await aemet_client.get_current_weather(lat, lon)

        if not (first and second):
            return

        # Either both real or both synthetic.
        assert first['source'] == second['source'], "Should use consistent data source"

    def test_historical_data_ordering(self, weather_parser, mock_historical_data):
        """Parsed historical records must come back in date order."""
        records = weather_parser.parse_historical_data(mock_historical_data)
        if len(records) <= 1:
            return
        dates = [entry['date'] for entry in records]
        assert dates == sorted(dates), "Historical data should be chronologically ordered"

    def test_forecast_date_progression(self, weather_parser, mock_forecast_data):
        """Consecutive parsed forecast entries must be exactly one day apart."""
        days = weather_parser.parse_forecast_data(mock_forecast_data, 7)
        if len(days) <= 1:
            return
        # Walk adjacent (previous, current) pairs.
        for earlier, later in zip(days, days[1:]):
            prev_date = earlier['forecast_date']
            curr_date = later['forecast_date']
            diff = (curr_date - prev_date).days
            assert diff == 1, f"Forecast dates should be consecutive days, got {diff} day difference"
class TestAEMETErrorRecovery:
    """Resilience checks: the client must cope with failures and bad payloads."""

    async def test_network_interruption_recovery(self, aemet_client, madrid_coords):
        """The client must survive a ``_get`` that fails twice before answering."""
        lat, lon = madrid_coords

        # Mutable counter shared with the replacement coroutine below.
        attempts = {"made": 0}

        async def flaky_get(*args, **kwargs):
            attempts["made"] += 1
            if attempts["made"] > 2:
                return {"datos": "http://example.com/data"}
            # The first two calls fail.
            raise Exception("Network timeout")

        with patch.object(aemet_client, '_get', side_effect=flaky_get):
            result = await aemet_client.get_current_weather(lat, lon)

            # Either a real answer or the synthetic fallback is acceptable.
            assert result is not None, "Should recover from network failures"
            assert result['source'] in [WeatherSource.AEMET.value, WeatherSource.SYNTHETIC.value]

    async def test_partial_data_recovery(self, aemet_client, madrid_coords, weather_parser):
        """The parser must return a list and keep at least one dated record."""
        lat, lon = madrid_coords

        # Historical payload in which most records lack some field.
        only_tmax = {"fecha": "2025-07-20", "tmax": 25.2}  # Missing tmin and other fields
        only_date = {"fecha": "2025-07-21"}  # Only has date
        no_date = {"tmax": 27.0, "tmin": 15.0}  # Missing date
        complete = {"fecha": "2025-07-22", "tmax": 23.0, "tmin": 14.0, "prec": 0.0}  # Complete record
        corrupted_data = [only_tmax, only_date, no_date, complete]

        parsed = weather_parser.parse_historical_data(corrupted_data)

        assert isinstance(parsed, list), "Should return list even with corrupted data"
        dated = [entry for entry in parsed if 'date' in entry and entry['date'] is not None]
        assert len(dated) >= 1, "Should salvage at least some valid records"

    async def test_malformed_json_recovery(self, aemet_client, madrid_coords):
        """Every malformed ``_get`` payload must lead to synthetic data."""
        lat, lon = madrid_coords

        malformed_responses = (
            None,
            "",
            "invalid json",
            {"incomplete": "response"},
            {"datos": None},
            {"datos": ""},
        )

        for response in malformed_responses:
            stubbed_get = patch.object(
                aemet_client, '_get', new_callable=AsyncMock, return_value=response
            )
            with stubbed_get:
                result = await aemet_client.get_current_weather(lat, lon)
                assert result is not None, f"Should handle malformed response: {response}"
                assert result['source'] == WeatherSource.SYNTHETIC.value, "Should fallback to synthetic"

    async def test_api_rate_limiting_recovery(self, aemet_client, madrid_coords):
        """A rate-limit style payload from ``_get`` must lead to synthetic data."""
        lat, lon = madrid_coords

        # Payload returned by the stubbed ``_get``.
        rate_limit_response = dict(descripcion="Demasiadas peticiones", estado=429)

        stubbed_get = patch.object(
            aemet_client, '_get', new_callable=AsyncMock, return_value=rate_limit_response
        )
        with stubbed_get:
            result = await aemet_client.get_current_weather(lat, lon)
            assert result is not None, "Should handle rate limiting"
            assert result['source'] == WeatherSource.SYNTHETIC.value, "Should fallback to synthetic on rate limit"
class TestAEMETPerformanceAndScaling:
    """Test performance characteristics and scaling behavior.

    Elapsed times are measured with ``time.perf_counter()``, a monotonic
    clock, so system clock adjustments during a run cannot distort (or make
    negative) the measured durations.
    """

    async def test_concurrent_requests_performance(self, aemet_client, madrid_coords):
        """Test performance with concurrent requests."""
        import time

        lat, lon = madrid_coords

        # Create multiple concurrent requests
        tasks = [aemet_client.get_current_weather(lat, lon) for _ in range(10)]

        start_time = time.perf_counter()
        # return_exceptions=True keeps one failing request from aborting the rest.
        results = await asyncio.gather(*tasks, return_exceptions=True)
        execution_time = (time.perf_counter() - start_time) * 1000  # ms

        # Check that most requests succeeded
        successful_results = [r for r in results if isinstance(r, dict) and 'temperature' in r]
        assert len(successful_results) >= 8, "Most concurrent requests should succeed"

        # Should complete in reasonable time (allowing for potential API rate limiting)
        assert execution_time < 15000, f"Concurrent requests took too long: {execution_time:.0f}ms"
        print(f"✅ Concurrent requests test - {len(successful_results)}/10 succeeded in {execution_time:.0f}ms")

    async def test_memory_usage_with_large_datasets(self, aemet_client, madrid_coords):
        """Test memory usage with large historical datasets.

        Skipped when the optional ``psutil`` package is not installed.
        """
        import os

        # Skip (rather than error) when psutil is unavailable.
        psutil = pytest.importorskip("psutil")

        lat, lon = madrid_coords

        # Request large historical dataset
        end_date = datetime.now()
        start_date = end_date - timedelta(days=90)  # 3 months

        # Get initial memory usage
        process = psutil.Process(os.getpid())
        initial_memory = process.memory_info().rss / 1024 / 1024  # MB

        result = await aemet_client.get_historical_weather(lat, lon, start_date, end_date)

        # Get final memory usage
        final_memory = process.memory_info().rss / 1024 / 1024  # MB
        memory_increase = final_memory - initial_memory

        assert isinstance(result, list), "Should return historical data"
        # Memory increase should be reasonable (less than 100MB for 90 days)
        assert memory_increase < 100, f"Memory usage increased too much: {memory_increase:.1f}MB"
        print(f"✅ Memory usage test - {len(result)} records, +{memory_increase:.1f}MB")

    async def test_caching_behavior(self, aemet_client, madrid_coords):
        """Test caching behavior and performance improvement.

        Timings are only printed; the assertions check that both calls
        return data containing a temperature.
        """
        import time

        lat, lon = madrid_coords

        # First request (cold)
        start_time = time.perf_counter()
        result1 = await aemet_client.get_current_weather(lat, lon)
        first_call_time = (time.perf_counter() - start_time) * 1000  # ms

        # Second request (potentially cached)
        start_time = time.perf_counter()
        result2 = await aemet_client.get_current_weather(lat, lon)
        second_call_time = (time.perf_counter() - start_time) * 1000  # ms

        assert result1 is not None, "First call should succeed"
        assert result2 is not None, "Second call should succeed"

        # Both should return valid data
        assert 'temperature' in result1, "First result should have temperature"
        assert 'temperature' in result2, "Second result should have temperature"
        print(f"✅ Caching test - First call: {first_call_time:.0f}ms, Second call: {second_call_time:.0f}ms")
class TestAEMETIntegrationScenarios:
    """Test realistic integration scenarios.

    NOTE(review): these tests return values; recent pytest versions flag
    test functions that return something other than ``None``. The returns
    are kept so the coroutines stay usable when invoked directly — confirm
    whether any caller still relies on them.
    """

    async def test_daily_weather_workflow(self, aemet_client, madrid_coords):
        """Test a complete daily weather workflow.

        Returns a dict with the ``current``, ``forecast``, ``week_forecast``
        and ``historical`` results gathered along the way.
        """
        lat, lon = madrid_coords

        # Simulate a daily weather check workflow
        workflow_results = {}

        # Step 1: Get current conditions
        current = await aemet_client.get_current_weather(lat, lon)
        workflow_results['current'] = current
        assert current is not None, "Should get current weather"

        # Step 2: Get today's forecast
        forecast = await aemet_client.get_forecast(lat, lon, 1)
        workflow_results['forecast'] = forecast
        assert len(forecast) == 1, "Should get today's forecast"

        # Step 3: Get week ahead forecast
        week_forecast = await aemet_client.get_forecast(lat, lon, 7)
        workflow_results['week_forecast'] = week_forecast
        assert len(week_forecast) == 7, "Should get 7-day forecast"

        # Step 4: Get last week's actual weather for comparison
        end_date = datetime.now() - timedelta(days=1)
        start_date = end_date - timedelta(days=7)
        historical = await aemet_client.get_historical_weather(lat, lon, start_date, end_date)
        workflow_results['historical'] = historical
        assert isinstance(historical, list), "Should get historical data"

        # Validate workflow consistency: collect every source that was used
        all_sources = set()
        if current:
            all_sources.add(current['source'])
        if forecast:
            all_sources.add(forecast[0]['source'])
        if week_forecast:
            all_sources.add(week_forecast[0]['source'])
        if historical:
            all_sources.update(h['source'] for h in historical)

        print(f"✅ Daily workflow test - Sources used: {', '.join(all_sources)}")
        return workflow_results

    async def test_weather_alerting_scenario(self, aemet_client, madrid_coords):
        """Test weather alerting scenario.

        Builds alert strings from a 3-day forecast and returns them.
        """
        lat, lon = madrid_coords

        # Get forecast for potential alerts
        forecast = await aemet_client.get_forecast(lat, lon, 3)

        alerts = []
        for day in forecast:
            # Check for extreme temperatures
            if day['temperature'] > 35:
                alerts.append(f"High temperature alert: {day['temperature']}°C on {day['forecast_date'].date()}")
            elif day['temperature'] < -5:
                alerts.append(f"Low temperature alert: {day['temperature']}°C on {day['forecast_date'].date()}")

            # Check for high precipitation
            if day['precipitation'] > 20:
                alerts.append(f"Heavy rain alert: {day['precipitation']}mm on {day['forecast_date'].date()}")

        # Alerts should be properly formatted
        for alert in alerts:
            assert isinstance(alert, str), "Alert should be string"
            assert "alert" in alert.lower(), "Alert should contain 'alert'"

        print(f"✅ Weather alerting test - {len(alerts)} alerts generated")
        return alerts

    async def test_historical_analysis_scenario(self, aemet_client, madrid_coords):
        """Test historical weather analysis scenario.

        Returns a dict of summary statistics, or an empty dict when no
        historical data came back.
        """
        lat, lon = madrid_coords

        # Get historical data for analysis
        end_date = datetime.now()
        start_date = end_date - timedelta(days=30)
        historical = await aemet_client.get_historical_weather(lat, lon, start_date, end_date)

        if not historical:
            return {}

        # Calculate statistics
        temperatures = [h['temperature'] for h in historical if h['temperature'] is not None]
        precipitations = [h['precipitation'] for h in historical if h['precipitation'] is not None]

        if temperatures:
            avg_temp = sum(temperatures) / len(temperatures)
            max_temp = max(temperatures)
            min_temp = min(temperatures)

            # Validate statistics. sum()/len() can round the mean just outside
            # [min, max] (three readings of 0.1 average to
            # 0.10000000000000002), so allow a tiny tolerance.
            rounding_slack = 1e-9
            assert (
                min_temp - rounding_slack <= avg_temp <= max_temp + rounding_slack
            ), "Temperature statistics should be logical"
            assert -20 <= min_temp <= 50, "Min temperature should be reasonable"
            assert -20 <= max_temp <= 50, "Max temperature should be reasonable"

        if precipitations:
            total_precip = sum(precipitations)
            rainy_days = len([p for p in precipitations if p > 0.1])

            # Validate precipitation statistics
            assert total_precip >= 0, "Total precipitation should be non-negative"
            assert 0 <= rainy_days <= len(precipitations), "Rainy days should be reasonable"

        print(f"✅ Historical analysis test - {len(historical)} records analyzed")
        return {
            'record_count': len(historical),
            'avg_temp': avg_temp if temperatures else None,
            'temp_range': (min_temp, max_temp) if temperatures else None,
            'total_precip': total_precip if precipitations else None,
            'rainy_days': rainy_days if precipitations else None,
        }
class TestAEMETRegressionTests:
    """Guards against the return of issues that were fixed earlier."""

    async def test_timezone_handling_regression(self, aemet_client, madrid_coords):
        """Current reading must be recent; forecast dates must not be in the past."""
        lat, lon = madrid_coords

        current = await aemet_client.get_current_weather(lat, lon)
        forecast = await aemet_client.get_forecast(lat, lon, 2)

        if current:
            # The reading's timestamp may differ from now by under an hour.
            reference = datetime.now()
            age_seconds = abs((reference - current['date']).total_seconds())
            assert age_seconds < 3600, "Current weather timestamp should be recent"

        if forecast:
            # Every forecast entry must be dated today or later.
            today = datetime.now().date()
            for entry in forecast:
                forecast_date = entry['forecast_date'].date()
                assert forecast_date >= today, f"Forecast date {forecast_date} should be today or future"

    async def test_data_type_conversion_regression(self, weather_parser):
        """``safe_float`` must convert, or fall back to the default, per case."""
        # (input, expected) pairs that used to cause problems.
        conversions = (
            ("25.5", 25.5),  # String to float
            (25, 25.0),  # Int to float
            ("", None),  # Empty string
            ("invalid", None),  # Invalid string
            (None, None),  # None input
        )

        for input_val, expected in conversions:
            result = weather_parser.safe_float(input_val, None)
            if expected is not None:
                assert result == expected, f"Expected {expected} for input {input_val}, got {result}"
                continue
            assert result is None, f"Expected None for input {input_val}, got {result}"

    def test_empty_data_handling_regression(self, weather_parser):
        """Empty or meaningless payloads must still parse to a list."""
        empty_data_cases = (
            [],
            [{}],
            [{"invalid": "data"}],
            None,
        )

        for empty_data in empty_data_cases:
            # None is replaced by an empty list before parsing.
            payload = [] if empty_data is None else empty_data
            result = weather_parser.parse_historical_data(payload)
            # The list may be empty or populated; only its type is checked.
            assert isinstance(result, list), f"Should return list for empty data: {empty_data}"
# ================================================================
# STANDALONE TEST RUNNER FOR EDGE CASES
# ================================================================
async def run_edge_case_tests():
    """Exercise a handful of edge cases by hand and print a short report."""
    separator = "=" * 60
    print(separator)
    print("AEMET EDGE CASE TESTS")
    print(separator)

    client = AEMETClient()
    parser = WeatherDataParser()
    generator = SyntheticWeatherGenerator()
    madrid_coords = (40.4168, -3.7038)

    # 1. Coordinates at the edge of the valid range.
    print(f"\n1. Testing extreme coordinates...")
    polar_weather = await client.get_current_weather(90, 180)
    print(f" Extreme coords result: {polar_weather['source']} source")

    # 2. Parser helpers fed with unusable input.
    print(f"\n2. Testing parser edge cases...")
    parser_outcomes = []
    parser_outcomes.append(parser.safe_float(None, 10.0))
    parser_outcomes.append(parser.safe_float("invalid", 5.0))
    parser_outcomes.append(parser.extract_temperature_value([]))
    print(f" Parser edge cases passed: {len(parser_outcomes)}")

    # 3. A long synthetic forecast.
    print(f"\n3. Testing synthetic generator extremes...")
    hundred_days = generator.generate_forecast_sync(100)
    print(f" Generated {len(hundred_days)} forecast days")

    # 4. Five simultaneous current-weather requests.
    print(f"\n4. Testing concurrent requests...")
    pending = [client.get_current_weather(*madrid_coords) for _ in range(5)]
    outcomes = await asyncio.gather(*pending, return_exceptions=True)
    successful = sum(1 for outcome in outcomes if isinstance(outcome, dict))
    print(f" Concurrent requests: {successful}/5 successful")

    print(f"\n✅ Edge case tests completed!")
# When executed as a script (not imported), run the manual edge-case runner
# on a fresh asyncio event loop.
if __name__ == "__main__":
    asyncio.run(run_edge_case_tests())