# ================================================================
# services/data/tests/test_aemet_edge_cases.py
# ================================================================
"""
Edge cases and integration tests for AEMET weather API client
Covers boundary conditions, error scenarios, and complex integrations

The fixtures used below (``aemet_client``, ``weather_parser``,
``synthetic_generator``, ``madrid_coords``, ``mock_historical_data`` and
``mock_forecast_data``) are not defined in this module; they are presumably
provided by a shared ``conftest.py`` -- verify against the test package.
"""

import pytest
import asyncio
from datetime import datetime, timedelta
from unittest.mock import Mock, patch, AsyncMock
import json
from typing import Dict, List, Any

from app.external.aemet import (
    AEMETClient,
    WeatherDataParser,
    SyntheticWeatherGenerator,
    LocationService,
    AEMETConstants,
    WeatherSource
)

# Configure pytest-asyncio
pytestmark = pytest.mark.asyncio


class TestAEMETEdgeCases:
    """Test edge cases and boundary conditions"""

    async def test_extreme_coordinates(self, aemet_client):
        """Test handling of extreme coordinate values"""
        extreme_coords = [
            (90, 180),      # North pole, antimeridian
            (-90, -180),    # South pole, antimeridian
            (0, 0),         # Null island
            (40.5, -180),   # Valid latitude, extreme longitude
            (90, -3.7),     # Extreme latitude, Madrid longitude
        ]

        for lat, lon in extreme_coords:
            result = await aemet_client.get_current_weather(lat, lon)

            assert result is not None, f"Should handle extreme coords ({lat}, {lon})"
            assert result['source'] == WeatherSource.SYNTHETIC.value, "Should fallback to synthetic for extreme coords"
            assert isinstance(result['temperature'], (int, float)), "Should have valid temperature"

    async def test_boundary_date_ranges(self, aemet_client, madrid_coords):
        """Test boundary conditions for date ranges"""
        lat, lon = madrid_coords
        now = datetime.now()

        # Test same start and end date
        result = await aemet_client.get_historical_weather(lat, lon, now, now)
        assert isinstance(result, list), "Should return list for same-day request"

        # Test reverse date range (end before start)
        start_date = now
        end_date = now - timedelta(days=1)
        result = await aemet_client.get_historical_weather(lat, lon, start_date, end_date)
        assert isinstance(result, list), "Should handle reverse date range gracefully"

        # Test extremely large date range
        start_date = now - timedelta(days=1000)
        end_date = now
        result = await aemet_client.get_historical_weather(lat, lon, start_date, end_date)
        assert isinstance(result, list), "Should handle very large date ranges"

    async def test_forecast_edge_durations(self, aemet_client, madrid_coords):
        """Test forecast with edge case durations

        Only the client call itself is allowed to raise: the assertions are
        evaluated outside the ``try`` so that a failed expectation is reported
        as a test failure instead of being swallowed together with client
        errors.
        """
        lat, lon = madrid_coords

        edge_durations = [0, 1, 30, 365, -1, 1000]

        for days in edge_durations:
            try:
                result = await aemet_client.get_forecast(lat, lon, days)
            except Exception as e:
                # Some edge cases might raise exceptions, which is acceptable
                print(f"ℹ️ Days={days} raised exception: {e}")
                continue

            if days <= 0:
                # Check for None first: len(None) would raise TypeError.
                assert result is None or len(result) == 0, f"Should handle non-positive days ({days})"
            elif days > 100:
                # Should handle gracefully, possibly with synthetic data
                assert isinstance(result, list), f"Should handle large day count ({days})"
            else:
                assert len(result) == days, f"Should return {days} forecast days"

    def test_parser_edge_cases(self, weather_parser):
        """Test weather data parser with edge case inputs"""
        # Test with None values
        result = weather_parser.safe_float(None, 10.0)
        assert result == 10.0, "Should return default for None"

        # Test with empty strings
        result = weather_parser.safe_float("", 5.0)
        assert result == 5.0, "Should return default for empty string"

        # Test with extreme values
        result = weather_parser.safe_float("999999.99", 0.0)
        assert result == 999999.99, "Should handle large numbers"

        result = weather_parser.safe_float("-999.99", 0.0)
        assert result == -999.99, "Should handle negative numbers"

        # Test temperature extraction edge cases
        assert weather_parser.extract_temperature_value([]) is None, "Should handle empty list"
        assert weather_parser.extract_temperature_value({}) is None, "Should handle empty dict"
        assert weather_parser.extract_temperature_value("invalid") is None, "Should handle invalid string"

    def test_synthetic_generator_edge_cases(self, synthetic_generator):
        """Test synthetic weather generator edge cases"""
        # Test with extreme date ranges
        end_date = datetime.now()
        start_date = end_date - timedelta(days=1000)

        result = synthetic_generator.generate_historical_data(start_date, end_date)
        assert isinstance(result, list), "Should handle large date ranges"
        # 1000-day span with both endpoints counted -> 1001 records expected.
        assert len(result) == 1001, "Should generate correct number of days"

        # Test forecast with zero days
        result = synthetic_generator.generate_forecast_sync(0)
        assert result == [], "Should return empty list for zero days"

        # Test forecast with large number of days
        result = synthetic_generator.generate_forecast_sync(1000)
        assert len(result) == 1000, "Should handle large forecast ranges"

    def test_location_service_edge_cases(self):
        """Test location service edge cases"""
        # Test distance calculation with same points
        distance = LocationService.calculate_distance(40.4, -3.7, 40.4, -3.7)
        assert distance == 0.0, "Distance between same points should be zero"

        # Test distance calculation with antipodal points
        distance = LocationService.calculate_distance(40.4, -3.7, -40.4, 176.3)
        assert distance > 15000, "Antipodal points should be far apart"

        # Test station finding with no stations (if list were empty)
        with patch.object(AEMETConstants, 'MADRID_STATIONS', []):
            station = LocationService.find_nearest_station(40.4, -3.7)
            assert station is None, "Should return None when no stations available"


class TestAEMETDataIntegrity:
    """Test data integrity and consistency"""

    async def test_data_type_consistency(self, aemet_client, madrid_coords):
        """Test that data types are consistent across calls"""
        lat, lon = madrid_coords

        # Get current weather multiple times
        results = []
        for _ in range(3):
            result = await aemet_client.get_current_weather(lat, lon)
            results.append(result)

        # Check that field types are consistent
        if all(r is not None for r in results):
            for field in ['temperature', 'precipitation', 'humidity', 'wind_speed', 'pressure']:
                types = [type(r[field]) for r in results if field in r]
                if types:
                    first_type = types[0]
                    assert all(t == first_type for t in types), f"Inconsistent types for {field}: {types}"

    async def test_temperature_consistency(self, aemet_client, madrid_coords):
        """Test temperature consistency between different data sources"""
        lat, lon = madrid_coords

        # Get current weather and today's forecast
        current = await aemet_client.get_current_weather(lat, lon)
        forecast = await aemet_client.get_forecast(lat, lon, 1)

        if current and forecast and len(forecast) > 0:
            current_temp = current['temperature']
            forecast_temp = forecast[0]['temperature']

            # Temperatures should be reasonably close (within 15°C)
            temp_diff = abs(current_temp - forecast_temp)
            assert temp_diff < 15, f"Temperature difference too large: current={current_temp}°C, forecast={forecast_temp}°C"

    async def test_source_consistency(self, aemet_client, madrid_coords):
        """Test that data source is consistent within same time period"""
        lat, lon = madrid_coords

        # Get multiple current weather readings
        current1 = await aemet_client.get_current_weather(lat, lon)
        current2 = await aemet_client.get_current_weather(lat, lon)

        if current1 and current2:
            # Should use same source type (both real or both synthetic)
            assert current1['source'] == current2['source'], "Should use consistent data source"

    def test_historical_data_ordering(self, weather_parser, mock_historical_data):
        """Test that historical data is properly ordered"""
        parsed_data = weather_parser.parse_historical_data(mock_historical_data)

        if len(parsed_data) > 1:
            dates = [record['date'] for record in parsed_data]
            assert dates == sorted(dates), "Historical data should be chronologically ordered"

    def test_forecast_date_progression(self, weather_parser, mock_forecast_data):
        """Test that forecast dates progress correctly"""
        parsed_forecast = weather_parser.parse_forecast_data(mock_forecast_data, 7)

        if len(parsed_forecast) > 1:
            for i in range(1, len(parsed_forecast)):
                prev_date = parsed_forecast[i-1]['forecast_date']
                curr_date = parsed_forecast[i]['forecast_date']
                diff = (curr_date - prev_date).days
                assert diff == 1, f"Forecast dates should be consecutive days, got {diff} day difference"


class TestAEMETErrorRecovery:
    """Test error recovery and resilience"""

    async def test_network_interruption_recovery(self, aemet_client, madrid_coords):
        """Test recovery from network interruptions"""
        lat, lon = madrid_coords

        # Mock intermittent network failures
        call_count = 0

        async def mock_get_with_failures(*args, **kwargs):
            nonlocal call_count
            call_count += 1
            if call_count <= 2:  # Fail first two calls
                raise Exception("Network timeout")
            else:
                return {"datos": "http://example.com/data"}

        with patch.object(aemet_client, '_get', side_effect=mock_get_with_failures):
            result = await aemet_client.get_current_weather(lat, lon)

            # Should eventually succeed or fallback to synthetic
            assert result is not None, "Should recover from network failures"
            assert result['source'] in [WeatherSource.AEMET.value, WeatherSource.SYNTHETIC.value]

    async def test_partial_data_recovery(self, aemet_client, madrid_coords, weather_parser):
        """Test recovery from partial/corrupted data"""
        lat, lon = madrid_coords

        # Mock corrupted historical data (some records missing fields)
        corrupted_data = [
            {"fecha": "2025-07-20", "tmax": 25.2},  # Missing tmin and other fields
            {"fecha": "2025-07-21"},  # Only has date
            {"tmax": 27.0, "tmin": 15.0},  # Missing date
            {"fecha": "2025-07-22", "tmax": 23.0, "tmin": 14.0, "prec": 0.0}  # Complete record
        ]

        parsed_data = weather_parser.parse_historical_data(corrupted_data)

        # Should only return valid records and handle corrupted ones gracefully
        assert isinstance(parsed_data, list), "Should return list even with corrupted data"
        valid_records = [r for r in parsed_data if 'date' in r and r['date'] is not None]
        assert len(valid_records) >= 1, "Should salvage at least some valid records"

    async def test_malformed_json_recovery(self, aemet_client, madrid_coords):
        """Test recovery from malformed JSON responses"""
        lat, lon = madrid_coords

        # Mock malformed responses
        malformed_responses = [
            None,
            "",
            "invalid json",
            {"incomplete": "response"},
            {"datos": None},
            {"datos": ""},
        ]

        for response in malformed_responses:
            with patch.object(aemet_client, '_get', new_callable=AsyncMock, return_value=response):
                result = await aemet_client.get_current_weather(lat, lon)

                assert result is not None, f"Should handle malformed response: {response}"
                assert result['source'] == WeatherSource.SYNTHETIC.value, "Should fallback to synthetic"

    async def test_api_rate_limiting_recovery(self, aemet_client, madrid_coords):
        """Test recovery from API rate limiting"""
        lat, lon = madrid_coords

        # Mock rate limiting responses
        rate_limit_response = {
            "descripcion": "Demasiadas peticiones",
            "estado": 429
        }

        with patch.object(aemet_client, '_get', new_callable=AsyncMock, return_value=rate_limit_response):
            result = await aemet_client.get_current_weather(lat, lon)

            assert result is not None, "Should handle rate limiting"
            assert result['source'] == WeatherSource.SYNTHETIC.value, "Should fallback to synthetic on rate limit"


class TestAEMETPerformanceAndScaling:
    """Test performance characteristics and scaling behavior"""

    async def test_concurrent_requests_performance(self, aemet_client, madrid_coords):
        """Test performance with concurrent requests"""
        lat, lon = madrid_coords

        # Create multiple concurrent requests
        tasks = [aemet_client.get_current_weather(lat, lon) for _ in range(10)]

        start_time = datetime.now()
        # return_exceptions=True: a failing request becomes an entry in
        # ``results`` instead of aborting the whole gather.
        results = await asyncio.gather(*tasks, return_exceptions=True)
        execution_time = (datetime.now() - start_time).total_seconds() * 1000

        # Check that most requests succeeded
        successful_results = [r for r in results if isinstance(r, dict) and 'temperature' in r]
        assert len(successful_results) >= 8, "Most concurrent requests should succeed"

        # Should complete in reasonable time (allowing for potential API rate limiting)
        assert execution_time < 15000, f"Concurrent requests took too long: {execution_time:.0f}ms"

        print(f"✅ Concurrent requests test - {len(successful_results)}/10 succeeded in {execution_time:.0f}ms")

    async def test_memory_usage_with_large_datasets(self, aemet_client, madrid_coords):
        """Test memory usage with large historical datasets

        Skipped (rather than errored) when the optional ``psutil`` package is
        not installed.
        """
        lat, lon = madrid_coords

        # Request large historical dataset
        end_date = datetime.now()
        start_date = end_date - timedelta(days=90)  # 3 months

        psutil = pytest.importorskip("psutil")
        import os

        # Get initial memory usage
        process = psutil.Process(os.getpid())
        initial_memory = process.memory_info().rss / 1024 / 1024  # MB

        result = await aemet_client.get_historical_weather(lat, lon, start_date, end_date)

        # Get final memory usage
        final_memory = process.memory_info().rss / 1024 / 1024  # MB
        memory_increase = final_memory - initial_memory

        assert isinstance(result, list), "Should return historical data"

        # Memory increase should be reasonable (less than 100MB for 90 days)
        assert memory_increase < 100, f"Memory usage increased too much: {memory_increase:.1f}MB"

        print(f"✅ Memory usage test - {len(result)} records, +{memory_increase:.1f}MB")

    async def test_caching_behavior(self, aemet_client, madrid_coords):
        """Test caching behavior and performance improvement

        The two call durations are only printed; no assertion is made that
        the second call is faster.
        """
        lat, lon = madrid_coords

        # First request (cold)
        start_time = datetime.now()
        result1 = await aemet_client.get_current_weather(lat, lon)
        first_call_time = (datetime.now() - start_time).total_seconds() * 1000

        # Second request (potentially cached)
        start_time = datetime.now()
        result2 = await aemet_client.get_current_weather(lat, lon)
        second_call_time = (datetime.now() - start_time).total_seconds() * 1000

        assert result1 is not None, "First call should succeed"
        assert result2 is not None, "Second call should succeed"

        # Both should return valid data
        assert 'temperature' in result1, "First result should have temperature"
        assert 'temperature' in result2, "Second result should have temperature"

        print(f"✅ Caching test - First call: {first_call_time:.0f}ms, Second call: {second_call_time:.0f}ms")


class TestAEMETIntegrationScenarios:
    """Test realistic integration scenarios"""

    async def test_daily_weather_workflow(self, aemet_client, madrid_coords):
        """Test a complete daily weather workflow

        Returns the dict of intermediate results collected along the way.
        """
        lat, lon = madrid_coords

        # Simulate a daily weather check workflow
        workflow_results = {}

        # Step 1: Get current conditions
        current = await aemet_client.get_current_weather(lat, lon)
        workflow_results['current'] = current
        assert current is not None, "Should get current weather"

        # Step 2: Get today's forecast
        forecast = await aemet_client.get_forecast(lat, lon, 1)
        workflow_results['forecast'] = forecast
        assert len(forecast) == 1, "Should get today's forecast"

        # Step 3: Get week ahead forecast
        week_forecast = await aemet_client.get_forecast(lat, lon, 7)
        workflow_results['week_forecast'] = week_forecast
        assert len(week_forecast) == 7, "Should get 7-day forecast"

        # Step 4: Get last week's actual weather for comparison
        end_date = datetime.now() - timedelta(days=1)
        start_date = end_date - timedelta(days=7)
        historical = await aemet_client.get_historical_weather(lat, lon, start_date, end_date)
        workflow_results['historical'] = historical
        assert isinstance(historical, list), "Should get historical data"

        # Validate workflow consistency
        all_sources = set()
        if current:
            all_sources.add(current['source'])
        if forecast:
            all_sources.add(forecast[0]['source'])
        if week_forecast:
            all_sources.add(week_forecast[0]['source'])
        if historical:
            all_sources.update([h['source'] for h in historical])

        print(f"✅ Daily workflow test - Sources used: {', '.join(all_sources)}")

        return workflow_results

    async def test_weather_alerting_scenario(self, aemet_client, madrid_coords):
        """Test weather alerting scenario

        Returns the list of alert strings built from a 3-day forecast.
        """
        lat, lon = madrid_coords

        # Get forecast for potential alerts
        forecast = await aemet_client.get_forecast(lat, lon, 3)

        alerts = []
        for day in forecast:
            # Check for extreme temperatures
            if day['temperature'] > 35:
                alerts.append(f"High temperature alert: {day['temperature']}°C on {day['forecast_date'].date()}")
            elif day['temperature'] < -5:
                alerts.append(f"Low temperature alert: {day['temperature']}°C on {day['forecast_date'].date()}")

            # Check for high precipitation
            if day['precipitation'] > 20:
                alerts.append(f"Heavy rain alert: {day['precipitation']}mm on {day['forecast_date'].date()}")

        # Alerts should be properly formatted
        for alert in alerts:
            assert isinstance(alert, str), "Alert should be string"
            assert "alert" in alert.lower(), "Alert should contain 'alert'"

        print(f"✅ Weather alerting test - {len(alerts)} alerts generated")

        return alerts

    async def test_historical_analysis_scenario(self, aemet_client, madrid_coords):
        """Test historical weather analysis scenario

        Returns a summary dict of the computed statistics, or an empty dict
        when no historical data came back.
        """
        lat, lon = madrid_coords

        # Get historical data for analysis
        end_date = datetime.now()
        start_date = end_date - timedelta(days=30)

        historical = await aemet_client.get_historical_weather(lat, lon, start_date, end_date)

        if historical:
            # Calculate statistics
            temperatures = [h['temperature'] for h in historical if h['temperature'] is not None]
            precipitations = [h['precipitation'] for h in historical if h['precipitation'] is not None]

            if temperatures:
                avg_temp = sum(temperatures) / len(temperatures)
                max_temp = max(temperatures)
                min_temp = min(temperatures)

                # Validate statistics
                assert min_temp <= avg_temp <= max_temp, "Temperature statistics should be logical"
                assert -20 <= min_temp <= 50, "Min temperature should be reasonable"
                assert -20 <= max_temp <= 50, "Max temperature should be reasonable"

            if precipitations:
                total_precip = sum(precipitations)
                rainy_days = len([p for p in precipitations if p > 0.1])

                # Validate precipitation statistics
                assert total_precip >= 0, "Total precipitation should be non-negative"
                assert 0 <= rainy_days <= len(precipitations), "Rainy days should be reasonable"

            print(f"✅ Historical analysis test - {len(historical)} records analyzed")

            # The conditional expressions guard names that are only bound
            # when the corresponding list was non-empty.
            return {
                'record_count': len(historical),
                'avg_temp': avg_temp if temperatures else None,
                'temp_range': (min_temp, max_temp) if temperatures else None,
                'total_precip': total_precip if precipitations else None,
                'rainy_days': rainy_days if precipitations else None
            }

        return {}


class TestAEMETRegressionTests:
    """Regression tests for previously fixed issues"""

    async def test_timezone_handling_regression(self, aemet_client, madrid_coords):
        """Regression test for timezone handling issues"""
        lat, lon = madrid_coords

        # Get current weather and forecast
        current = await aemet_client.get_current_weather(lat, lon)
        forecast = await aemet_client.get_forecast(lat, lon, 2)

        if current:
            # Current weather date should be recent (within last hour)
            # NOTE(review): subtracting from a naive datetime.now() assumes
            # current['date'] is also naive -- confirm against the client.
            now = datetime.now()
            time_diff = abs((now - current['date']).total_seconds())
            assert time_diff < 3600, "Current weather timestamp should be recent"

        if forecast:
            # Forecast dates should be in the future
            now = datetime.now().date()
            for day in forecast:
                forecast_date = day['forecast_date'].date()
                assert forecast_date >= now, f"Forecast date {forecast_date} should be today or future"

    async def test_data_type_conversion_regression(self, weather_parser):
        """Regression test for data type conversion issues"""
        # Test cases that previously caused issues
        test_cases = [
            ("25.5", 25.5),      # String to float
            (25, 25.0),          # Int to float
            ("", None),          # Empty string
            ("invalid", None),   # Invalid string
            (None, None),        # None input
        ]

        for input_val, expected in test_cases:
            result = weather_parser.safe_float(input_val, None)
            if expected is None:
                assert result is None, f"Expected None for input {input_val}, got {result}"
            else:
                assert result == expected, f"Expected {expected} for input {input_val}, got {result}"

    def test_empty_data_handling_regression(self, weather_parser):
        """Regression test for empty data handling"""
        # Empty lists and dictionaries should be handled gracefully
        empty_data_cases = [
            [],
            [{}],
            [{"invalid": "data"}],
            None,
        ]

        for empty_data in empty_data_cases:
            # The None case is replaced by an empty list before parsing.
            result = weather_parser.parse_historical_data(empty_data if empty_data is not None else [])
            assert isinstance(result, list), f"Should return list for empty data: {empty_data}"
            # May be empty or have some synthetic data, but should not crash


# ================================================================
# STANDALONE TEST RUNNER FOR EDGE CASES
# ================================================================

async def run_edge_case_tests():
    """Run edge case tests manually

    Builds its own client, parser and generator and prints a short report
    for each of four smoke checks; it makes no assertions.
    """
    print("="*60)
    print("AEMET EDGE CASE TESTS")
    print("="*60)

    client = AEMETClient()
    parser = WeatherDataParser()
    generator = SyntheticWeatherGenerator()

    madrid_coords = (40.4168, -3.7038)

    print("\n1. Testing extreme coordinates...")
    extreme_result = await client.get_current_weather(90, 180)
    print(f"   Extreme coords result: {extreme_result['source']} source")

    print("\n2. Testing parser edge cases...")
    parser_tests = [
        parser.safe_float(None, 10.0),
        parser.safe_float("invalid", 5.0),
        parser.extract_temperature_value([]),
    ]
    print(f"   Parser edge cases passed: {len(parser_tests)}")

    print("\n3. Testing synthetic generator extremes...")
    large_forecast = generator.generate_forecast_sync(100)
    print(f"   Generated {len(large_forecast)} forecast days")

    print("\n4. Testing concurrent requests...")
    tasks = [client.get_current_weather(*madrid_coords) for _ in range(5)]
    concurrent_results = await asyncio.gather(*tasks, return_exceptions=True)
    successful = len([r for r in concurrent_results if isinstance(r, dict)])
    print(f"   Concurrent requests: {successful}/5 successful")

    print("\n✅ Edge case tests completed!")


if __name__ == "__main__":
    asyncio.run(run_edge_case_tests())