630 lines
25 KiB
Python
630 lines
25 KiB
Python
# ================================================================
|
|
# services/training/tests/test_performance.py
|
|
# ================================================================
|
|
"""
|
|
Performance and Load Testing for Training Service
|
|
Tests training performance with real-world data volumes
|
|
"""
|
|
|
|
import pytest
|
|
import asyncio
|
|
import pandas as pd
|
|
import numpy as np
|
|
import time
|
|
from datetime import datetime, timedelta
|
|
from concurrent.futures import ThreadPoolExecutor
|
|
import psutil
|
|
import gc
|
|
from typing import List, Dict, Any
|
|
import logging
|
|
|
|
from app.ml.trainer import BakeryMLTrainer
|
|
from app.ml.data_processor import BakeryDataProcessor
|
|
from app.services.training_service import TrainingService
|
|
|
|
|
|
class TestTrainingPerformance:
|
|
"""Performance tests for training service components"""
|
|
|
|
@pytest.fixture
|
|
def large_sales_dataset(self):
|
|
"""Generate large dataset for performance testing (2 years of data)"""
|
|
start_date = datetime(2022, 1, 1)
|
|
end_date = datetime(2024, 1, 1)
|
|
|
|
date_range = pd.date_range(start=start_date, end=end_date, freq='D')
|
|
products = [
|
|
"Pan Integral", "Pan Blanco", "Croissant", "Magdalenas",
|
|
"Empanadas", "Tarta Chocolate", "Roscon Reyes", "Palmeras",
|
|
"Donuts", "Berlinas", "Napolitanas", "Ensaimadas"
|
|
]
|
|
|
|
data = []
|
|
for date in date_range:
|
|
for product in products:
|
|
# Realistic sales simulation
|
|
base_quantity = np.random.randint(5, 150)
|
|
|
|
# Seasonal patterns
|
|
if date.month in [12, 1]: # Winter/Holiday season
|
|
base_quantity *= 1.4
|
|
elif date.month in [6, 7, 8]: # Summer
|
|
base_quantity *= 0.8
|
|
|
|
# Weekly patterns
|
|
if date.weekday() >= 5: # Weekends
|
|
base_quantity *= 1.2
|
|
elif date.weekday() == 0: # Monday
|
|
base_quantity *= 0.7
|
|
|
|
# Add noise
|
|
quantity = max(1, int(base_quantity + np.random.normal(0, base_quantity * 0.1)))
|
|
|
|
data.append({
|
|
"date": date.strftime("%Y-%m-%d"),
|
|
"product": product,
|
|
"quantity": quantity,
|
|
"revenue": round(quantity * np.random.uniform(1.5, 8.0), 2),
|
|
"temperature": round(15 + 12 * np.sin((date.timetuple().tm_yday / 365) * 2 * np.pi) + np.random.normal(0, 3), 1),
|
|
"precipitation": max(0, np.random.exponential(0.8)),
|
|
"is_weekend": date.weekday() >= 5,
|
|
"is_holiday": self._is_spanish_holiday(date)
|
|
})
|
|
|
|
return pd.DataFrame(data)
|
|
|
|
def _is_spanish_holiday(self, date: datetime) -> bool:
|
|
"""Check if date is a Spanish holiday"""
|
|
holidays = [
|
|
(1, 1), # New Year
|
|
(1, 6), # Epiphany
|
|
(5, 1), # Labor Day
|
|
(8, 15), # Assumption
|
|
(10, 12), # National Day
|
|
(11, 1), # All Saints
|
|
(12, 6), # Constitution Day
|
|
(12, 8), # Immaculate Conception
|
|
(12, 25), # Christmas
|
|
]
|
|
return (date.month, date.day) in holidays
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_single_product_training_performance(self, large_sales_dataset):
|
|
"""Test performance of single product training with large dataset"""
|
|
|
|
trainer = BakeryMLTrainer()
|
|
product_data = large_sales_dataset[large_sales_dataset['product'] == 'Pan Integral'].copy()
|
|
|
|
# Measure memory before training
|
|
process = psutil.Process()
|
|
memory_before = process.memory_info().rss / 1024 / 1024 # MB
|
|
|
|
start_time = time.time()
|
|
|
|
result = await trainer.train_single_product(
|
|
tenant_id="perf_test_tenant",
|
|
product_name="Pan Integral",
|
|
sales_data=product_data,
|
|
config={
|
|
"include_weather": True,
|
|
"include_traffic": False, # Skip traffic for performance
|
|
"seasonality_mode": "additive"
|
|
}
|
|
)
|
|
|
|
end_time = time.time()
|
|
training_duration = end_time - start_time
|
|
|
|
# Measure memory after training
|
|
memory_after = process.memory_info().rss / 1024 / 1024 # MB
|
|
memory_used = memory_after - memory_before
|
|
|
|
# Performance assertions
|
|
assert training_duration < 120, f"Training took too long: {training_duration:.2f}s"
|
|
assert memory_used < 500, f"Memory usage too high: {memory_used:.2f}MB"
|
|
assert result['status'] == 'completed'
|
|
|
|
# Quality assertions
|
|
metrics = result['metrics']
|
|
assert metrics['mape'] < 50, f"MAPE too high: {metrics['mape']:.2f}%"
|
|
|
|
print(f"Performance Results:")
|
|
print(f" Training Duration: {training_duration:.2f}s")
|
|
print(f" Memory Used: {memory_used:.2f}MB")
|
|
print(f" Data Points: {len(product_data)}")
|
|
print(f" MAPE: {metrics['mape']:.2f}%")
|
|
print(f" RMSE: {metrics['rmse']:.2f}")
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_concurrent_training_performance(self, large_sales_dataset):
|
|
"""Test performance of concurrent training jobs"""
|
|
|
|
trainer = BakeryMLTrainer()
|
|
products = ["Pan Integral", "Croissant", "Magdalenas"]
|
|
|
|
async def train_product(product_name: str):
|
|
"""Train a single product"""
|
|
product_data = large_sales_dataset[large_sales_dataset['product'] == product_name].copy()
|
|
|
|
start_time = time.time()
|
|
result = await trainer.train_single_product(
|
|
tenant_id=f"concurrent_test_{product_name.replace(' ', '_').lower()}",
|
|
product_name=product_name,
|
|
sales_data=product_data,
|
|
config={"include_weather": True, "include_traffic": False}
|
|
)
|
|
end_time = time.time()
|
|
|
|
return {
|
|
'product': product_name,
|
|
'duration': end_time - start_time,
|
|
'status': result['status'],
|
|
'metrics': result.get('metrics', {})
|
|
}
|
|
|
|
# Run concurrent training
|
|
start_time = time.time()
|
|
tasks = [train_product(product) for product in products]
|
|
results = await asyncio.gather(*tasks)
|
|
total_time = time.time() - start_time
|
|
|
|
# Verify all trainings completed
|
|
for result in results:
|
|
assert result['status'] == 'completed'
|
|
assert result['duration'] < 120 # Individual training time
|
|
|
|
# Concurrent execution should be faster than sequential
|
|
sequential_time_estimate = sum(r['duration'] for r in results)
|
|
efficiency = sequential_time_estimate / total_time
|
|
|
|
assert efficiency > 1.5, f"Concurrency efficiency too low: {efficiency:.2f}x"
|
|
|
|
print(f"Concurrent Training Results:")
|
|
print(f" Total Time: {total_time:.2f}s")
|
|
print(f" Sequential Estimate: {sequential_time_estimate:.2f}s")
|
|
print(f" Efficiency: {efficiency:.2f}x")
|
|
|
|
for result in results:
|
|
print(f" {result['product']}: {result['duration']:.2f}s, MAPE: {result['metrics'].get('mape', 'N/A'):.2f}%")
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_data_processing_scalability(self, large_sales_dataset):
|
|
"""Test data processing performance with increasing data sizes"""
|
|
|
|
data_processor = BakeryDataProcessor()
|
|
|
|
# Test with different data sizes
|
|
data_sizes = [1000, 5000, 10000, 20000, len(large_sales_dataset)]
|
|
performance_results = []
|
|
|
|
for size in data_sizes:
|
|
# Take a sample of the specified size
|
|
sample_data = large_sales_dataset.head(size).copy()
|
|
|
|
start_time = time.time()
|
|
|
|
# Process the data
|
|
processed_data = await data_processor.prepare_training_data(
|
|
sales_data=sample_data,
|
|
include_weather=True,
|
|
include_traffic=True,
|
|
tenant_id="scalability_test",
|
|
product_name="Pan Integral"
|
|
)
|
|
|
|
processing_time = time.time() - start_time
|
|
|
|
performance_results.append({
|
|
'data_size': size,
|
|
'processing_time': processing_time,
|
|
'processed_rows': len(processed_data),
|
|
'throughput': size / processing_time if processing_time > 0 else 0
|
|
})
|
|
|
|
# Verify linear or sub-linear scaling
|
|
for i in range(1, len(performance_results)):
|
|
prev_result = performance_results[i-1]
|
|
curr_result = performance_results[i]
|
|
|
|
size_ratio = curr_result['data_size'] / prev_result['data_size']
|
|
time_ratio = curr_result['processing_time'] / prev_result['processing_time']
|
|
|
|
# Processing time should scale better than linearly
|
|
assert time_ratio < size_ratio * 1.5, f"Poor scaling at size {curr_result['data_size']}"
|
|
|
|
print("Data Processing Scalability Results:")
|
|
for result in performance_results:
|
|
print(f" Size: {result['data_size']:,} rows, Time: {result['processing_time']:.2f}s, "
|
|
f"Throughput: {result['throughput']:.0f} rows/s")
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_memory_usage_optimization(self, large_sales_dataset):
|
|
"""Test memory usage optimization during training"""
|
|
|
|
trainer = BakeryMLTrainer()
|
|
process = psutil.Process()
|
|
|
|
# Baseline memory
|
|
gc.collect() # Force garbage collection
|
|
baseline_memory = process.memory_info().rss / 1024 / 1024 # MB
|
|
|
|
memory_snapshots = [{'stage': 'baseline', 'memory_mb': baseline_memory}]
|
|
|
|
# Load data
|
|
product_data = large_sales_dataset[large_sales_dataset['product'] == 'Pan Integral'].copy()
|
|
current_memory = process.memory_info().rss / 1024 / 1024
|
|
memory_snapshots.append({'stage': 'data_loaded', 'memory_mb': current_memory})
|
|
|
|
# Train model
|
|
result = await trainer.train_single_product(
|
|
tenant_id="memory_test_tenant",
|
|
product_name="Pan Integral",
|
|
sales_data=product_data,
|
|
config={"include_weather": True, "include_traffic": True}
|
|
)
|
|
|
|
current_memory = process.memory_info().rss / 1024 / 1024
|
|
memory_snapshots.append({'stage': 'model_trained', 'memory_mb': current_memory})
|
|
|
|
# Cleanup
|
|
del product_data
|
|
del result
|
|
gc.collect()
|
|
|
|
final_memory = process.memory_info().rss / 1024 / 1024
|
|
memory_snapshots.append({'stage': 'cleanup', 'memory_mb': final_memory})
|
|
|
|
# Memory assertions
|
|
peak_memory = max(snapshot['memory_mb'] for snapshot in memory_snapshots)
|
|
memory_increase = peak_memory - baseline_memory
|
|
memory_after_cleanup = final_memory - baseline_memory
|
|
|
|
assert memory_increase < 800, f"Peak memory increase too high: {memory_increase:.2f}MB"
|
|
assert memory_after_cleanup < 100, f"Memory not properly cleaned up: {memory_after_cleanup:.2f}MB"
|
|
|
|
print("Memory Usage Analysis:")
|
|
for snapshot in memory_snapshots:
|
|
print(f" {snapshot['stage']}: {snapshot['memory_mb']:.2f}MB")
|
|
print(f" Peak increase: {memory_increase:.2f}MB")
|
|
print(f" After cleanup: {memory_after_cleanup:.2f}MB")
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_training_service_throughput(self, large_sales_dataset):
|
|
"""Test training service throughput with multiple requests"""
|
|
|
|
training_service = TrainingService()
|
|
|
|
# Simulate multiple training requests
|
|
num_requests = 5
|
|
products = ["Pan Integral", "Croissant", "Magdalenas", "Empanadas", "Tarta Chocolate"]
|
|
|
|
async def execute_training_request(request_id: int, product: str):
|
|
"""Execute a single training request"""
|
|
product_data = large_sales_dataset[large_sales_dataset['product'] == product].copy()
|
|
|
|
with patch.object(training_service, '_fetch_sales_data', return_value=product_data):
|
|
start_time = time.time()
|
|
|
|
result = await training_service.execute_training_job(
|
|
db=None, # Mock DB session
|
|
tenant_id=f"throughput_test_tenant_{request_id}",
|
|
job_id=f"job_{request_id}_{product.replace(' ', '_').lower()}",
|
|
request={
|
|
'products': [product],
|
|
'include_weather': True,
|
|
'include_traffic': False,
|
|
'config': {'seasonality_mode': 'additive'}
|
|
}
|
|
)
|
|
|
|
duration = time.time() - start_time
|
|
return {
|
|
'request_id': request_id,
|
|
'product': product,
|
|
'duration': duration,
|
|
'status': result.get('status', 'unknown'),
|
|
'models_trained': len(result.get('models_trained', []))
|
|
}
|
|
|
|
# Execute requests concurrently
|
|
start_time = time.time()
|
|
tasks = [
|
|
execute_training_request(i, products[i % len(products)])
|
|
for i in range(num_requests)
|
|
]
|
|
results = await asyncio.gather(*tasks)
|
|
total_time = time.time() - start_time
|
|
|
|
# Calculate throughput metrics
|
|
successful_requests = sum(1 for r in results if r['status'] == 'completed')
|
|
throughput = successful_requests / total_time # requests per second
|
|
|
|
# Performance assertions
|
|
assert successful_requests >= num_requests * 0.8, "Too many failed requests"
|
|
assert throughput >= 0.1, f"Throughput too low: {throughput:.3f} req/s"
|
|
assert total_time < 300, f"Total time too long: {total_time:.2f}s"
|
|
|
|
print(f"Training Service Throughput Results:")
|
|
print(f" Total Requests: {num_requests}")
|
|
print(f" Successful: {successful_requests}")
|
|
print(f" Total Time: {total_time:.2f}s")
|
|
print(f" Throughput: {throughput:.3f} req/s")
|
|
print(f" Average Request Time: {total_time/num_requests:.2f}s")
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_large_dataset_edge_cases(self, large_sales_dataset):
|
|
"""Test handling of edge cases with large datasets"""
|
|
|
|
data_processor = BakeryDataProcessor()
|
|
|
|
# Test 1: Dataset with many missing values
|
|
corrupted_data = large_sales_dataset.copy()
|
|
# Introduce 30% missing values randomly
|
|
mask = np.random.random(len(corrupted_data)) < 0.3
|
|
corrupted_data.loc[mask, 'quantity'] = np.nan
|
|
|
|
start_time = time.time()
|
|
result = await data_processor.validate_data_quality(corrupted_data)
|
|
validation_time = time.time() - start_time
|
|
|
|
assert validation_time < 10, f"Validation too slow: {validation_time:.2f}s"
|
|
assert result['is_valid'] is False
|
|
assert 'high_missing_data' in result['issues']
|
|
|
|
# Test 2: Dataset with extreme outliers
|
|
outlier_data = large_sales_dataset.copy()
|
|
# Add extreme outliers (100x normal values)
|
|
outlier_indices = np.random.choice(len(outlier_data), size=int(len(outlier_data) * 0.01), replace=False)
|
|
outlier_data.loc[outlier_indices, 'quantity'] *= 100
|
|
|
|
start_time = time.time()
|
|
cleaned_data = await data_processor.clean_outliers(outlier_data)
|
|
cleaning_time = time.time() - start_time
|
|
|
|
assert cleaning_time < 15, f"Outlier cleaning too slow: {cleaning_time:.2f}s"
|
|
assert len(cleaned_data) > len(outlier_data) * 0.95 # Should retain most data
|
|
|
|
# Test 3: Very sparse data (many products with few sales)
|
|
sparse_data = large_sales_dataset.copy()
|
|
# Keep only 10% of data for each product randomly
|
|
sparse_data = sparse_data.groupby('product').apply(
|
|
lambda x: x.sample(n=max(1, int(len(x) * 0.1)))
|
|
).reset_index(drop=True)
|
|
|
|
start_time = time.time()
|
|
validation_result = await data_processor.validate_data_quality(sparse_data)
|
|
sparse_validation_time = time.time() - start_time
|
|
|
|
assert sparse_validation_time < 5, f"Sparse data validation too slow: {sparse_validation_time:.2f}s"
|
|
|
|
print("Edge Case Performance Results:")
|
|
print(f" Corrupted data validation: {validation_time:.2f}s")
|
|
print(f" Outlier cleaning: {cleaning_time:.2f}s")
|
|
print(f" Sparse data validation: {sparse_validation_time:.2f}s")
|
|
|
|
|
|
class TestTrainingServiceLoad:
|
|
"""Load testing for training service under stress"""
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_sustained_load_training(self, large_sales_dataset):
|
|
"""Test training service under sustained load"""
|
|
|
|
trainer = BakeryMLTrainer()
|
|
|
|
# Define load test parameters
|
|
duration_minutes = 2 # Run for 2 minutes
|
|
requests_per_minute = 3
|
|
|
|
products = ["Pan Integral", "Croissant", "Magdalenas"]
|
|
|
|
async def sustained_training_worker(worker_id: int, duration: float):
|
|
"""Worker that continuously submits training requests"""
|
|
start_time = time.time()
|
|
completed_requests = 0
|
|
failed_requests = 0
|
|
|
|
while time.time() - start_time < duration:
|
|
try:
|
|
product = products[completed_requests % len(products)]
|
|
product_data = large_sales_dataset[
|
|
large_sales_dataset['product'] == product
|
|
].copy()
|
|
|
|
result = await trainer.train_single_product(
|
|
tenant_id=f"load_test_worker_{worker_id}",
|
|
product_name=product,
|
|
sales_data=product_data,
|
|
config={"include_weather": False, "include_traffic": False} # Minimal config for speed
|
|
)
|
|
|
|
if result['status'] == 'completed':
|
|
completed_requests += 1
|
|
else:
|
|
failed_requests += 1
|
|
|
|
except Exception as e:
|
|
failed_requests += 1
|
|
logging.error(f"Training request failed: {e}")
|
|
|
|
# Wait before next request
|
|
await asyncio.sleep(60 / requests_per_minute)
|
|
|
|
return {
|
|
'worker_id': worker_id,
|
|
'completed': completed_requests,
|
|
'failed': failed_requests,
|
|
'duration': time.time() - start_time
|
|
}
|
|
|
|
# Start multiple workers
|
|
num_workers = 2
|
|
duration_seconds = duration_minutes * 60
|
|
|
|
start_time = time.time()
|
|
tasks = [
|
|
sustained_training_worker(i, duration_seconds)
|
|
for i in range(num_workers)
|
|
]
|
|
results = await asyncio.gather(*tasks)
|
|
total_time = time.time() - start_time
|
|
|
|
# Analyze results
|
|
total_completed = sum(r['completed'] for r in results)
|
|
total_failed = sum(r['failed'] for r in results)
|
|
success_rate = total_completed / (total_completed + total_failed) if (total_completed + total_failed) > 0 else 0
|
|
|
|
# Performance assertions
|
|
assert success_rate >= 0.8, f"Success rate too low: {success_rate:.2%}"
|
|
assert total_completed >= duration_minutes * requests_per_minute * num_workers * 0.7, "Throughput too low"
|
|
|
|
print(f"Sustained Load Test Results:")
|
|
print(f" Duration: {total_time:.2f}s")
|
|
print(f" Workers: {num_workers}")
|
|
print(f" Completed Requests: {total_completed}")
|
|
print(f" Failed Requests: {total_failed}")
|
|
print(f" Success Rate: {success_rate:.2%}")
|
|
print(f" Average Throughput: {total_completed/total_time:.2f} req/s")
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_resource_exhaustion_recovery(self, large_sales_dataset):
|
|
"""Test service recovery from resource exhaustion"""
|
|
|
|
trainer = BakeryMLTrainer()
|
|
|
|
# Simulate resource exhaustion by running many concurrent requests
|
|
num_concurrent = 10 # High concurrency to stress the system
|
|
|
|
async def resource_intensive_task(task_id: int):
|
|
"""Task designed to consume resources"""
|
|
try:
|
|
# Use all products to increase memory usage
|
|
all_products_data = large_sales_dataset.copy()
|
|
|
|
result = await trainer.train_tenant_models(
|
|
tenant_id=f"resource_test_{task_id}",
|
|
sales_data=all_products_data,
|
|
config={
|
|
"train_all_products": True,
|
|
"include_weather": True,
|
|
"include_traffic": True
|
|
}
|
|
)
|
|
|
|
return {'task_id': task_id, 'status': 'completed', 'error': None}
|
|
|
|
except Exception as e:
|
|
return {'task_id': task_id, 'status': 'failed', 'error': str(e)}
|
|
|
|
# Launch all tasks simultaneously
|
|
start_time = time.time()
|
|
tasks = [resource_intensive_task(i) for i in range(num_concurrent)]
|
|
results = await asyncio.gather(*tasks, return_exceptions=True)
|
|
duration = time.time() - start_time
|
|
|
|
# Analyze results
|
|
completed = sum(1 for r in results if isinstance(r, dict) and r['status'] == 'completed')
|
|
failed = sum(1 for r in results if isinstance(r, dict) and r['status'] == 'failed')
|
|
exceptions = sum(1 for r in results if isinstance(r, Exception))
|
|
|
|
# The system should handle some failures gracefully
|
|
# but should complete at least some requests
|
|
total_processed = completed + failed + exceptions
|
|
processing_rate = total_processed / num_concurrent
|
|
|
|
assert processing_rate >= 0.5, f"Too many requests not processed: {processing_rate:.2%}"
|
|
assert duration < 600, f"Recovery took too long: {duration:.2f}s" # 10 minutes max
|
|
|
|
print(f"Resource Exhaustion Test Results:")
|
|
print(f" Concurrent Requests: {num_concurrent}")
|
|
print(f" Completed: {completed}")
|
|
print(f" Failed: {failed}")
|
|
print(f" Exceptions: {exceptions}")
|
|
print(f" Duration: {duration:.2f}s")
|
|
print(f" Processing Rate: {processing_rate:.2%}")
|
|
|
|
|
|
# ================================================================
|
|
# BENCHMARK UTILITIES
|
|
# ================================================================
|
|
|
|
class PerformanceBenchmark:
|
|
"""Utility class for performance benchmarking"""
|
|
|
|
@staticmethod
|
|
def measure_execution_time(func):
|
|
"""Decorator to measure execution time"""
|
|
async def wrapper(*args, **kwargs):
|
|
start_time = time.time()
|
|
result = await func(*args, **kwargs)
|
|
execution_time = time.time() - start_time
|
|
|
|
if hasattr(result, 'update') and isinstance(result, dict):
|
|
result['execution_time'] = execution_time
|
|
|
|
return result
|
|
return wrapper
|
|
|
|
@staticmethod
|
|
def memory_profiler(func):
|
|
"""Decorator to profile memory usage"""
|
|
async def wrapper(*args, **kwargs):
|
|
process = psutil.Process()
|
|
|
|
# Memory before
|
|
gc.collect()
|
|
memory_before = process.memory_info().rss / 1024 / 1024
|
|
|
|
result = await func(*args, **kwargs)
|
|
|
|
# Memory after
|
|
memory_after = process.memory_info().rss / 1024 / 1024
|
|
memory_used = memory_after - memory_before
|
|
|
|
if hasattr(result, 'update') and isinstance(result, dict):
|
|
result['memory_used_mb'] = memory_used
|
|
|
|
return result
|
|
return wrapper
|
|
|
|
|
|
# ================================================================
|
|
# STANDALONE EXECUTION
|
|
# ================================================================
|
|
|
|
if __name__ == "__main__":
|
|
"""
|
|
Run performance tests as standalone script
|
|
Usage: python test_performance.py
|
|
"""
|
|
import sys
|
|
import os
|
|
from unittest.mock import patch
|
|
|
|
# Add the training service root to Python path
|
|
training_service_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
|
|
sys.path.insert(0, training_service_root)
|
|
|
|
print("=" * 60)
|
|
print("TRAINING SERVICE PERFORMANCE TEST SUITE")
|
|
print("=" * 60)
|
|
|
|
# Setup logging
|
|
logging.basicConfig(
|
|
level=logging.INFO,
|
|
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
|
|
)
|
|
|
|
# Run performance tests
|
|
pytest.main([
|
|
__file__,
|
|
"-v",
|
|
"--tb=short",
|
|
"-s", # Don't capture output
|
|
"--durations=10", # Show 10 slowest tests
|
|
"-m", "not slow", # Skip slow tests unless specifically requested
|
|
])
|
|
|
|
print("\n" + "=" * 60)
|
|
print("PERFORMANCE TESTING COMPLETE")
|
|
print("=" * 60) |