+            <div class="suite {result["status"]}">
+                <h3>{suite_name.title()} Tests ({result["status"].upper()})</h3>
+                <p>Duration: {result["duration"]:.2f}s</p>
+                <p>Return Code: {result["return_code"]}</p>
+                {f'<h4>Output:</h4><pre>{result["stdout"][:1000]}{"..." if len(result["stdout"]) > 1000 else ""}</pre>' if result["stdout"] else ""}
+                {f'<h4>Errors:</h4><pre>{result["stderr"][:1000]}{"..." if len(result["stderr"]) > 1000 else ""}</pre>' if result["stderr"] else ""}
+            </div>
+            """
+
+ # Fill template
+ html_content = html_template.format(
+ timestamp=report["test_run_summary"]["timestamp"],
+ total_suites=report["test_run_summary"]["total_suites"],
+ passed_suites=report["test_run_summary"]["passed_suites"],
+ failed_suites=report["test_run_summary"]["failed_suites"],
+ timeout_suites=report["test_run_summary"]["timeout_suites"],
+ success_rate=report["test_run_summary"]["success_rate"],
+ duration=report["test_run_summary"]["total_duration_seconds"],
+ recommendations_html=recommendations_html,
+ suite_results_html=suite_results_html
+ )
+
+ # Save HTML report
+ html_file = self.results_dir / "test_report.html"
+ with open(html_file, 'w') as f:
+ f.write(html_content)
+
+ logger.info(f"HTML report saved to: {html_file}")
+
+ def print_test_summary(self, report: Dict[str, Any]):
+ """Print test summary to console"""
+ summary = report["test_run_summary"]
+
+ print("\n" + "=" * 80)
+ print("TRAINING SERVICE TEST RESULTS SUMMARY")
+ print("=" * 80)
+ print(f"Timestamp: {summary['timestamp']}")
+ print(f"Total Suites: {summary['total_suites']}")
+ print(f"Passed: {summary['passed_suites']}")
+ print(f"Failed: {summary['failed_suites']}")
+ print(f"Errors: {summary['error_suites']}")
+ print(f"Timeouts: {summary['timeout_suites']}")
+ print(f"Success Rate: {summary['success_rate']:.1f}%")
+ print(f"Total Duration: {summary['total_duration_seconds']:.2f}s")
+
+ print("\nSUITE DETAILS:")
+ print("-" * 50)
+ for suite_name, result in report["suite_results"].items():
+            status_icon = "✅" if result["status"] == "passed" else "❌"
+ print(f"{status_icon} {suite_name.ljust(15)}: {result['status'].upper().ljust(10)} ({result['duration']:.2f}s)")
+
+ print("\nRECOMMENDATIONS:")
+ print("-" * 50)
+ for i, rec in enumerate(report["recommendations"], 1):
+ print(f"{i}. {rec}")
+
+ print("\nFILES GENERATED:")
+ print("-" * 50)
+        print(f"📄 JSON Report: {self.results_dir}/test_report.json")
+        print(f"📄 HTML Report: {self.results_dir}/test_report.html")
+        print(f"📄 Coverage Reports: {self.results_dir}/coverage_*_html/")
+        print(f"📄 JUnit XML: {self.results_dir}/junit_*.xml")
+ print("=" * 80)
+
+ async def run_all_tests(self):
+ """Run all test suites"""
+ logger.info("Starting comprehensive test run...")
+
+ # Setup environment
+ await self.setup_test_environment()
+
+ # Run each test suite
+ for suite_name in self.test_suites.keys():
+ logger.info(f"Starting {suite_name} test suite...")
+ result = self.run_test_suite(suite_name)
+ self.test_results[suite_name] = result
+
+ if result["status"] == "passed":
+                logger.info(f"✅ {suite_name} tests PASSED ({result['duration']:.2f}s)")
+            elif result["status"] == "failed":
+                logger.error(f"❌ {suite_name} tests FAILED ({result['duration']:.2f}s)")
+            elif result["status"] == "timeout":
+                logger.error(f"⏰ {suite_name} tests TIMED OUT ({result['duration']:.2f}s)")
+            else:
+                logger.error(f"💥 {suite_name} tests ERROR ({result['duration']:.2f}s)")
+
+ # Generate final report
+ report = self.generate_test_report()
+
+ return report
+
+ def run_specific_suite(self, suite_name: str):
+ """Run a specific test suite"""
+ if suite_name not in self.test_suites:
+ logger.error(f"Unknown test suite: {suite_name}")
+ logger.info(f"Available suites: {', '.join(self.test_suites.keys())}")
+ return None
+
+ logger.info(f"Running {suite_name} test suite only...")
+ result = self.run_test_suite(suite_name)
+ self.test_results[suite_name] = result
+
+ # Generate report for single suite
+ report = self.generate_test_report()
+ return report
+
+
+# ================================================================
+# MAIN EXECUTION
+# ================================================================
+
+async def main():
+ """Main execution function"""
+ import argparse
+
+ parser = argparse.ArgumentParser(description="Training Service Test Runner")
+ parser.add_argument(
+ "--suite",
+ choices=list(TrainingTestRunner().test_suites.keys()) + ["all"],
+ default="all",
+ help="Test suite to run (default: all)"
+ )
+ parser.add_argument(
+ "--verbose", "-v",
+ action="store_true",
+ help="Verbose output"
+ )
+ parser.add_argument(
+ "--quick",
+ action="store_true",
+ help="Run quick tests only (skip performance tests)"
+ )
+
+ args = parser.parse_args()
+
+ # Setup logging level
+ if args.verbose:
+ logging.getLogger().setLevel(logging.DEBUG)
+
+ # Create test runner
+ runner = TrainingTestRunner()
+
+ # Modify test suites for quick run
+ if args.quick:
+ # Skip performance tests in quick mode
+ if "performance" in runner.test_suites:
+ del runner.test_suites["performance"]
+ logger.info("Quick mode: Skipping performance tests")
+
+ try:
+ if args.suite == "all":
+ report = await runner.run_all_tests()
+ else:
+ report = runner.run_specific_suite(args.suite)
+
+ # Exit with appropriate code
+ if report and report["test_run_summary"]["failed_suites"] == 0 and report["test_run_summary"]["error_suites"] == 0:
+ logger.info("All tests completed successfully!")
+ sys.exit(0)
+ else:
+ logger.error("Some tests failed!")
+ sys.exit(1)
+
+ except KeyboardInterrupt:
+ logger.info("Test run interrupted by user")
+ sys.exit(130)
+ except Exception as e:
+ logger.error(f"Test run failed with error: {e}")
+ sys.exit(1)
+
+
+if __name__ == "__main__":
+ # Handle both direct execution and pytest discovery
+    if len(sys.argv) > 1:
+        # Running as main script with command-line arguments (handled by argparse in main())
+ asyncio.run(main())
+ else:
+ # Running as pytest discovery or direct execution without args
+ print("Training Service Test Runner")
+ print("=" * 50)
+ print("Usage:")
+ print(" python run_tests.py --suite all # Run all test suites")
+ print(" python run_tests.py --suite unit # Run unit tests only")
+ print(" python run_tests.py --suite integration # Run integration tests only")
+ print(" python run_tests.py --suite performance # Run performance tests only")
+ print(" python run_tests.py --quick # Run quick tests (skip performance)")
+ print(" python run_tests.py -v # Verbose output")
+ print()
+ print("Available test suites:")
+ runner = TrainingTestRunner()
+ for suite_name, config in runner.test_suites.items():
+ print(f" {suite_name.ljust(15)}: {config['description']}")
+ print()
+
+ # If no arguments provided, run all tests
+ if len(sys.argv) == 1:
+ print("No arguments provided. Running all tests...")
+ asyncio.run(TrainingTestRunner().run_all_tests())
\ No newline at end of file
diff --git a/services/training/tests/test_end_to_end.py b/services/training/tests/test_end_to_end.py
new file mode 100644
index 00000000..65c75712
--- /dev/null
+++ b/services/training/tests/test_end_to_end.py
@@ -0,0 +1,311 @@
+# ================================================================
+# services/training/tests/test_end_to_end.py
+# ================================================================
+"""
+End-to-End Testing for Training Service
+Tests complete workflows from API to ML pipeline to results
+"""
+
+import pytest
+import asyncio
+import httpx
+import pandas as pd
+import numpy as np
+import json
+import tempfile
+import time
+from datetime import datetime, timedelta
+from typing import Dict, List, Any
+from unittest.mock import patch, AsyncMock
+import uuid
+
+from app.main import app
+from app.schemas.training import TrainingJobRequest, SingleProductTrainingRequest
+
+
+class TestTrainingServiceEndToEnd:
+ """End-to-end tests for complete training workflows"""
+
+ @pytest.fixture
+ async def test_client(self):
+ """Create test client for the training service"""
+ from httpx import AsyncClient
+ async with AsyncClient(app=app, base_url="http://test") as client:
+ yield client
+
+ @pytest.fixture
+ def real_bakery_data(self):
+ """Use the actual bakery sales data from the uploaded CSV"""
+ # This fixture would load the real bakery_sales_2023_2024.csv data
+ # For testing, we'll simulate the structure based on the document description
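+        # A minimal, hypothetical loader for the real CSV could look like the
+        # commented sketch below (the data/ subdirectory is an assumption); the
+        # synthetic generator that follows keeps the test free of file dependencies:
+        #
+        #     from pathlib import Path
+        #     csv_path = Path(__file__).parent / "data" / "bakery_sales_2023_2024.csv"
+        #     if csv_path.exists():
+        #         return pd.read_csv(csv_path, parse_dates=["date"])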
+
+ # Generate realistic data matching the CSV structure
+ start_date = datetime(2023, 1, 1)
+ dates = [start_date + timedelta(days=i) for i in range(365)]
+
+ products = [
+ "Pan Integral", "Pan Blanco", "Croissant", "Magdalenas",
+ "Empanadas", "Tarta Chocolate", "Roscon Reyes", "Palmeras"
+ ]
+
+ data = []
+ for date in dates:
+ for product in products:
+ # Realistic sales patterns for Madrid bakery
+ base_quantity = {
+ "Pan Integral": 80, "Pan Blanco": 120, "Croissant": 45,
+ "Magdalenas": 30, "Empanadas": 25, "Tarta Chocolate": 15,
+ "Roscon Reyes": 8, "Palmeras": 12
+ }.get(product, 20)
+
+ # Seasonal variations
+ if date.month == 12 and product == "Roscon Reyes":
+ base_quantity *= 5 # Christmas specialty
+ elif date.month in [6, 7, 8]: # Summer
+ base_quantity *= 0.85
+ elif date.month in [11, 12, 1]: # Winter
+ base_quantity *= 1.15
+
+ # Weekly patterns
+ if date.weekday() >= 5: # Weekends
+ base_quantity *= 1.3
+ elif date.weekday() == 0: # Monday slower
+ base_quantity *= 0.8
+
+ # Weather influence
+ temp = 15 + 12 * np.sin((date.timetuple().tm_yday / 365) * 2 * np.pi)
+ if temp > 30: # Very hot days
+ if product in ["Pan Integral", "Pan Blanco"]:
+ base_quantity *= 0.7
+ elif temp < 5: # Cold days
+ base_quantity *= 1.1
+
+                # Add realistic noise (numpy is imported at module level)
+                quantity = max(1, int(base_quantity + np.random.normal(0, base_quantity * 0.15)))
+
+ # Calculate revenue (realistic Spanish bakery prices)
+ price_per_unit = {
+ "Pan Integral": 2.80, "Pan Blanco": 2.50, "Croissant": 1.50,
+ "Magdalenas": 1.20, "Empanadas": 3.50, "Tarta Chocolate": 18.00,
+ "Roscon Reyes": 25.00, "Palmeras": 1.80
+ }.get(product, 2.00)
+
+ revenue = round(quantity * price_per_unit, 2)
+
+ data.append({
+ "date": date.strftime("%Y-%m-%d"),
+ "product": product,
+ "quantity": quantity,
+ "revenue": revenue,
+ "temperature": round(temp + np.random.normal(0, 3), 1),
+ "precipitation": max(0, np.random.exponential(0.8)),
+ "is_weekend": date.weekday() >= 5,
+ "is_holiday": self._is_spanish_holiday(date)
+ })
+
+ return pd.DataFrame(data)
+
+ def _is_spanish_holiday(self, date: datetime) -> bool:
+ """Check if date is a Spanish holiday"""
+ spanish_holidays = [
+ (1, 1), # AΓ±o Nuevo
+ (1, 6), # Reyes Magos
+ (5, 1), # DΓa del Trabajo
+ (8, 15), # AsunciΓ³n de la Virgen
+ (10, 12), # Fiesta Nacional de EspaΓ±a
+ (11, 1), # Todos los Santos
+ (12, 6), # DΓa de la ConstituciΓ³n
+ (12, 8), # Inmaculada ConcepciΓ³n
+ (12, 25), # Navidad
+ ]
+ return (date.month, date.day) in spanish_holidays
+
+ @pytest.fixture
+ async def mock_external_apis(self):
+ """Mock external APIs (AEMET and Madrid OpenData)"""
+ with patch('app.external.aemet.AEMETClient') as mock_aemet, \
+ patch('app.external.madrid_opendata.MadridOpenDataClient') as mock_madrid:
+
+ # Mock AEMET weather data
+ mock_aemet_instance = AsyncMock()
+ mock_aemet.return_value = mock_aemet_instance
+
+ # Generate realistic Madrid weather data
+ weather_data = []
+ for i in range(365):
+ date = datetime(2023, 1, 1) + timedelta(days=i)
+ day_of_year = date.timetuple().tm_yday
+ # Madrid climate: hot summers, mild winters
+ base_temp = 14 + 12 * np.sin((day_of_year / 365) * 2 * np.pi)
+
+ weather_data.append({
+ "date": date,
+ "temperature": round(base_temp + np.random.normal(0, 4), 1),
+ "precipitation": max(0, np.random.exponential(1.2)),
+ "humidity": np.random.uniform(25, 75),
+ "wind_speed": np.random.uniform(3, 20),
+ "pressure": np.random.uniform(995, 1025),
+ "description": np.random.choice([
+ "Soleado", "Parcialmente nublado", "Nublado",
+ "Lluvia ligera", "Despejado"
+ ]),
+ "source": "aemet"
+ })
+
+ mock_aemet_instance.get_historical_weather.return_value = weather_data
+ mock_aemet_instance.get_current_weather.return_value = weather_data[-1]
+
+ # Mock Madrid traffic data
+ mock_madrid_instance = AsyncMock()
+ mock_madrid.return_value = mock_madrid_instance
+
+ traffic_data = []
+ for i in range(365):
+ date = datetime(2023, 1, 1) + timedelta(days=i)
+
+ # Multiple measurements per day
+ for hour in range(6, 22, 2): # Every 2 hours from 6 AM to 10 PM
+ measurement_time = date.replace(hour=hour)
+
+ # Realistic Madrid traffic patterns
+ if hour in [7, 8, 9, 18, 19, 20]: # Rush hours
+ volume = np.random.randint(1200, 2000)
+ congestion = "high"
+ speed = np.random.randint(10, 25)
+ elif hour in [12, 13, 14]: # Lunch time
+ volume = np.random.randint(800, 1200)
+ congestion = "medium"
+ speed = np.random.randint(20, 35)
+ else: # Off-peak
+ volume = np.random.randint(300, 800)
+ congestion = "low"
+ speed = np.random.randint(30, 50)
+
+ traffic_data.append({
+ "date": measurement_time,
+ "traffic_volume": volume,
+ "occupation_percentage": np.random.randint(15, 85),
+ "load_percentage": np.random.randint(25, 90),
+ "average_speed": speed,
+ "congestion_level": congestion,
+ "pedestrian_count": np.random.randint(100, 800),
+ "measurement_point_id": "MADRID_CENTER_001",
+ "measurement_point_name": "Puerta del Sol",
+ "road_type": "URB",
+ "source": "madrid_opendata"
+ })
+
+ mock_madrid_instance.get_historical_traffic.return_value = traffic_data
+ mock_madrid_instance.get_current_traffic.return_value = traffic_data[-1]
+
+ yield {
+ 'aemet': mock_aemet_instance,
+ 'madrid': mock_madrid_instance
+ }
+
+ @pytest.mark.asyncio
+ async def test_complete_training_workflow_api(
+ self,
+ test_client,
+ real_bakery_data,
+ mock_external_apis
+ ):
+ """Test complete training workflow through API endpoints"""
+
+ # Step 1: Check service health
+ health_response = await test_client.get("/health")
+ assert health_response.status_code == 200
+ health_data = health_response.json()
+ assert health_data["status"] == "healthy"
+
+ # Step 2: Validate training data quality
+ with patch('app.services.training_service.TrainingService._fetch_sales_data',
+ return_value=real_bakery_data):
+
+ validation_response = await test_client.post(
+ "/training/validate",
+ json={
+ "tenant_id": "test_bakery_001",
+ "include_weather": True,
+ "include_traffic": True
+ }
+ )
+
+ assert validation_response.status_code == 200
+ validation_data = validation_response.json()
+ assert validation_data["is_valid"] is True
+ assert validation_data["data_points"] > 1000 # Sufficient data
+ assert validation_data["missing_percentage"] < 10
+
+ # Step 3: Start training job for multiple products
+ training_request = {
+ "products": ["Pan Integral", "Croissant", "Magdalenas"],
+ "include_weather": True,
+ "include_traffic": True,
+ "config": {
+ "seasonality_mode": "additive",
+ "changepoint_prior_scale": 0.05,
+ "seasonality_prior_scale": 10.0,
+ "validation_enabled": True
+ }
+ }
+
+ with patch('app.services.training_service.TrainingService._fetch_sales_data',
+ return_value=real_bakery_data):
+
+ start_response = await test_client.post(
+ "/training/jobs",
+ json=training_request,
+ headers={"X-Tenant-ID": "test_bakery_001"}
+ )
+
+ assert start_response.status_code == 201
+ job_data = start_response.json()
+ job_id = job_data["job_id"]
+ assert job_data["status"] == "pending"
+
+ # Step 4: Monitor job progress
+ max_wait_time = 300 # 5 minutes
+ start_time = time.time()
+
+ while time.time() - start_time < max_wait_time:
+ status_response = await test_client.get(f"/training/jobs/{job_id}/status")
+ assert status_response.status_code == 200
+
+ status_data = status_response.json()
+
+ if status_data["status"] == "completed":
+ # Training completed successfully
+ assert "models_trained" in status_data
+ assert len(status_data["models_trained"]) == 3 # Three products
+
+ # Check model quality
+ for model_info in status_data["models_trained"]:
+ assert "product_name" in model_info
+ assert "model_id" in model_info
+ assert "metrics" in model_info
+
+ metrics = model_info["metrics"]
+ assert "mape" in metrics
+ assert "rmse" in metrics
+ assert "mae" in metrics
+
+ # Quality thresholds for bakery data
+ assert metrics["mape"] < 50, f"MAPE too high for {model_info['product_name']}: {metrics['mape']}"
+ assert metrics["rmse"] > 0
+
+ break
+ elif status_data["status"] == "failed":
+ pytest.fail(f"Training job failed: {status_data.get('error_message', 'Unknown error')}")
+
+ # Wait before checking again
+ await asyncio.sleep(10)
+ else:
+ pytest.fail(f"Training job did not complete within {max_wait_time} seconds")
+
+ # Step 5: Get detailed job logs
+ logs_response = await test_client.get(f"/training/jobs/{job_id}/logs")
+ assert logs_response.status_code == 200
+ logs_data = logs_response.json()
+ assert "logs" in logs_data
+ assert len(logs_data["logs"]) > 0
\ No newline at end of file
diff --git a/services/training/tests/test_ml_pipeline_integration.py b/services/training/tests/test_ml_pipeline_integration.py
new file mode 100644
index 00000000..e69de29b
diff --git a/services/training/tests/test_performance.py b/services/training/tests/test_performance.py
new file mode 100644
index 00000000..a7ba60bf
--- /dev/null
+++ b/services/training/tests/test_performance.py
@@ -0,0 +1,630 @@
+# ================================================================
+# services/training/tests/test_performance.py
+# ================================================================
+"""
+Performance and Load Testing for Training Service
+Tests training performance with real-world data volumes
+"""
+
+import pytest
+import asyncio
+import pandas as pd
+import numpy as np
+import time
+from datetime import datetime, timedelta
+from concurrent.futures import ThreadPoolExecutor
+import psutil
+import gc
+from typing import List, Dict, Any
+import logging
+from unittest.mock import patch
+
+from app.ml.trainer import BakeryMLTrainer
+from app.ml.data_processor import BakeryDataProcessor
+from app.services.training_service import TrainingService
+
+
+class TestTrainingPerformance:
+ """Performance tests for training service components"""
+
+ @pytest.fixture
+ def large_sales_dataset(self):
+ """Generate large dataset for performance testing (2 years of data)"""
+ start_date = datetime(2022, 1, 1)
+ end_date = datetime(2024, 1, 1)
+
+ date_range = pd.date_range(start=start_date, end=end_date, freq='D')
+ products = [
+ "Pan Integral", "Pan Blanco", "Croissant", "Magdalenas",
+ "Empanadas", "Tarta Chocolate", "Roscon Reyes", "Palmeras",
+ "Donuts", "Berlinas", "Napolitanas", "Ensaimadas"
+ ]
+
+ data = []
+ for date in date_range:
+ for product in products:
+ # Realistic sales simulation
+ base_quantity = np.random.randint(5, 150)
+
+ # Seasonal patterns
+ if date.month in [12, 1]: # Winter/Holiday season
+ base_quantity *= 1.4
+ elif date.month in [6, 7, 8]: # Summer
+ base_quantity *= 0.8
+
+ # Weekly patterns
+ if date.weekday() >= 5: # Weekends
+ base_quantity *= 1.2
+ elif date.weekday() == 0: # Monday
+ base_quantity *= 0.7
+
+ # Add noise
+ quantity = max(1, int(base_quantity + np.random.normal(0, base_quantity * 0.1)))
+
+ data.append({
+ "date": date.strftime("%Y-%m-%d"),
+ "product": product,
+ "quantity": quantity,
+ "revenue": round(quantity * np.random.uniform(1.5, 8.0), 2),
+ "temperature": round(15 + 12 * np.sin((date.timetuple().tm_yday / 365) * 2 * np.pi) + np.random.normal(0, 3), 1),
+ "precipitation": max(0, np.random.exponential(0.8)),
+ "is_weekend": date.weekday() >= 5,
+ "is_holiday": self._is_spanish_holiday(date)
+ })
+
+ return pd.DataFrame(data)
+
+ def _is_spanish_holiday(self, date: datetime) -> bool:
+ """Check if date is a Spanish holiday"""
+ holidays = [
+ (1, 1), # New Year
+ (1, 6), # Epiphany
+ (5, 1), # Labor Day
+ (8, 15), # Assumption
+ (10, 12), # National Day
+ (11, 1), # All Saints
+ (12, 6), # Constitution Day
+ (12, 8), # Immaculate Conception
+ (12, 25), # Christmas
+ ]
+ return (date.month, date.day) in holidays
+
+ @pytest.mark.asyncio
+ async def test_single_product_training_performance(self, large_sales_dataset):
+ """Test performance of single product training with large dataset"""
+
+ trainer = BakeryMLTrainer()
+ product_data = large_sales_dataset[large_sales_dataset['product'] == 'Pan Integral'].copy()
+
+ # Measure memory before training
+ process = psutil.Process()
+ memory_before = process.memory_info().rss / 1024 / 1024 # MB
+
+ start_time = time.time()
+
+ result = await trainer.train_single_product(
+ tenant_id="perf_test_tenant",
+ product_name="Pan Integral",
+ sales_data=product_data,
+ config={
+ "include_weather": True,
+ "include_traffic": False, # Skip traffic for performance
+ "seasonality_mode": "additive"
+ }
+ )
+
+ end_time = time.time()
+ training_duration = end_time - start_time
+
+ # Measure memory after training
+ memory_after = process.memory_info().rss / 1024 / 1024 # MB
+ memory_used = memory_after - memory_before
+
+ # Performance assertions
+ assert training_duration < 120, f"Training took too long: {training_duration:.2f}s"
+ assert memory_used < 500, f"Memory usage too high: {memory_used:.2f}MB"
+ assert result['status'] == 'completed'
+
+ # Quality assertions
+ metrics = result['metrics']
+ assert metrics['mape'] < 50, f"MAPE too high: {metrics['mape']:.2f}%"
+
+ print(f"Performance Results:")
+ print(f" Training Duration: {training_duration:.2f}s")
+ print(f" Memory Used: {memory_used:.2f}MB")
+ print(f" Data Points: {len(product_data)}")
+ print(f" MAPE: {metrics['mape']:.2f}%")
+ print(f" RMSE: {metrics['rmse']:.2f}")
+
+ @pytest.mark.asyncio
+ async def test_concurrent_training_performance(self, large_sales_dataset):
+ """Test performance of concurrent training jobs"""
+
+ trainer = BakeryMLTrainer()
+ products = ["Pan Integral", "Croissant", "Magdalenas"]
+
+ async def train_product(product_name: str):
+ """Train a single product"""
+ product_data = large_sales_dataset[large_sales_dataset['product'] == product_name].copy()
+
+ start_time = time.time()
+ result = await trainer.train_single_product(
+ tenant_id=f"concurrent_test_{product_name.replace(' ', '_').lower()}",
+ product_name=product_name,
+ sales_data=product_data,
+ config={"include_weather": True, "include_traffic": False}
+ )
+ end_time = time.time()
+
+ return {
+ 'product': product_name,
+ 'duration': end_time - start_time,
+ 'status': result['status'],
+ 'metrics': result.get('metrics', {})
+ }
+
+ # Run concurrent training
+ start_time = time.time()
+ tasks = [train_product(product) for product in products]
+ results = await asyncio.gather(*tasks)
+ total_time = time.time() - start_time
+
+ # Verify all trainings completed
+ for result in results:
+ assert result['status'] == 'completed'
+ assert result['duration'] < 120 # Individual training time
+
+ # Concurrent execution should be faster than sequential
+ sequential_time_estimate = sum(r['duration'] for r in results)
+ efficiency = sequential_time_estimate / total_time
+
+ assert efficiency > 1.5, f"Concurrency efficiency too low: {efficiency:.2f}x"
+
+ print(f"Concurrent Training Results:")
+ print(f" Total Time: {total_time:.2f}s")
+ print(f" Sequential Estimate: {sequential_time_estimate:.2f}s")
+ print(f" Efficiency: {efficiency:.2f}x")
+
+ for result in results:
+            mape = result['metrics'].get('mape')
+            mape_str = f"{mape:.2f}%" if mape is not None else "N/A"
+            print(f"  {result['product']}: {result['duration']:.2f}s, MAPE: {mape_str}")
+
+ @pytest.mark.asyncio
+ async def test_data_processing_scalability(self, large_sales_dataset):
+ """Test data processing performance with increasing data sizes"""
+
+ data_processor = BakeryDataProcessor()
+
+ # Test with different data sizes
+ data_sizes = [1000, 5000, 10000, 20000, len(large_sales_dataset)]
+ performance_results = []
+
+ for size in data_sizes:
+ # Take a sample of the specified size
+ sample_data = large_sales_dataset.head(size).copy()
+
+ start_time = time.time()
+
+ # Process the data
+ processed_data = await data_processor.prepare_training_data(
+ sales_data=sample_data,
+ include_weather=True,
+ include_traffic=True,
+ tenant_id="scalability_test",
+ product_name="Pan Integral"
+ )
+
+ processing_time = time.time() - start_time
+
+ performance_results.append({
+ 'data_size': size,
+ 'processing_time': processing_time,
+ 'processed_rows': len(processed_data),
+ 'throughput': size / processing_time if processing_time > 0 else 0
+ })
+
+ # Verify linear or sub-linear scaling
+ for i in range(1, len(performance_results)):
+ prev_result = performance_results[i-1]
+ curr_result = performance_results[i]
+
+ size_ratio = curr_result['data_size'] / prev_result['data_size']
+ time_ratio = curr_result['processing_time'] / prev_result['processing_time']
+
+ # Processing time should scale better than linearly
+ assert time_ratio < size_ratio * 1.5, f"Poor scaling at size {curr_result['data_size']}"
+
+ print("Data Processing Scalability Results:")
+ for result in performance_results:
+ print(f" Size: {result['data_size']:,} rows, Time: {result['processing_time']:.2f}s, "
+ f"Throughput: {result['throughput']:.0f} rows/s")
+
+ @pytest.mark.asyncio
+ async def test_memory_usage_optimization(self, large_sales_dataset):
+ """Test memory usage optimization during training"""
+
+ trainer = BakeryMLTrainer()
+ process = psutil.Process()
+
+ # Baseline memory
+ gc.collect() # Force garbage collection
+ baseline_memory = process.memory_info().rss / 1024 / 1024 # MB
+
+ memory_snapshots = [{'stage': 'baseline', 'memory_mb': baseline_memory}]
+
+ # Load data
+ product_data = large_sales_dataset[large_sales_dataset['product'] == 'Pan Integral'].copy()
+ current_memory = process.memory_info().rss / 1024 / 1024
+ memory_snapshots.append({'stage': 'data_loaded', 'memory_mb': current_memory})
+
+ # Train model
+ result = await trainer.train_single_product(
+ tenant_id="memory_test_tenant",
+ product_name="Pan Integral",
+ sales_data=product_data,
+ config={"include_weather": True, "include_traffic": True}
+ )
+
+ current_memory = process.memory_info().rss / 1024 / 1024
+ memory_snapshots.append({'stage': 'model_trained', 'memory_mb': current_memory})
+
+ # Cleanup
+ del product_data
+ del result
+ gc.collect()
+
+ final_memory = process.memory_info().rss / 1024 / 1024
+ memory_snapshots.append({'stage': 'cleanup', 'memory_mb': final_memory})
+
+ # Memory assertions
+ peak_memory = max(snapshot['memory_mb'] for snapshot in memory_snapshots)
+ memory_increase = peak_memory - baseline_memory
+ memory_after_cleanup = final_memory - baseline_memory
+
+ assert memory_increase < 800, f"Peak memory increase too high: {memory_increase:.2f}MB"
+ assert memory_after_cleanup < 100, f"Memory not properly cleaned up: {memory_after_cleanup:.2f}MB"
+
+ print("Memory Usage Analysis:")
+ for snapshot in memory_snapshots:
+ print(f" {snapshot['stage']}: {snapshot['memory_mb']:.2f}MB")
+ print(f" Peak increase: {memory_increase:.2f}MB")
+ print(f" After cleanup: {memory_after_cleanup:.2f}MB")
+
+ @pytest.mark.asyncio
+ async def test_training_service_throughput(self, large_sales_dataset):
+ """Test training service throughput with multiple requests"""
+
+ training_service = TrainingService()
+
+ # Simulate multiple training requests
+ num_requests = 5
+ products = ["Pan Integral", "Croissant", "Magdalenas", "Empanadas", "Tarta Chocolate"]
+
+ async def execute_training_request(request_id: int, product: str):
+ """Execute a single training request"""
+ product_data = large_sales_dataset[large_sales_dataset['product'] == product].copy()
+
+ with patch.object(training_service, '_fetch_sales_data', return_value=product_data):
+ start_time = time.time()
+
+ result = await training_service.execute_training_job(
+ db=None, # Mock DB session
+ tenant_id=f"throughput_test_tenant_{request_id}",
+ job_id=f"job_{request_id}_{product.replace(' ', '_').lower()}",
+ request={
+ 'products': [product],
+ 'include_weather': True,
+ 'include_traffic': False,
+ 'config': {'seasonality_mode': 'additive'}
+ }
+ )
+
+ duration = time.time() - start_time
+ return {
+ 'request_id': request_id,
+ 'product': product,
+ 'duration': duration,
+ 'status': result.get('status', 'unknown'),
+ 'models_trained': len(result.get('models_trained', []))
+ }
+
+ # Execute requests concurrently
+ start_time = time.time()
+ tasks = [
+ execute_training_request(i, products[i % len(products)])
+ for i in range(num_requests)
+ ]
+ results = await asyncio.gather(*tasks)
+ total_time = time.time() - start_time
+
+ # Calculate throughput metrics
+ successful_requests = sum(1 for r in results if r['status'] == 'completed')
+ throughput = successful_requests / total_time # requests per second
+
+ # Performance assertions
+ assert successful_requests >= num_requests * 0.8, "Too many failed requests"
+ assert throughput >= 0.1, f"Throughput too low: {throughput:.3f} req/s"
+ assert total_time < 300, f"Total time too long: {total_time:.2f}s"
+
+ print(f"Training Service Throughput Results:")
+ print(f" Total Requests: {num_requests}")
+ print(f" Successful: {successful_requests}")
+ print(f" Total Time: {total_time:.2f}s")
+ print(f" Throughput: {throughput:.3f} req/s")
+ print(f" Average Request Time: {total_time/num_requests:.2f}s")
+
+ @pytest.mark.asyncio
+ async def test_large_dataset_edge_cases(self, large_sales_dataset):
+ """Test handling of edge cases with large datasets"""
+
+ data_processor = BakeryDataProcessor()
+
+ # Test 1: Dataset with many missing values
+ corrupted_data = large_sales_dataset.copy()
+ # Introduce 30% missing values randomly
+ mask = np.random.random(len(corrupted_data)) < 0.3
+ corrupted_data.loc[mask, 'quantity'] = np.nan
+
+ start_time = time.time()
+ result = await data_processor.validate_data_quality(corrupted_data)
+ validation_time = time.time() - start_time
+
+ assert validation_time < 10, f"Validation too slow: {validation_time:.2f}s"
+ assert result['is_valid'] is False
+ assert 'high_missing_data' in result['issues']
+
+ # Test 2: Dataset with extreme outliers
+ outlier_data = large_sales_dataset.copy()
+ # Add extreme outliers (100x normal values)
+ outlier_indices = np.random.choice(len(outlier_data), size=int(len(outlier_data) * 0.01), replace=False)
+ outlier_data.loc[outlier_indices, 'quantity'] *= 100
+
+ start_time = time.time()
+ cleaned_data = await data_processor.clean_outliers(outlier_data)
+ cleaning_time = time.time() - start_time
+
+ assert cleaning_time < 15, f"Outlier cleaning too slow: {cleaning_time:.2f}s"
+ assert len(cleaned_data) > len(outlier_data) * 0.95 # Should retain most data
+
+ # Test 3: Very sparse data (many products with few sales)
+ sparse_data = large_sales_dataset.copy()
+ # Keep only 10% of data for each product randomly
+ sparse_data = sparse_data.groupby('product').apply(
+ lambda x: x.sample(n=max(1, int(len(x) * 0.1)))
+ ).reset_index(drop=True)
+
+ start_time = time.time()
+ validation_result = await data_processor.validate_data_quality(sparse_data)
+ sparse_validation_time = time.time() - start_time
+
+ assert sparse_validation_time < 5, f"Sparse data validation too slow: {sparse_validation_time:.2f}s"
+
+ print("Edge Case Performance Results:")
+ print(f" Corrupted data validation: {validation_time:.2f}s")
+ print(f" Outlier cleaning: {cleaning_time:.2f}s")
+ print(f" Sparse data validation: {sparse_validation_time:.2f}s")
+
+
+class TestTrainingServiceLoad:
+ """Load testing for training service under stress"""
+
+ @pytest.mark.asyncio
+ async def test_sustained_load_training(self, large_sales_dataset):
+ """Test training service under sustained load"""
+
+ trainer = BakeryMLTrainer()
+
+ # Define load test parameters
+ duration_minutes = 2 # Run for 2 minutes
+ requests_per_minute = 3
+
+ products = ["Pan Integral", "Croissant", "Magdalenas"]
+
+ async def sustained_training_worker(worker_id: int, duration: float):
+ """Worker that continuously submits training requests"""
+ start_time = time.time()
+ completed_requests = 0
+ failed_requests = 0
+
+ while time.time() - start_time < duration:
+ try:
+ product = products[completed_requests % len(products)]
+ product_data = large_sales_dataset[
+ large_sales_dataset['product'] == product
+ ].copy()
+
+ result = await trainer.train_single_product(
+ tenant_id=f"load_test_worker_{worker_id}",
+ product_name=product,
+ sales_data=product_data,
+ config={"include_weather": False, "include_traffic": False} # Minimal config for speed
+ )
+
+ if result['status'] == 'completed':
+ completed_requests += 1
+ else:
+ failed_requests += 1
+
+ except Exception as e:
+ failed_requests += 1
+ logging.error(f"Training request failed: {e}")
+
+ # Wait before next request
+ await asyncio.sleep(60 / requests_per_minute)
+
+ return {
+ 'worker_id': worker_id,
+ 'completed': completed_requests,
+ 'failed': failed_requests,
+ 'duration': time.time() - start_time
+ }
+
+ # Start multiple workers
+ num_workers = 2
+ duration_seconds = duration_minutes * 60
+
+ start_time = time.time()
+ tasks = [
+ sustained_training_worker(i, duration_seconds)
+ for i in range(num_workers)
+ ]
+ results = await asyncio.gather(*tasks)
+ total_time = time.time() - start_time
+
+ # Analyze results
+ total_completed = sum(r['completed'] for r in results)
+ total_failed = sum(r['failed'] for r in results)
+ success_rate = total_completed / (total_completed + total_failed) if (total_completed + total_failed) > 0 else 0
+
+ # Performance assertions
+ assert success_rate >= 0.8, f"Success rate too low: {success_rate:.2%}"
+ assert total_completed >= duration_minutes * requests_per_minute * num_workers * 0.7, "Throughput too low"
+
+ print(f"Sustained Load Test Results:")
+ print(f" Duration: {total_time:.2f}s")
+ print(f" Workers: {num_workers}")
+ print(f" Completed Requests: {total_completed}")
+ print(f" Failed Requests: {total_failed}")
+ print(f" Success Rate: {success_rate:.2%}")
+ print(f" Average Throughput: {total_completed/total_time:.2f} req/s")
+
+ @pytest.mark.asyncio
+ async def test_resource_exhaustion_recovery(self, large_sales_dataset):
+ """Test service recovery from resource exhaustion"""
+
+ trainer = BakeryMLTrainer()
+
+ # Simulate resource exhaustion by running many concurrent requests
+ num_concurrent = 10 # High concurrency to stress the system
+
+ async def resource_intensive_task(task_id: int):
+ """Task designed to consume resources"""
+ try:
+ # Use all products to increase memory usage
+ all_products_data = large_sales_dataset.copy()
+
+ result = await trainer.train_tenant_models(
+ tenant_id=f"resource_test_{task_id}",
+ sales_data=all_products_data,
+ config={
+ "train_all_products": True,
+ "include_weather": True,
+ "include_traffic": True
+ }
+ )
+
+ return {'task_id': task_id, 'status': 'completed', 'error': None}
+
+ except Exception as e:
+ return {'task_id': task_id, 'status': 'failed', 'error': str(e)}
+
+ # Launch all tasks simultaneously
+ start_time = time.time()
+ tasks = [resource_intensive_task(i) for i in range(num_concurrent)]
+ results = await asyncio.gather(*tasks, return_exceptions=True)
+ duration = time.time() - start_time
+
+ # Analyze results
+ completed = sum(1 for r in results if isinstance(r, dict) and r['status'] == 'completed')
+ failed = sum(1 for r in results if isinstance(r, dict) and r['status'] == 'failed')
+ exceptions = sum(1 for r in results if isinstance(r, Exception))
+
+ # The system should handle some failures gracefully
+ # but should complete at least some requests
+ total_processed = completed + failed + exceptions
+ processing_rate = total_processed / num_concurrent
+
+ assert processing_rate >= 0.5, f"Too many requests not processed: {processing_rate:.2%}"
+ assert duration < 600, f"Recovery took too long: {duration:.2f}s" # 10 minutes max
+
+ print(f"Resource Exhaustion Test Results:")
+ print(f" Concurrent Requests: {num_concurrent}")
+ print(f" Completed: {completed}")
+ print(f" Failed: {failed}")
+ print(f" Exceptions: {exceptions}")
+ print(f" Duration: {duration:.2f}s")
+ print(f" Processing Rate: {processing_rate:.2%}")
+
+
+# ================================================================
+# BENCHMARK UTILITIES
+# ================================================================
+
+class PerformanceBenchmark:
+ """Utility class for performance benchmarking"""
+
+ @staticmethod
+ def measure_execution_time(func):
+ """Decorator to measure execution time"""
+ async def wrapper(*args, **kwargs):
+ start_time = time.time()
+ result = await func(*args, **kwargs)
+ execution_time = time.time() - start_time
+
+ if hasattr(result, 'update') and isinstance(result, dict):
+ result['execution_time'] = execution_time
+
+ return result
+ return wrapper
+
+ @staticmethod
+ def memory_profiler(func):
+ """Decorator to profile memory usage"""
+ async def wrapper(*args, **kwargs):
+ process = psutil.Process()
+
+ # Memory before
+ gc.collect()
+ memory_before = process.memory_info().rss / 1024 / 1024
+
+ result = await func(*args, **kwargs)
+
+ # Memory after
+ memory_after = process.memory_info().rss / 1024 / 1024
+ memory_used = memory_after - memory_before
+
+ if hasattr(result, 'update') and isinstance(result, dict):
+ result['memory_used_mb'] = memory_used
+
+ return result
+ return wrapper
+
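+# Example usage of the benchmark decorators (illustrative sketch only; the wrapped
+# coroutine and its arguments below are assumptions, not part of the service API):
+#
+#     @PerformanceBenchmark.measure_execution_time
+#     @PerformanceBenchmark.memory_profiler
+#     async def benchmarked_training(trainer, product_data):
+#         return await trainer.train_single_product(
+#             tenant_id="benchmark_tenant",
+#             product_name="Pan Integral",
+#             sales_data=product_data,
+#             config={"include_weather": False, "include_traffic": False},
+#         )
+#
+# When the wrapped coroutine returns a dict, the decorators attach
+# 'execution_time' and 'memory_used_mb' keys to it.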
+
+# ================================================================
+# STANDALONE EXECUTION
+# ================================================================
+
+if __name__ == "__main__":
+ """
+ Run performance tests as standalone script
+ Usage: python test_performance.py
+ """
+    import sys
+    import os
+
+ # Add the training service root to Python path
+ training_service_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
+ sys.path.insert(0, training_service_root)
+
+ print("=" * 60)
+ print("TRAINING SERVICE PERFORMANCE TEST SUITE")
+ print("=" * 60)
+
+ # Setup logging
+ logging.basicConfig(
+ level=logging.INFO,
+ format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
+ )
+
+ # Run performance tests
+ pytest.main([
+ __file__,
+ "-v",
+ "--tb=short",
+ "-s", # Don't capture output
+ "--durations=10", # Show 10 slowest tests
+ "-m", "not slow", # Skip slow tests unless specifically requested
+ ])
+
+ print("\n" + "=" * 60)
+ print("PERFORMANCE TESTING COMPLETE")
+ print("=" * 60)
\ No newline at end of file