# ================================================================
# services/training/tests/run_tests.py
# ================================================================
"""
Main test runner script for Training Service
Executes comprehensive test suite and generates reports
"""
import os
import sys
import asyncio
import subprocess
import json
import time
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Any
import logging
# Setup logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
class TrainingTestRunner:
"""Main test runner for training service"""
def __init__(self):
self.test_dir = Path(__file__).parent
self.results_dir = self.test_dir / "results"
self.results_dir.mkdir(exist_ok=True)
# Test configuration
self.test_suites = {
"unit": {
"files": ["test_api.py", "test_ml.py", "test_service.py"],
"description": "Unit tests for individual components",
"timeout": 300 # 5 minutes
},
"integration": {
"files": ["test_ml_pipeline_integration.py"],
"description": "Integration tests for ML pipeline with external data",
"timeout": 600 # 10 minutes
},
"performance": {
"files": ["test_performance.py"],
"description": "Performance and load testing",
"timeout": 900 # 15 minutes
},
"end_to_end": {
"files": ["test_end_to_end.py"],
"description": "End-to-end workflow testing",
"timeout": 800 # 13 minutes
}
}
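        # Illustrative sketch only: a new suite is registered by adding an entry
        # of the same shape. For example (hypothetical file name and timeout):
        #   self.test_suites["smoke"] = {
        #       "files": ["test_smoke.py"],
        #       "description": "Fast sanity checks",
        #       "timeout": 60
        #   }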
self.test_results = {}
async def setup_test_environment(self):
"""Setup test environment and dependencies"""
logger.info("Setting up test environment...")
# Check if we're running in Docker
if os.path.exists("/.dockerenv"):
logger.info("Running in Docker environment")
else:
logger.info("Running in local environment")
# Verify required files exist
required_files = [
"conftest.py",
"test_ml_pipeline_integration.py",
"test_performance.py"
]
for file in required_files:
file_path = self.test_dir / file
if not file_path.exists():
logger.warning(f"Required test file missing: {file}")
# Create test data if needed
await self.create_test_data()
# Verify external services (mock or real)
await self.verify_external_services()
async def create_test_data(self):
"""Create or verify test data exists"""
logger.info("Creating/verifying test data...")
test_data_dir = self.test_dir / "fixtures" / "test_data"
test_data_dir.mkdir(parents=True, exist_ok=True)
# Create bakery sales sample if it doesn't exist
sales_file = test_data_dir / "bakery_sales_sample.csv"
if not sales_file.exists():
logger.info("Creating sample sales data...")
await self.generate_sample_sales_data(sales_file)
# Create weather data sample
weather_file = test_data_dir / "madrid_weather_sample.json"
if not weather_file.exists():
logger.info("Creating sample weather data...")
await self.generate_sample_weather_data(weather_file)
# Create traffic data sample
traffic_file = test_data_dir / "madrid_traffic_sample.json"
if not traffic_file.exists():
logger.info("Creating sample traffic data...")
await self.generate_sample_traffic_data(traffic_file)
async def generate_sample_sales_data(self, file_path: Path):
"""Generate sample sales data for testing"""
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
# Generate 6 months of sample data
start_date = datetime(2023, 6, 1)
dates = [start_date + timedelta(days=i) for i in range(180)]
products = ["Pan Integral", "Croissant", "Magdalenas", "Empanadas", "Tarta Chocolate"]
data = []
for date in dates:
for product in products:
base_quantity = np.random.randint(10, 100)
# Weekend boost
if date.weekday() >= 5:
base_quantity *= 1.2
                # Seasonal temperature curve (feeds the temperature column, not quantity)
temp = 15 + 10 * np.sin((date.timetuple().tm_yday / 365) * 2 * np.pi)
data.append({
"date": date.strftime("%Y-%m-%d"),
"product": product,
"quantity": int(base_quantity),
"revenue": round(base_quantity * np.random.uniform(2.5, 8.0), 2),
"temperature": round(temp + np.random.normal(0, 3), 1),
"precipitation": max(0, np.random.exponential(0.5)),
"is_weekend": date.weekday() >= 5,
"is_holiday": False
})
df = pd.DataFrame(data)
df.to_csv(file_path, index=False)
logger.info(f"Created sample sales data: {len(df)} records")
async def generate_sample_weather_data(self, file_path: Path):
"""Generate sample weather data"""
import json
from datetime import datetime, timedelta
import numpy as np
start_date = datetime(2023, 6, 1)
weather_data = []
for i in range(180):
date = start_date + timedelta(days=i)
day_of_year = date.timetuple().tm_yday
base_temp = 14 + 12 * np.sin((day_of_year / 365) * 2 * np.pi)
weather_data.append({
"date": date.isoformat(),
"temperature": round(base_temp + np.random.normal(0, 5), 1),
"precipitation": max(0, np.random.exponential(1.0)),
"humidity": np.random.uniform(30, 80),
"wind_speed": np.random.uniform(5, 25),
"pressure": np.random.uniform(1000, 1025),
"description": np.random.choice(["Soleado", "Nuboso", "Lluvioso"]),
"source": "aemet_test"
})
with open(file_path, 'w') as f:
json.dump(weather_data, f, indent=2)
logger.info(f"Created sample weather data: {len(weather_data)} records")
async def generate_sample_traffic_data(self, file_path: Path):
"""Generate sample traffic data"""
import json
from datetime import datetime, timedelta
import numpy as np
start_date = datetime(2023, 6, 1)
traffic_data = []
for i in range(180):
date = start_date + timedelta(days=i)
for hour in [8, 12, 18]: # Three measurements per day
measurement_time = date.replace(hour=hour)
if hour in [8, 18]: # Rush hours
volume = np.random.randint(800, 1500)
congestion = "high"
else: # Lunch time
volume = np.random.randint(400, 800)
congestion = "medium"
traffic_data.append({
"date": measurement_time.isoformat(),
"traffic_volume": volume,
"occupation_percentage": np.random.randint(10, 90),
"load_percentage": np.random.randint(20, 95),
"average_speed": np.random.randint(15, 50),
"congestion_level": congestion,
"pedestrian_count": np.random.randint(50, 500),
"measurement_point_id": "TEST_POINT_001",
"measurement_point_name": "Plaza Mayor",
"road_type": "URB",
"source": "madrid_opendata_test"
})
with open(file_path, 'w') as f:
json.dump(traffic_data, f, indent=2)
logger.info(f"Created sample traffic data: {len(traffic_data)} records")
async def verify_external_services(self):
"""Verify external services are available (mock or real)"""
logger.info("Verifying external services...")
# Check if mock services are available
mock_services = [
("Mock AEMET", "http://localhost:8080/health"),
("Mock Madrid OpenData", "http://localhost:8081/health"),
("Mock Auth Service", "http://localhost:8082/health"),
("Mock Data Service", "http://localhost:8083/health")
]
try:
import httpx
async with httpx.AsyncClient(timeout=5.0) as client:
for service_name, url in mock_services:
try:
response = await client.get(url)
if response.status_code == 200:
logger.info(f"{service_name} is available")
else:
logger.warning(f"{service_name} returned status {response.status_code}")
except Exception as e:
logger.warning(f"{service_name} is not available: {e}")
except ImportError:
logger.warning("httpx not available, skipping service checks")
def run_test_suite(self, suite_name: str) -> Dict[str, Any]:
"""Run a specific test suite"""
suite_config = self.test_suites[suite_name]
logger.info(f"Running {suite_name} test suite: {suite_config['description']}")
start_time = time.time()
        # Prepare the pytest command; sys.executable ensures the current
        # interpreter (e.g. a virtualenv) is used rather than whatever
        # "python" happens to resolve to on PATH
        pytest_args = [
            sys.executable, "-m", "pytest",
            "-v",
            "--tb=short",
            "--capture=no",
            f"--junitxml={self.results_dir}/junit_{suite_name}.xml",
            "--cov=app",
            f"--cov-report=html:{self.results_dir}/coverage_{suite_name}_html",
            f"--cov-report=xml:{self.results_dir}/coverage_{suite_name}.xml",
            "--cov-report=term-missing"
        ]
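        # For the "unit" suite this expands to roughly the following command
        # (paths abbreviated; the exact file list depends on what exists on disk):
        #   <python> -m pytest -v --tb=short --capture=no \
        #       --junitxml=.../results/junit_unit.xml --cov=app ... \
        #       test_api.py test_ml.py test_service.py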
# Add test files
for test_file in suite_config["files"]:
test_path = self.test_dir / test_file
if test_path.exists():
pytest_args.append(str(test_path))
else:
logger.warning(f"Test file not found: {test_file}")
# Run the tests
try:
result = subprocess.run(
pytest_args,
cwd=self.test_dir.parent, # Run from training service root
capture_output=True,
text=True,
timeout=suite_config["timeout"]
)
duration = time.time() - start_time
return {
"suite": suite_name,
"status": "passed" if result.returncode == 0 else "failed",
"return_code": result.returncode,
"duration": duration,
"stdout": result.stdout,
"stderr": result.stderr,
"timestamp": datetime.now().isoformat()
}
except subprocess.TimeoutExpired:
duration = time.time() - start_time
logger.error(f"Test suite {suite_name} timed out after {duration:.2f}s")
return {
"suite": suite_name,
"status": "timeout",
"return_code": -1,
"duration": duration,
"stdout": "",
"stderr": f"Test suite timed out after {suite_config['timeout']}s",
"timestamp": datetime.now().isoformat()
}
except Exception as e:
duration = time.time() - start_time
logger.error(f"Error running test suite {suite_name}: {e}")
return {
"suite": suite_name,
"status": "error",
"return_code": -1,
"duration": duration,
"stdout": "",
"stderr": str(e),
"timestamp": datetime.now().isoformat()
}
def generate_test_report(self):
"""Generate comprehensive test report"""
logger.info("Generating test report...")
# Calculate summary statistics
total_suites = len(self.test_results)
passed_suites = sum(1 for r in self.test_results.values() if r["status"] == "passed")
failed_suites = sum(1 for r in self.test_results.values() if r["status"] == "failed")
error_suites = sum(1 for r in self.test_results.values() if r["status"] == "error")
timeout_suites = sum(1 for r in self.test_results.values() if r["status"] == "timeout")
total_duration = sum(r["duration"] for r in self.test_results.values())
# Create detailed report
report = {
"test_run_summary": {
"timestamp": datetime.now().isoformat(),
"total_suites": total_suites,
"passed_suites": passed_suites,
"failed_suites": failed_suites,
"error_suites": error_suites,
"timeout_suites": timeout_suites,
"success_rate": (passed_suites / total_suites * 100) if total_suites > 0 else 0,
"total_duration_seconds": total_duration
},
"suite_results": self.test_results,
"recommendations": self.generate_recommendations()
}
# Save JSON report
report_file = self.results_dir / "test_report.json"
with open(report_file, 'w') as f:
json.dump(report, f, indent=2)
# Generate HTML report
self.generate_html_report(report)
# Print summary to console
self.print_test_summary(report)
return report
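    # The saved test_report.json has roughly this shape (abridged):
    #   {"test_run_summary": {"timestamp": "...", "total_suites": 4,
    #     "passed_suites": 3, "failed_suites": 1, "error_suites": 0,
    #     "timeout_suites": 0, "success_rate": 75.0, "total_duration_seconds": 812.4},
    #    "suite_results": {...}, "recommendations": [...]}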
def generate_recommendations(self) -> List[str]:
"""Generate recommendations based on test results"""
recommendations = []
failed_suites = [name for name, result in self.test_results.items() if result["status"] == "failed"]
timeout_suites = [name for name, result in self.test_results.items() if result["status"] == "timeout"]
if failed_suites:
recommendations.append(f"Failed test suites: {', '.join(failed_suites)}. Check logs for detailed error messages.")
if timeout_suites:
recommendations.append(f"Timeout in suites: {', '.join(timeout_suites)}. Consider increasing timeout or optimizing performance.")
# Performance recommendations
slow_suites = [
name for name, result in self.test_results.items()
if result["duration"] > 300 # 5 minutes
]
if slow_suites:
recommendations.append(f"Slow test suites: {', '.join(slow_suites)}. Consider performance optimization.")
if not recommendations:
recommendations.append("All tests passed successfully! Consider adding more edge case tests.")
return recommendations
def generate_html_report(self, report: Dict[str, Any]):
"""Generate HTML test report"""
        # NOTE: literal CSS braces are doubled ({{ }}) so that str.format() below
        # treats them as literal braces rather than replacement fields
        html_template = """
<!DOCTYPE html>
<html>
<head>
    <title>Training Service Test Report</title>
    <style>
        body {{ font-family: Arial, sans-serif; margin: 40px; }}
        .header {{ background-color: #f8f9fa; padding: 20px; border-radius: 5px; }}
        .summary {{ display: flex; gap: 20px; margin: 20px 0; }}
        .metric {{ background: white; border: 1px solid #dee2e6; padding: 15px; border-radius: 5px; text-align: center; }}
        .metric-value {{ font-size: 24px; font-weight: bold; }}
        .passed {{ color: #28a745; }}
        .failed {{ color: #dc3545; }}
        .timeout {{ color: #fd7e14; }}
        .error {{ color: #6c757d; }}
        .suite-result {{ margin: 20px 0; padding: 15px; border: 1px solid #dee2e6; border-radius: 5px; }}
        .recommendations {{ background-color: #e7f3ff; padding: 15px; border-radius: 5px; margin: 20px 0; }}
        pre {{ background-color: #f8f9fa; padding: 10px; border-radius: 3px; overflow-x: auto; }}
    </style>
</head>
<body>
<div class="header">
<h1>Training Service Test Report</h1>
<p>Generated: {timestamp}</p>
</div>
<div class="summary">
<div class="metric">
<div class="metric-value">{total_suites}</div>
<div>Total Suites</div>
</div>
<div class="metric">
<div class="metric-value passed">{passed_suites}</div>
<div>Passed</div>
</div>
<div class="metric">
<div class="metric-value failed">{failed_suites}</div>
<div>Failed</div>
</div>
<div class="metric">
<div class="metric-value timeout">{timeout_suites}</div>
<div>Timeout</div>
</div>
<div class="metric">
<div class="metric-value">{success_rate:.1f}%</div>
<div>Success Rate</div>
</div>
<div class="metric">
<div class="metric-value">{duration:.1f}s</div>
<div>Total Duration</div>
</div>
</div>
<div class="recommendations">
<h3>Recommendations</h3>
<ul>
{recommendations_html}
</ul>
</div>
<h2>Suite Results</h2>
{suite_results_html}
</body>
</html>
"""
# Format recommendations
recommendations_html = '\n'.join(
f"<li>{rec}</li>" for rec in report["recommendations"]
)
        # Format suite results, escaping captured output so it renders safely as HTML
        from html import escape
        suite_results_html = ""
        for suite_name, result in report["suite_results"].items():
            status_class = result["status"]
            stdout_snippet = escape(result["stdout"][:1000]) + ("..." if len(result["stdout"]) > 1000 else "")
            stderr_snippet = escape(result["stderr"][:1000]) + ("..." if len(result["stderr"]) > 1000 else "")
            suite_results_html += f"""
            <div class="suite-result">
                <h3>{suite_name.title()} Tests <span class="{status_class}">({result["status"].upper()})</span></h3>
                <p><strong>Duration:</strong> {result["duration"]:.2f}s</p>
                <p><strong>Return Code:</strong> {result["return_code"]}</p>
                {f'<h4>Output:</h4><pre>{stdout_snippet}</pre>' if result["stdout"] else ""}
                {f'<h4>Errors:</h4><pre>{stderr_snippet}</pre>' if result["stderr"] else ""}
            </div>
            """
# Fill template
html_content = html_template.format(
timestamp=report["test_run_summary"]["timestamp"],
total_suites=report["test_run_summary"]["total_suites"],
passed_suites=report["test_run_summary"]["passed_suites"],
failed_suites=report["test_run_summary"]["failed_suites"],
timeout_suites=report["test_run_summary"]["timeout_suites"],
success_rate=report["test_run_summary"]["success_rate"],
duration=report["test_run_summary"]["total_duration_seconds"],
recommendations_html=recommendations_html,
suite_results_html=suite_results_html
)
# Save HTML report
html_file = self.results_dir / "test_report.html"
with open(html_file, 'w') as f:
f.write(html_content)
logger.info(f"HTML report saved to: {html_file}")
def print_test_summary(self, report: Dict[str, Any]):
"""Print test summary to console"""
summary = report["test_run_summary"]
print("\n" + "=" * 80)
print("TRAINING SERVICE TEST RESULTS SUMMARY")
print("=" * 80)
print(f"Timestamp: {summary['timestamp']}")
print(f"Total Suites: {summary['total_suites']}")
print(f"Passed: {summary['passed_suites']}")
print(f"Failed: {summary['failed_suites']}")
print(f"Errors: {summary['error_suites']}")
print(f"Timeouts: {summary['timeout_suites']}")
print(f"Success Rate: {summary['success_rate']:.1f}%")
print(f"Total Duration: {summary['total_duration_seconds']:.2f}s")
print("\nSUITE DETAILS:")
print("-" * 50)
for suite_name, result in report["suite_results"].items():
            status_icon = "✅" if result["status"] == "passed" else "❌"
print(f"{status_icon} {suite_name.ljust(15)}: {result['status'].upper().ljust(10)} ({result['duration']:.2f}s)")
print("\nRECOMMENDATIONS:")
print("-" * 50)
for i, rec in enumerate(report["recommendations"], 1):
print(f"{i}. {rec}")
print("\nFILES GENERATED:")
print("-" * 50)
print(f"📄 JSON Report: {self.results_dir}/test_report.json")
print(f"🌐 HTML Report: {self.results_dir}/test_report.html")
print(f"📊 Coverage Reports: {self.results_dir}/coverage_*_html/")
print(f"📋 JUnit XML: {self.results_dir}/junit_*.xml")
print("=" * 80)
async def run_all_tests(self):
"""Run all test suites"""
logger.info("Starting comprehensive test run...")
# Setup environment
await self.setup_test_environment()
# Run each test suite
for suite_name in self.test_suites.keys():
logger.info(f"Starting {suite_name} test suite...")
result = self.run_test_suite(suite_name)
self.test_results[suite_name] = result
if result["status"] == "passed":
logger.info(f"{suite_name} tests PASSED ({result['duration']:.2f}s)")
elif result["status"] == "failed":
logger.error(f"{suite_name} tests FAILED ({result['duration']:.2f}s)")
elif result["status"] == "timeout":
logger.error(f"{suite_name} tests TIMED OUT ({result['duration']:.2f}s)")
else:
logger.error(f"💥 {suite_name} tests ERROR ({result['duration']:.2f}s)")
# Generate final report
report = self.generate_test_report()
return report
def run_specific_suite(self, suite_name: str):
"""Run a specific test suite"""
if suite_name not in self.test_suites:
logger.error(f"Unknown test suite: {suite_name}")
logger.info(f"Available suites: {', '.join(self.test_suites.keys())}")
return None
logger.info(f"Running {suite_name} test suite only...")
result = self.run_test_suite(suite_name)
self.test_results[suite_name] = result
# Generate report for single suite
report = self.generate_test_report()
return report
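    # Programmatic usage sketch (in addition to the CLI entry point below):
    #   runner = TrainingTestRunner()
    #   report = asyncio.run(runner.run_all_tests())   # full run
    #   report = runner.run_specific_suite("unit")     # single suite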
# ================================================================
# MAIN EXECUTION
# ================================================================
async def main():
"""Main execution function"""
import argparse
parser = argparse.ArgumentParser(description="Training Service Test Runner")
parser.add_argument(
"--suite",
choices=list(TrainingTestRunner().test_suites.keys()) + ["all"],
default="all",
help="Test suite to run (default: all)"
)
parser.add_argument(
"--verbose", "-v",
action="store_true",
help="Verbose output"
)
parser.add_argument(
"--quick",
action="store_true",
help="Run quick tests only (skip performance tests)"
)
args = parser.parse_args()
# Setup logging level
if args.verbose:
logging.getLogger().setLevel(logging.DEBUG)
# Create test runner
runner = TrainingTestRunner()
# Modify test suites for quick run
if args.quick:
# Skip performance tests in quick mode
if "performance" in runner.test_suites:
del runner.test_suites["performance"]
logger.info("Quick mode: Skipping performance tests")
try:
if args.suite == "all":
report = await runner.run_all_tests()
else:
report = runner.run_specific_suite(args.suite)
        # Exit with an appropriate code (timeouts count as failures too)
        summary = report["test_run_summary"] if report else None
        if summary and summary["failed_suites"] == 0 and summary["error_suites"] == 0 and summary["timeout_suites"] == 0:
            logger.info("All tests completed successfully!")
            sys.exit(0)
        else:
            logger.error("Some tests failed!")
            sys.exit(1)
except KeyboardInterrupt:
logger.info("Test run interrupted by user")
sys.exit(130)
except Exception as e:
logger.error(f"Test run failed with error: {e}")
sys.exit(1)
if __name__ == "__main__":
    # Handle both direct execution and pytest discovery
    if len(sys.argv) > 1:
        # Any CLI arguments (--suite, --quick, -v, ...) are parsed by main();
        # the old check only matched --suite/-h/--help, so advertised flags
        # like --quick fell through to the usage branch
        asyncio.run(main())
    else:
        # Running without arguments: print usage, then run the full suite
print("Training Service Test Runner")
print("=" * 50)
print("Usage:")
print(" python run_tests.py --suite all # Run all test suites")
print(" python run_tests.py --suite unit # Run unit tests only")
print(" python run_tests.py --suite integration # Run integration tests only")
print(" python run_tests.py --suite performance # Run performance tests only")
print(" python run_tests.py --quick # Run quick tests (skip performance)")
print(" python run_tests.py -v # Verbose output")
print()
print("Available test suites:")
runner = TrainingTestRunner()
for suite_name, config in runner.test_suites.items():
print(f" {suite_name.ljust(15)}: {config['description']}")
print()
        # No arguments were given, so run the full suite with the runner above
        print("No arguments provided. Running all tests...")
        asyncio.run(runner.run_all_tests())