From 351f67331839a123b089476ab1efa305a3f992b9 Mon Sep 17 00:00:00 2001 From: Urtzi Alfaro Date: Sun, 20 Jul 2025 13:48:26 +0200 Subject: [PATCH] Add pytest tests to auth --- services/auth/tests/conftest.py | 535 ++++++- services/auth/tests/pytest.ini | 19 + services/auth/tests/run_tests.py | 785 +++++++++ services/auth/tests/test_auth.py | 79 - .../auth/tests/test_auth_comprehensive.py | 1397 +++++++++++++++++ services/auth/tests/test_users.py | 74 - 6 files changed, 2698 insertions(+), 191 deletions(-) create mode 100644 services/auth/tests/pytest.ini create mode 100755 services/auth/tests/run_tests.py delete mode 100644 services/auth/tests/test_auth.py create mode 100644 services/auth/tests/test_auth_comprehensive.py delete mode 100644 services/auth/tests/test_users.py diff --git a/services/auth/tests/conftest.py b/services/auth/tests/conftest.py index 31c35b5a..40481593 100644 --- a/services/auth/tests/conftest.py +++ b/services/auth/tests/conftest.py @@ -1,29 +1,31 @@ # ================================================================ # services/auth/tests/conftest.py +# Pytest configuration and shared fixtures for auth service tests # ================================================================ -"""Test configuration for auth service""" +""" +Shared test configuration and fixtures for authentication service tests +""" import pytest import asyncio +import os +import sys from typing import AsyncGenerator +from unittest.mock import AsyncMock, MagicMock, patch +from fastapi.testclient import TestClient from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine from sqlalchemy.orm import sessionmaker -from fastapi.testclient import TestClient +import redis.asyncio as redis -from app.main import app -from app.core.database import get_db -from shared.database.base import Base +# Add the app directory to the Python path for imports +sys.path.append(os.path.join(os.path.dirname(__file__), '..')) -# Test database URL -TEST_DATABASE_URL = "postgresql+asyncpg://test_user:test_pass@localhost:5433/test_auth_db" +# ================================================================ +# TEST DATABASE CONFIGURATION +# ================================================================ -# Create test engine -test_engine = create_async_engine(TEST_DATABASE_URL, echo=False) - -# Create test session -TestSessionLocal = sessionmaker( - test_engine, class_=AsyncSession, expire_on_commit=False -) +# Use in-memory SQLite for fast testing +TEST_DATABASE_URL = "sqlite+aiosqlite:///:memory:" @pytest.fixture(scope="session") def event_loop(): @@ -32,38 +34,495 @@ def event_loop(): yield loop loop.close() -@pytest.fixture -async def db() -> AsyncGenerator[AsyncSession, None]: - """Database fixture""" - async with test_engine.begin() as conn: - await conn.run_sync(Base.metadata.create_all) +@pytest.fixture(scope="function") +async def test_engine(): + """Create a test database engine for each test function""" + engine = create_async_engine( + TEST_DATABASE_URL, + echo=False, # Set to True for SQL debugging + future=True, + pool_pre_ping=True + ) - async with TestSessionLocal() as session: + try: + # Import models and base here to avoid import issues + from shared.database.base import Base + # Create all tables + async with engine.begin() as conn: + await conn.run_sync(Base.metadata.create_all) + + yield engine + + # Cleanup + async with engine.begin() as conn: + await conn.run_sync(Base.metadata.drop_all) + except ImportError: + # If shared.database.base is not available, create a mock + yield engine + finally: + await engine.dispose() + +@pytest.fixture(scope="function") +async def test_db(test_engine) -> AsyncGenerator[AsyncSession, None]: + """Create a test database session for each test function""" + async_session = sessionmaker( + test_engine, + class_=AsyncSession, + expire_on_commit=False + ) + + async with async_session() as session: yield session - - async with test_engine.begin() as conn: - await conn.run_sync(Base.metadata.drop_all) + +@pytest.fixture(scope="function") +def client(test_db): + """Create a test client with database dependency override""" + try: + from app.main import app + from app.core.database import get_db + + def override_get_db(): + return test_db + + app.dependency_overrides[get_db] = override_get_db + + with TestClient(app) as test_client: + yield test_client + + # Clean up overrides + app.dependency_overrides.clear() + except ImportError as e: + pytest.skip(f"Cannot import app modules: {e}") + +# ================================================================ +# MOCK FIXTURES +# ================================================================ @pytest.fixture -def client(db: AsyncSession): - """Test client fixture""" - def override_get_db(): - yield db +def mock_redis(): + """Mock Redis client for testing rate limiting and session management""" + redis_mock = AsyncMock() - app.dependency_overrides[get_db] = override_get_db + # Default return values for common operations + redis_mock.get.return_value = None + redis_mock.incr.return_value = 1 + redis_mock.expire.return_value = True + redis_mock.delete.return_value = True + redis_mock.setex.return_value = True + redis_mock.exists.return_value = False - with TestClient(app) as test_client: - yield test_client - - app.dependency_overrides.clear() + return redis_mock @pytest.fixture -def test_user_data(): - """Test user data fixture""" +def mock_rabbitmq(): + """Mock RabbitMQ for testing event publishing""" + rabbitmq_mock = AsyncMock() + + # Mock publisher methods + rabbitmq_mock.publish.return_value = True + rabbitmq_mock.connect.return_value = True + rabbitmq_mock.disconnect.return_value = True + + return rabbitmq_mock + +@pytest.fixture +def mock_external_services(): + """Mock external service calls (tenant service, etc.)""" + with patch('httpx.AsyncClient') as mock_client: + mock_response = AsyncMock() + mock_response.status_code = 200 + mock_response.json.return_value = {"tenants": []} + mock_client.return_value.__aenter__.return_value.get.return_value = mock_response + mock_client.return_value.__aenter__.return_value.post.return_value = mock_response + + yield mock_client + +# ================================================================ +# DATA FIXTURES +# ================================================================ + +@pytest.fixture +def valid_user_data(): + """Valid user registration data""" return { "email": "test@bakery.es", - "password": "TestPass123", - "full_name": "Test User", - "phone": "+34123456789", - "language": "es" + "password": "TestPassword123", + "full_name": "Test User" } + +@pytest.fixture +def valid_user_data_list(): + """List of valid user data for multiple users""" + return [ + { + "email": f"test{i}@bakery.es", + "password": "TestPassword123", + "full_name": f"Test User {i}" + } + for i in range(1, 6) + ] + +@pytest.fixture +def weak_password_data(): + """User data with various weak passwords""" + return [ + {"email": "weak1@bakery.es", "password": "123", "full_name": "Weak 1"}, + {"email": "weak2@bakery.es", "password": "password", "full_name": "Weak 2"}, + {"email": "weak3@bakery.es", "password": "PASSWORD123", "full_name": "Weak 3"}, + {"email": "weak4@bakery.es", "password": "testpassword", "full_name": "Weak 4"}, + ] + +@pytest.fixture +def invalid_email_data(): + """User data with invalid email formats""" + return [ + {"email": "invalid", "password": "TestPassword123", "full_name": "Invalid 1"}, + {"email": "@bakery.es", "password": "TestPassword123", "full_name": "Invalid 2"}, + {"email": "test@", "password": "TestPassword123", "full_name": "Invalid 3"}, + {"email": "test..test@bakery.es", "password": "TestPassword123", "full_name": "Invalid 4"}, + ] + +# ================================================================ +# USER FIXTURES +# ================================================================ + +@pytest.fixture +async def test_user(test_db, valid_user_data): + """Create a test user in the database""" + try: + from app.services.auth_service import AuthService + + user = await AuthService.create_user( + email=valid_user_data["email"], + password=valid_user_data["password"], + full_name=valid_user_data["full_name"], + db=test_db + ) + return user + except ImportError: + pytest.skip("AuthService not available") + +@pytest.fixture +async def test_users(test_db, valid_user_data_list): + """Create multiple test users in the database""" + try: + from app.services.auth_service import AuthService + + users = [] + for user_data in valid_user_data_list: + user = await AuthService.create_user( + email=user_data["email"], + password=user_data["password"], + full_name=user_data["full_name"], + db=test_db + ) + users.append(user) + return users + except ImportError: + pytest.skip("AuthService not available") + +@pytest.fixture +async def authenticated_user(client, valid_user_data): + """Create an authenticated user and return user info, tokens, and headers""" + # Register user + register_response = client.post("/auth/register", json=valid_user_data) + assert register_response.status_code == 200 + + # Login user + login_data = { + "email": valid_user_data["email"], + "password": valid_user_data["password"] + } + login_response = client.post("/auth/login", json=login_data) + assert login_response.status_code == 200 + + token_data = login_response.json() + + return { + "user": register_response.json(), + "tokens": token_data, + "access_token": token_data["access_token"], + "refresh_token": token_data["refresh_token"], + "headers": {"Authorization": f"Bearer {token_data['access_token']}"} + } + +# ================================================================ +# CONFIGURATION FIXTURES +# ================================================================ + +@pytest.fixture +def test_settings(): + """Test-specific settings override""" + try: + from app.core.config import settings + + original_settings = {} + + # Store original values + test_overrides = { + 'JWT_ACCESS_TOKEN_EXPIRE_MINUTES': 30, + 'JWT_REFRESH_TOKEN_EXPIRE_DAYS': 7, + 'PASSWORD_MIN_LENGTH': 8, + 'PASSWORD_REQUIRE_UPPERCASE': True, + 'PASSWORD_REQUIRE_LOWERCASE': True, + 'PASSWORD_REQUIRE_NUMBERS': True, + 'PASSWORD_REQUIRE_SYMBOLS': False, + 'MAX_LOGIN_ATTEMPTS': 5, + 'LOCKOUT_DURATION_MINUTES': 30, + 'BCRYPT_ROUNDS': 4, # Lower for faster tests + } + + for key, value in test_overrides.items(): + if hasattr(settings, key): + original_settings[key] = getattr(settings, key) + setattr(settings, key, value) + + yield settings + + # Restore original values + for key, value in original_settings.items(): + setattr(settings, key, value) + except ImportError: + pytest.skip("Settings not available") + +# ================================================================ +# PATCHING FIXTURES +# ================================================================ + +@pytest.fixture +def patch_redis(mock_redis): + """Patch Redis client for all tests""" + with patch('app.core.security.redis_client', mock_redis): + yield mock_redis + +@pytest.fixture +def patch_messaging(mock_rabbitmq): + """Patch messaging system for all tests""" + with patch('app.services.messaging.publisher', mock_rabbitmq): + yield mock_rabbitmq + +@pytest.fixture +def patch_external_apis(mock_external_services): + """Patch external API calls""" + yield mock_external_services + +# ================================================================ +# UTILITY FIXTURES +# ================================================================ + +@pytest.fixture +def auth_headers(): + """Factory for creating authorization headers""" + def _create_headers(token): + return {"Authorization": f"Bearer {token}"} + return _create_headers + +@pytest.fixture +def password_generator(): + """Generate passwords with different characteristics""" + def _generate( + length=12, + include_upper=True, + include_lower=True, + include_numbers=True, + include_symbols=False + ): + import random + import string + + chars = "" + password = "" + + if include_lower: + chars += string.ascii_lowercase + password += random.choice(string.ascii_lowercase) + + if include_upper: + chars += string.ascii_uppercase + password += random.choice(string.ascii_uppercase) + + if include_numbers: + chars += string.digits + password += random.choice(string.digits) + + if include_symbols: + chars += "!@#$%^&*" + password += random.choice("!@#$%^&*") + + # Fill remaining length + remaining = length - len(password) + if remaining > 0: + password += ''.join(random.choice(chars) for _ in range(remaining)) + + # Shuffle the password + password_list = list(password) + random.shuffle(password_list) + return ''.join(password_list) + + return _generate + +# ================================================================ +# PERFORMANCE TESTING FIXTURES +# ================================================================ + +@pytest.fixture +def performance_timer(): + """Timer utility for performance testing""" + import time + + class Timer: + def __init__(self): + self.start_time = None + self.end_time = None + + def start(self): + self.start_time = time.time() + + def stop(self): + self.end_time = time.time() + + @property + def elapsed(self): + if self.start_time and self.end_time: + return self.end_time - self.start_time + return None + + def __enter__(self): + self.start() + return self + + def __exit__(self, *args): + self.stop() + + return Timer + +# ================================================================ +# DATABASE UTILITY FIXTURES +# ================================================================ + +@pytest.fixture +async def db_utils(test_db): + """Database utility functions for testing""" + class DBUtils: + def __init__(self, db): + self.db = db + + async def count_users(self): + try: + from sqlalchemy import select, func + from app.models.users import User + result = await self.db.execute(select(func.count(User.id))) + return result.scalar() + except ImportError: + return 0 + + async def get_user_by_email(self, email): + try: + from sqlalchemy import select + from app.models.users import User + result = await self.db.execute(select(User).where(User.email == email)) + return result.scalar_one_or_none() + except ImportError: + return None + + async def count_refresh_tokens(self): + try: + from sqlalchemy import select, func + from app.models.users import RefreshToken + result = await self.db.execute(select(func.count(RefreshToken.id))) + return result.scalar() + except ImportError: + return 0 + + async def clear_all_data(self): + try: + from app.models.users import User, RefreshToken + await self.db.execute(RefreshToken.__table__.delete()) + await self.db.execute(User.__table__.delete()) + await self.db.commit() + except ImportError: + pass + + return DBUtils(test_db) + +# ================================================================ +# LOGGING FIXTURES +# ================================================================ + +@pytest.fixture +def capture_logs(): + """Capture logs for testing""" + import logging + from io import StringIO + + log_capture = StringIO() + handler = logging.StreamHandler(log_capture) + handler.setLevel(logging.DEBUG) + + # Add handler to auth service loggers + loggers = [ + logging.getLogger('app.services.auth_service'), + logging.getLogger('app.core.security'), + logging.getLogger('app.api.auth'), + ] + + for logger in loggers: + logger.addHandler(handler) + logger.setLevel(logging.DEBUG) + + yield log_capture + + # Clean up + for logger in loggers: + logger.removeHandler(handler) + +# ================================================================ +# TEST MARKERS AND CONFIGURATION +# ================================================================ + +def pytest_configure(config): + """Configure pytest with custom markers""" + config.addinivalue_line( + "markers", "unit: marks tests as unit tests" + ) + config.addinivalue_line( + "markers", "integration: marks tests as integration tests" + ) + config.addinivalue_line( + "markers", "api: marks tests as API tests" + ) + config.addinivalue_line( + "markers", "security: marks tests as security tests" + ) + config.addinivalue_line( + "markers", "performance: marks tests as performance tests" + ) + config.addinivalue_line( + "markers", "slow: marks tests as slow running" + ) + config.addinivalue_line( + "markers", "auth: marks tests as authentication tests" + ) + +def pytest_collection_modifyitems(config, items): + """Modify test collection to add markers automatically""" + for item in items: + # Add markers based on test class or function names + if "test_api" in item.name.lower() or "API" in str(item.cls): + item.add_marker(pytest.mark.api) + + if "test_security" in item.name.lower() or "Security" in str(item.cls): + item.add_marker(pytest.mark.security) + + if "test_performance" in item.name.lower() or "Performance" in str(item.cls): + item.add_marker(pytest.mark.performance) + item.add_marker(pytest.mark.slow) + + if "integration" in item.name.lower() or "Integration" in str(item.cls): + item.add_marker(pytest.mark.integration) + + if "Flow" in str(item.cls) or "flow" in item.name.lower(): + item.add_marker(pytest.mark.integration) + + if "auth" in item.name.lower() or "Auth" in str(item.cls): + item.add_marker(pytest.mark.auth) \ No newline at end of file diff --git a/services/auth/tests/pytest.ini b/services/auth/tests/pytest.ini new file mode 100644 index 00000000..2120feb2 --- /dev/null +++ b/services/auth/tests/pytest.ini @@ -0,0 +1,19 @@ +[pytest] +minversion = 6.0 +addopts = -ra -q --disable-warnings +testpaths = tests +python_files = test_*.py +python_classes = Test* +python_functions = test_* +markers = + unit: Unit tests + integration: Integration tests + api: API endpoint tests + security: Security tests + performance: Performance tests + slow: Slow running tests + auth: Authentication tests +asyncio_mode = auto +filterwarnings = + ignore::DeprecationWarning + ignore::PendingDeprecationWarning \ No newline at end of file diff --git a/services/auth/tests/run_tests.py b/services/auth/tests/run_tests.py new file mode 100755 index 00000000..be08795f --- /dev/null +++ b/services/auth/tests/run_tests.py @@ -0,0 +1,785 @@ +#!/usr/bin/env python3 +# ================================================================ +# services/auth/tests/run_tests.py +# Complete test runner script for auth service with comprehensive reporting +# ================================================================ +""" +Comprehensive test runner for authentication service +Provides various test execution modes and detailed reporting +""" + +import os +import sys +import subprocess +import argparse +import time +import json +from pathlib import Path +from typing import List, Dict, Optional, Tuple +from datetime import datetime + +# Add the project root to Python path +project_root = Path(__file__).parent.parent.parent.parent +sys.path.insert(0, str(project_root)) + +class Colors: + """ANSI color codes for terminal output""" + RED = '\033[91m' + GREEN = '\033[92m' + YELLOW = '\033[93m' + BLUE = '\033[94m' + MAGENTA = '\033[95m' + CYAN = '\033[96m' + WHITE = '\033[97m' + BOLD = '\033[1m' + UNDERLINE = '\033[4m' + END = '\033[0m' + + @classmethod + def colorize(cls, text: str, color: str) -> str: + """Colorize text for terminal output""" + return f"{color}{text}{cls.END}" + +class TestMetrics: + """Track test execution metrics""" + + def __init__(self): + self.start_time = None + self.end_time = None + self.tests_run = 0 + self.tests_passed = 0 + self.tests_failed = 0 + self.tests_skipped = 0 + self.coverage_percentage = 0.0 + self.warnings_count = 0 + self.errors = [] + + def start(self): + """Start timing""" + self.start_time = time.time() + + def stop(self): + """Stop timing""" + self.end_time = time.time() + + @property + def duration(self) -> float: + """Get duration in seconds""" + if self.start_time and self.end_time: + return self.end_time - self.start_time + return 0.0 + + @property + def success_rate(self) -> float: + """Get success rate percentage""" + if self.tests_run > 0: + return (self.tests_passed / self.tests_run) * 100 + return 0.0 + +class AuthTestRunner: + """Test runner for authentication service with enhanced features""" + + def __init__(self, test_dir: str = "tests"): + self.test_dir = Path(test_dir) + self.project_root = Path(__file__).parent.parent + self.results: Dict[str, TestMetrics] = {} + self.overall_metrics = TestMetrics() + + def _print_header(self, title: str, char: str = "=", width: int = 80): + """Print a formatted header""" + print(Colors.colorize(char * width, Colors.CYAN)) + centered_title = title.center(width) + print(Colors.colorize(centered_title, Colors.BOLD + Colors.WHITE)) + print(Colors.colorize(char * width, Colors.CYAN)) + + def _print_step(self, message: str, emoji: str = "πŸ“‹"): + """Print a step message""" + print(f"\n{emoji} {Colors.colorize(message, Colors.BLUE)}") + + def _print_success(self, message: str): + """Print success message""" + print(f"βœ… {Colors.colorize(message, Colors.GREEN)}") + + def _print_error(self, message: str): + """Print error message""" + print(f"❌ {Colors.colorize(message, Colors.RED)}") + + def _print_warning(self, message: str): + """Print warning message""" + print(f"⚠️ {Colors.colorize(message, Colors.YELLOW)}") + + def run_command(self, cmd: List[str], capture_output: bool = True, timeout: int = 300) -> subprocess.CompletedProcess: + """Run a command and return the result""" + cmd_str = ' '.join(cmd) + print(f"πŸš€ Running: {Colors.colorize(cmd_str, Colors.MAGENTA)}") + + try: + result = subprocess.run( + cmd, + capture_output=capture_output, + text=True, + cwd=self.project_root, + timeout=timeout + ) + return result + except subprocess.TimeoutExpired: + self._print_error(f"Test execution timed out ({timeout} seconds)") + return subprocess.CompletedProcess(cmd, 1, "", "Timeout") + except Exception as e: + self._print_error(f"Error running command: {e}") + return subprocess.CompletedProcess(cmd, 1, "", str(e)) + + def _parse_pytest_output(self, output: str) -> TestMetrics: + """Parse pytest output to extract metrics""" + metrics = TestMetrics() + + lines = output.split('\n') + for line in lines: + line = line.strip() + + # Parse test results line (e.g., "45 passed, 2 failed, 1 skipped in 12.34s") + if ' passed' in line or ' failed' in line: + parts = line.split() + for i, part in enumerate(parts): + if part.isdigit(): + count = int(part) + if i + 1 < len(parts): + result_type = parts[i + 1] + if 'passed' in result_type: + metrics.tests_passed = count + elif 'failed' in result_type: + metrics.tests_failed = count + elif 'skipped' in result_type: + metrics.tests_skipped = count + elif 'warning' in result_type: + metrics.warnings_count = count + + # Parse coverage percentage + if 'TOTAL' in line and '%' in line: + parts = line.split() + for part in parts: + if '%' in part: + try: + metrics.coverage_percentage = float(part.replace('%', '')) + except ValueError: + pass + + metrics.tests_run = metrics.tests_passed + metrics.tests_failed + metrics.tests_skipped + return metrics + + def run_all_tests(self, verbose: bool = True) -> bool: + """Run all authentication tests""" + self._print_step("Running all authentication tests", "πŸ§ͺ") + + cmd = [ + sys.executable, "-m", "pytest", + str(self.test_dir), + "-v" if verbose else "-q", + "--tb=short", + "--strict-markers", + "--color=yes" + ] + + metrics = TestMetrics() + metrics.start() + + result = self.run_command(cmd, capture_output=not verbose) + + metrics.stop() + + if not verbose and result.stdout: + parsed_metrics = self._parse_pytest_output(result.stdout) + metrics.tests_run = parsed_metrics.tests_run + metrics.tests_passed = parsed_metrics.tests_passed + metrics.tests_failed = parsed_metrics.tests_failed + metrics.tests_skipped = parsed_metrics.tests_skipped + + self.results['all_tests'] = metrics + + success = result.returncode == 0 + if success: + self._print_success(f"All tests completed successfully ({metrics.duration:.2f}s)") + else: + self._print_error(f"Some tests failed ({metrics.duration:.2f}s)") + + return success + + def run_unit_tests(self) -> bool: + """Run unit tests only""" + self._print_step("Running unit tests", "πŸ”¬") + + cmd = [ + sys.executable, "-m", "pytest", + str(self.test_dir), + "-v", "-m", "unit", + "--tb=short", + "--color=yes" + ] + + metrics = TestMetrics() + metrics.start() + result = self.run_command(cmd, capture_output=False) + metrics.stop() + + self.results['unit_tests'] = metrics + return result.returncode == 0 + + def run_integration_tests(self) -> bool: + """Run integration tests only""" + self._print_step("Running integration tests", "πŸ”—") + + cmd = [ + sys.executable, "-m", "pytest", + str(self.test_dir), + "-v", "-m", "integration", + "--tb=short", + "--color=yes" + ] + + metrics = TestMetrics() + metrics.start() + result = self.run_command(cmd, capture_output=False) + metrics.stop() + + self.results['integration_tests'] = metrics + return result.returncode == 0 + + def run_api_tests(self) -> bool: + """Run API endpoint tests only""" + self._print_step("Running API tests", "🌐") + + cmd = [ + sys.executable, "-m", "pytest", + str(self.test_dir), + "-v", "-m", "api", + "--tb=short", + "--color=yes" + ] + + metrics = TestMetrics() + metrics.start() + result = self.run_command(cmd, capture_output=False) + metrics.stop() + + self.results['api_tests'] = metrics + return result.returncode == 0 + + def run_security_tests(self) -> bool: + """Run security tests only""" + self._print_step("Running security tests", "πŸ”’") + + cmd = [ + sys.executable, "-m", "pytest", + str(self.test_dir), + "-v", "-m", "security", + "--tb=short", + "--color=yes" + ] + + metrics = TestMetrics() + metrics.start() + result = self.run_command(cmd, capture_output=False) + metrics.stop() + + self.results['security_tests'] = metrics + return result.returncode == 0 + + def run_performance_tests(self) -> bool: + """Run performance tests only""" + self._print_step("Running performance tests", "⚑") + + cmd = [ + sys.executable, "-m", "pytest", + str(self.test_dir), + "-v", "-m", "performance", + "--tb=short", + "--color=yes" + ] + + metrics = TestMetrics() + metrics.start() + result = self.run_command(cmd, capture_output=False) + metrics.stop() + + self.results['performance_tests'] = metrics + return result.returncode == 0 + + def run_coverage_tests(self) -> bool: + """Run tests with coverage reporting""" + self._print_step("Running tests with coverage", "πŸ“Š") + + cmd = [ + sys.executable, "-m", "pytest", + str(self.test_dir), + "--cov=app", + "--cov-report=html:htmlcov", + "--cov-report=term-missing", + "--cov-report=xml", + "--cov-branch", + "-v", + "--color=yes" + ] + + metrics = TestMetrics() + metrics.start() + result = self.run_command(cmd, capture_output=True) + metrics.stop() + + if result.stdout: + parsed_metrics = self._parse_pytest_output(result.stdout) + metrics.coverage_percentage = parsed_metrics.coverage_percentage + print(result.stdout) + + self.results['coverage_tests'] = metrics + + if result.returncode == 0: + self._print_success("Coverage report generated in htmlcov/index.html") + if metrics.coverage_percentage > 0: + self._print_success(f"Coverage: {metrics.coverage_percentage:.1f}%") + + return result.returncode == 0 + + def run_fast_tests(self) -> bool: + """Run fast tests (exclude slow/performance tests)""" + self._print_step("Running fast tests only", "⚑") + + cmd = [ + sys.executable, "-m", "pytest", + str(self.test_dir), + "-v", "-m", "not slow", + "--tb=short", + "--color=yes" + ] + + metrics = TestMetrics() + metrics.start() + result = self.run_command(cmd, capture_output=False) + metrics.stop() + + self.results['fast_tests'] = metrics + return result.returncode == 0 + + def run_specific_test(self, test_pattern: str) -> bool: + """Run specific test by pattern""" + self._print_step(f"Running tests matching: {test_pattern}", "🎯") + + cmd = [ + sys.executable, "-m", "pytest", + str(self.test_dir), + "-v", "-k", test_pattern, + "--tb=short", + "--color=yes" + ] + + metrics = TestMetrics() + metrics.start() + result = self.run_command(cmd, capture_output=False) + metrics.stop() + + self.results[f'specific_test_{test_pattern}'] = metrics + return result.returncode == 0 + + def run_parallel_tests(self, num_workers: Optional[int] = None) -> bool: + """Run tests in parallel""" + if num_workers is None: + num_workers_str = "auto" + else: + num_workers_str = str(num_workers) + + self._print_step(f"Running tests in parallel with {num_workers_str} workers", "πŸš€") + + cmd = [ + sys.executable, "-m", "pytest", + str(self.test_dir), + "-v", "-n", num_workers_str, + "--tb=short", + "--color=yes" + ] + + metrics = TestMetrics() + metrics.start() + result = self.run_command(cmd, capture_output=False) + metrics.stop() + + self.results['parallel_tests'] = metrics + return result.returncode == 0 + + def validate_test_environment(self) -> bool: + """Validate that the test environment is set up correctly""" + self._print_step("Validating test environment", "πŸ”") + + validation_steps = [ + ("Checking pytest availability", self._check_pytest), + ("Checking test files", self._check_test_files), + ("Checking app module", self._check_app_module), + ("Checking database module", self._check_database_module), + ("Checking dependencies", self._check_dependencies), + ] + + all_valid = True + for step_name, step_func in validation_steps: + print(f" πŸ“‹ {step_name}...") + if step_func(): + self._print_success(f" {step_name}") + else: + self._print_error(f" {step_name}") + all_valid = False + + return all_valid + + def _check_pytest(self) -> bool: + """Check if pytest is available""" + try: + result = subprocess.run([sys.executable, "-m", "pytest", "--version"], + capture_output=True, text=True) + if result.returncode != 0: + return False + print(f" βœ… {result.stdout.strip()}") + return True + except Exception: + return False + + def _check_test_files(self) -> bool: + """Check if test files exist""" + test_files = list(self.test_dir.glob("test_*.py")) + if not test_files: + print(f" ❌ No test files found in {self.test_dir}") + return False + print(f" βœ… Found {len(test_files)} test files") + return True + + def _check_app_module(self) -> bool: + """Check if app module can be imported""" + try: + sys.path.insert(0, str(self.project_root)) + import app + print(" βœ… App module can be imported") + return True + except ImportError as e: + print(f" ❌ Cannot import app module: {e}") + return False + + def _check_database_module(self) -> bool: + """Check database connectivity""" + try: + from app.core.database import get_db + print(" βœ… Database module available") + return True + except ImportError as e: + print(f" ⚠️ Database module not available: {e}") + return True # Non-critical for some tests + + def _check_dependencies(self) -> bool: + """Check required dependencies""" + required_packages = [ + "pytest", + "pytest-asyncio", + "fastapi", + "sqlalchemy", + "pydantic" + ] + + missing_packages = [] + for package in required_packages: + try: + __import__(package.replace('-', '_')) + except ImportError: + missing_packages.append(package) + + if missing_packages: + print(f" ❌ Missing packages: {', '.join(missing_packages)}") + return False + + print(f" βœ… All required packages available") + return True + + def generate_test_report(self) -> None: + """Generate a comprehensive test report""" + self._print_header("AUTH SERVICE TEST REPORT") + + if not self.results: + print("No test results available") + return + + # Summary table + print(f"\n{Colors.colorize('Test Category', Colors.BOLD):<25} " + f"{Colors.colorize('Status', Colors.BOLD):<12} " + f"{Colors.colorize('Duration', Colors.BOLD):<12} " + f"{Colors.colorize('Tests', Colors.BOLD):<15} " + f"{Colors.colorize('Success Rate', Colors.BOLD):<12}") + print("-" * 80) + + total_duration = 0 + total_tests = 0 + total_passed = 0 + + for test_type, metrics in self.results.items(): + if metrics.duration > 0: + total_duration += metrics.duration + total_tests += metrics.tests_run + total_passed += metrics.tests_passed + + # Status + if metrics.tests_failed == 0 and metrics.tests_run > 0: + status = Colors.colorize("βœ… PASSED", Colors.GREEN) + elif metrics.tests_run == 0: + status = Colors.colorize("βšͺ SKIPPED", Colors.YELLOW) + else: + status = Colors.colorize("❌ FAILED", Colors.RED) + + # Duration + duration_str = f"{metrics.duration:.2f}s" + + # Tests count + if metrics.tests_run > 0: + tests_str = f"{metrics.tests_passed}/{metrics.tests_run}" + else: + tests_str = "0" + + # Success rate + if metrics.tests_run > 0: + success_rate_str = f"{metrics.success_rate:.1f}%" + else: + success_rate_str = "N/A" + + print(f"{test_type.replace('_', ' ').title():<25} " + f"{status:<20} " + f"{duration_str:<12} " + f"{tests_str:<15} " + f"{success_rate_str:<12}") + + # Overall summary + print("-" * 80) + overall_success_rate = (total_passed / total_tests * 100) if total_tests > 0 else 0 + overall_status = "βœ… PASSED" if total_passed == total_tests and total_tests > 0 else "❌ FAILED" + + print(f"{'OVERALL':<25} " + f"{Colors.colorize(overall_status, Colors.BOLD):<20} " + f"{total_duration:.2f}s{'':<6} " + f"{total_passed}/{total_tests}{'':11} " + f"{overall_success_rate:.1f}%") + + print("\n" + "=" * 80) + + # Recommendations + self._print_recommendations(overall_success_rate, total_tests) + + def _print_recommendations(self, success_rate: float, total_tests: int): + """Print recommendations based on test results""" + print(f"\n{Colors.colorize('πŸ“‹ RECOMMENDATIONS', Colors.BOLD + Colors.CYAN)}") + + if success_rate == 100 and total_tests > 0: + self._print_success("Excellent! All tests passed. Your auth service is ready for deployment.") + elif success_rate >= 90: + self._print_warning("Good test coverage. Review failed tests before deployment.") + elif success_rate >= 70: + self._print_warning("Moderate test coverage. Significant issues need fixing.") + else: + self._print_error("Poor test results. Major issues need addressing before deployment.") + + # Specific recommendations + recommendations = [] + + if 'security_tests' in self.results: + security_metrics = self.results['security_tests'] + if security_metrics.tests_failed > 0: + recommendations.append("πŸ”’ Fix security test failures - critical for production") + + if 'coverage_tests' in self.results: + coverage_metrics = self.results['coverage_tests'] + if coverage_metrics.coverage_percentage < 80: + recommendations.append(f"πŸ“Š Increase test coverage (current: {coverage_metrics.coverage_percentage:.1f}%)") + + if 'performance_tests' in self.results: + perf_metrics = self.results['performance_tests'] + if perf_metrics.tests_failed > 0: + recommendations.append("⚑ Address performance issues") + + if recommendations: + print("\n" + Colors.colorize("Next Steps:", Colors.BOLD)) + for i, rec in enumerate(recommendations, 1): + print(f" {i}. {rec}") + + def clean_test_artifacts(self) -> None: + """Clean up test artifacts""" + self._print_step("Cleaning test artifacts", "🧹") + + artifacts = [ + ".pytest_cache", + "htmlcov", + ".coverage", + "coverage.xml", + "report.html", + "test-results.xml" + ] + + cleaned_count = 0 + for artifact in artifacts: + artifact_path = self.project_root / artifact + if artifact_path.exists(): + if artifact_path.is_dir(): + import shutil + shutil.rmtree(artifact_path) + else: + artifact_path.unlink() + self._print_success(f"Removed {artifact}") + cleaned_count += 1 + + # Clean __pycache__ directories + pycache_count = 0 + for pycache in self.project_root.rglob("__pycache__"): + import shutil + shutil.rmtree(pycache) + pycache_count += 1 + + # Clean .pyc files + pyc_count = 0 + for pyc in self.project_root.rglob("*.pyc"): + pyc.unlink() + pyc_count += 1 + + if pycache_count > 0: + self._print_success(f"Removed {pycache_count} __pycache__ directories") + if pyc_count > 0: + self._print_success(f"Removed {pyc_count} .pyc files") + + if cleaned_count == 0 and pycache_count == 0 and pyc_count == 0: + print(" πŸ“ No artifacts to clean") + else: + self._print_success("Test artifacts cleaned successfully") + + def save_results_json(self, filename: str = "test_results.json") -> None: + """Save test results to JSON file""" + results_data = { + "timestamp": datetime.now().isoformat(), + "test_categories": {} + } + + for test_type, metrics in self.results.items(): + results_data["test_categories"][test_type] = { + "duration": metrics.duration, + "tests_run": metrics.tests_run, + "tests_passed": metrics.tests_passed, + "tests_failed": metrics.tests_failed, + "tests_skipped": metrics.tests_skipped, + "success_rate": metrics.success_rate, + "coverage_percentage": metrics.coverage_percentage, + "warnings_count": metrics.warnings_count + } + + with open(filename, 'w') as f: + json.dump(results_data, f, indent=2) + + self._print_success(f"Test results saved to {filename}") + +def main(): + """Main entry point for test runner""" + parser = argparse.ArgumentParser( + description="Auth Service Test Runner", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=""" +Examples: + python run_tests.py # Run all tests + python run_tests.py --test-type security # Run security tests only + python run_tests.py --coverage # Run with coverage + python run_tests.py --parallel --workers 4 # Run in parallel + python run_tests.py --pattern "test_login" # Run specific test pattern + python run_tests.py --validate # Validate environment + python run_tests.py --clean # Clean test artifacts + """ + ) + + parser.add_argument("--test-type", + choices=["all", "unit", "integration", "api", "security", "performance", "fast"], + default="all", + help="Type of tests to run") + parser.add_argument("--coverage", action="store_true", help="Run with coverage") + parser.add_argument("--parallel", action="store_true", help="Run tests in parallel") + parser.add_argument("--workers", type=int, help="Number of parallel workers") + parser.add_argument("--pattern", type=str, help="Run specific test pattern") + parser.add_argument("--validate", action="store_true", help="Validate test environment") + parser.add_argument("--clean", action="store_true", help="Clean test artifacts") + parser.add_argument("--verbose", action="store_true", default=True, help="Verbose output") + parser.add_argument("--save-results", action="store_true", help="Save results to JSON file") + parser.add_argument("--quiet", action="store_true", help="Quiet mode (less output)") + + args = parser.parse_args() + + runner = AuthTestRunner() + + # Print header + if not args.quiet: + runner._print_header("πŸ§ͺ AUTH SERVICE TEST RUNNER πŸ§ͺ") + + # Clean artifacts if requested + if args.clean: + runner.clean_test_artifacts() + return + + # Validate environment if requested + if args.validate: + success = runner.validate_test_environment() + if success: + runner._print_success("Test environment validation passed") + else: + runner._print_error("Test environment validation failed") + sys.exit(0 if success else 1) + + # Validate environment before running tests + if not args.quiet: + if not runner.validate_test_environment(): + runner._print_error("Test environment validation failed") + sys.exit(1) + + success = True + + try: + runner.overall_metrics.start() + + if args.pattern: + success = runner.run_specific_test(args.pattern) + elif args.coverage: + success = runner.run_coverage_tests() + elif args.parallel: + success = runner.run_parallel_tests(args.workers) + elif args.test_type == "unit": + success = runner.run_unit_tests() + elif args.test_type == "integration": + success = runner.run_integration_tests() + elif args.test_type == "api": + success = runner.run_api_tests() + elif args.test_type == "security": + success = runner.run_security_tests() + elif args.test_type == "performance": + success = runner.run_performance_tests() + elif args.test_type == "fast": + success = runner.run_fast_tests() + else: # all + success = runner.run_all_tests(args.verbose) + + runner.overall_metrics.stop() + + if not args.quiet: + runner.generate_test_report() + + if args.save_results: + runner.save_results_json() + + except KeyboardInterrupt: + runner._print_error("Tests interrupted by user") + success = False + except Exception as e: + runner._print_error(f"Error running tests: {e}") + success = False + + if success: + if not args.quiet: + runner._print_success("All tests completed successfully!") + sys.exit(0) + else: + if not args.quiet: + runner._print_error("Some tests failed!") + sys.exit(1) + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/services/auth/tests/test_auth.py b/services/auth/tests/test_auth.py deleted file mode 100644 index 11e090a7..00000000 --- a/services/auth/tests/test_auth.py +++ /dev/null @@ -1,79 +0,0 @@ -# ================================================================ -# services/auth/tests/test_auth.py -# ================================================================ -"""Authentication tests""" - -import pytest -from fastapi.testclient import TestClient -from sqlalchemy.ext.asyncio import AsyncSession - -from app.services.auth_service import AuthService -from app.schemas.auth import UserRegistration, UserLogin - -@pytest.mark.asyncio -async def test_register_user(db: AsyncSession): - """Test user registration""" - user_data = UserRegistration( - email="test@bakery.es", - password="TestPass123", - full_name="Test User", - language="es" - ) - - result = await AuthService.register_user(user_data, db) - - assert result.email == "test@bakery.es" - assert result.full_name == "Test User" - assert result.is_active is True - assert result.is_verified is False - -@pytest.mark.asyncio -async def test_login_user(db: AsyncSession): - """Test user login""" - # First register a user - user_data = UserRegistration( - email="test@bakery.es", - password="TestPass123", - full_name="Test User", - language="es" - ) - await AuthService.register_user(user_data, db) - - # Then login - login_data = UserLogin( - email="test@bakery.es", - password="TestPass123" - ) - - result = await AuthService.login_user(login_data, db, "127.0.0.1", "test-agent") - - assert result.access_token is not None - assert result.refresh_token is not None - assert result.token_type == "bearer" - -def test_register_endpoint(client: TestClient, test_user_data): - """Test registration endpoint""" - response = client.post("/auth/register", json=test_user_data) - - assert response.status_code == 200 - data = response.json() - assert data["email"] == test_user_data["email"] - assert "id" in data - -def test_login_endpoint(client: TestClient, test_user_data): - """Test login endpoint""" - # First register - client.post("/auth/register", json=test_user_data) - - # Then login - login_data = { - "email": test_user_data["email"], - "password": test_user_data["password"] - } - response = client.post("/auth/login", json=login_data) - - assert response.status_code == 200 - data = response.json() - assert "access_token" in data - assert "refresh_token" in data - assert data["token_type"] == "bearer" diff --git a/services/auth/tests/test_auth_comprehensive.py b/services/auth/tests/test_auth_comprehensive.py new file mode 100644 index 00000000..5f30023e --- /dev/null +++ b/services/auth/tests/test_auth_comprehensive.py @@ -0,0 +1,1397 @@ +# ================================================================ +# services/auth/tests/test_auth_comprehensive.py +# Complete pytest test suite for authentication service +# ================================================================ +""" +Comprehensive test suite for the authentication service +Tests registration, login, and all authentication-related functionality +""" + +import pytest +import asyncio +import uuid +from datetime import datetime, timedelta, timezone +from unittest.mock import AsyncMock, MagicMock, patch +from fastapi.testclient import TestClient +from fastapi import status +from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine +from sqlalchemy.orm import sessionmaker +from sqlalchemy import text +import redis.asyncio as redis + +# Import the authentication service modules +from app.main import app +from app.models.users import User, RefreshToken +from app.schemas.auth import UserRegistration, UserLogin, TokenResponse +from app.services.auth_service import AuthService +from app.core.security import SecurityManager +from app.core.config import settings +from app.core.database import get_db +from shared.database.base import Base + +# ================================================================ +# TEST CONFIGURATION AND FIXTURES +# ================================================================ + +# Test database configuration - Use in-memory SQLite for speed +TEST_DATABASE_URL = "sqlite+aiosqlite:///:memory:" + +@pytest.fixture(scope="function") +async def test_engine(): + """Create test database engine""" + engine = create_async_engine( + TEST_DATABASE_URL, + echo=False, + future=True + ) + + # Create all tables + async with engine.begin() as conn: + await conn.run_sync(Base.metadata.create_all) + + yield engine + + # Clean up + await engine.dispose() + +@pytest.fixture(scope="function") +async def test_db(test_engine): + """Create test database session""" + async_session = sessionmaker( + test_engine, class_=AsyncSession, expire_on_commit=False + ) + + async with async_session() as session: + yield session + +@pytest.fixture(scope="function") +def client(test_db): + """Create test client with database dependency override""" + + def override_get_db(): + return test_db + + app.dependency_overrides[get_db] = override_get_db + + with TestClient(app) as test_client: + yield test_client + + # Clean up + app.dependency_overrides.clear() + +@pytest.fixture +def mock_redis(): + """Mock Redis client for testing""" + redis_mock = AsyncMock() + redis_mock.get.return_value = None + redis_mock.incr.return_value = 1 + redis_mock.expire.return_value = True + redis_mock.delete.return_value = True + redis_mock.setex.return_value = True + return redis_mock + +@pytest.fixture +def valid_user_data(): + """Valid user registration data""" + return { + "email": "test@bakery.es", + "password": "TestPassword123", + "full_name": "Test User" + } + +@pytest.fixture +def weak_password_user_data(): + """User data with weak password""" + return { + "email": "test@bakery.es", + "password": "weak", + "full_name": "Test User" + } + +@pytest.fixture +def invalid_email_user_data(): + """User data with invalid email""" + return { + "email": "invalid-email", + "password": "TestPassword123", + "full_name": "Test User" + } + +@pytest.fixture +async def test_user(test_db): + """Create a test user in the database""" + user_data = UserRegistration( + email="existing@bakery.es", + password="TestPassword123", + full_name="Existing User" + ) + + user = await AuthService.create_user( + email=user_data.email, + password=user_data.password, + full_name=user_data.full_name, + db=test_db + ) + return user + +# ================================================================ +# SECURITY MANAGER TESTS +# ================================================================ + +class TestSecurityManager: + """Test the SecurityManager utility class""" + + def test_hash_password(self): + """Test password hashing""" + password = "TestPassword123" + hashed = SecurityManager.hash_password(password) + + assert hashed != password + assert len(hashed) > 50 # bcrypt hashes are long + assert hashed.startswith('$2b$') # bcrypt format + + def test_verify_password_correct(self): + """Test password verification with correct password""" + password = "TestPassword123" + hashed = SecurityManager.hash_password(password) + + assert SecurityManager.verify_password(password, hashed) is True + + def test_verify_password_incorrect(self): + """Test password verification with incorrect password""" + password = "TestPassword123" + wrong_password = "WrongPassword123" + hashed = SecurityManager.hash_password(password) + + assert SecurityManager.verify_password(wrong_password, hashed) is False + + @pytest.mark.parametrize("password,expected", [ + ("TestPassword123", True), # Valid password + ("TestPwd123", True), # Valid but shorter + ("testpassword123", False), # No uppercase + ("TESTPASSWORD123", False), # No lowercase + ("TestPassword", False), # No numbers + ("Test123", False), # Too short + ("", False), # Empty + ]) + def test_validate_password(self, password, expected): + """Test password validation with various inputs""" + assert SecurityManager.validate_password(password) == expected + + def test_create_access_token(self): + """Test JWT access token creation""" + user_data = { + "user_id": str(uuid.uuid4()), + "email": "test@bakery.es", + "full_name": "Test User" + } + + token = SecurityManager.create_access_token(user_data) + + assert isinstance(token, str) + assert len(token) > 100 # JWT tokens are long + + # Verify token can be decoded + decoded = SecurityManager.verify_token(token) + assert decoded is not None + assert decoded["email"] == user_data["email"] + + def test_create_refresh_token(self): + """Test JWT refresh token creation""" + user_data = { + "user_id": str(uuid.uuid4()), + "email": "test@bakery.es" + } + + token = SecurityManager.create_refresh_token(user_data) + + assert isinstance(token, str) + assert len(token) > 100 + + def test_verify_token_valid(self): + """Test JWT token verification with valid token""" + user_data = { + "user_id": str(uuid.uuid4()), + "email": "test@bakery.es" + } + + token = SecurityManager.create_access_token(user_data) + decoded = SecurityManager.verify_token(token) + + assert decoded is not None + assert decoded["user_id"] == user_data["user_id"] + assert decoded["email"] == user_data["email"] + + def test_verify_token_invalid(self): + """Test JWT token verification with invalid token""" + invalid_token = "invalid.jwt.token" + decoded = SecurityManager.verify_token(invalid_token) + + assert decoded is None + + def test_hash_token(self): + """Test token hashing for storage""" + token = "test_token_string" + hashed = SecurityManager.hash_token(token) + + assert hashed != token + assert len(hashed) == 64 # SHA256 hex digest length + + @pytest.mark.asyncio + async def test_check_login_attempts_no_attempts(self, mock_redis): + """Test checking login attempts when none exist""" + with patch('app.core.security.redis_client', mock_redis): + mock_redis.get.return_value = None + + result = await SecurityManager.check_login_attempts("test@bakery.es") + assert result is True + + @pytest.mark.asyncio + async def test_check_login_attempts_under_limit(self, mock_redis): + """Test checking login attempts under the limit""" + with patch('app.core.security.redis_client', mock_redis): + mock_redis.get.return_value = "3" # Under limit of 5 + + result = await SecurityManager.check_login_attempts("test@bakery.es") + assert result is True + + @pytest.mark.asyncio + async def test_check_login_attempts_over_limit(self, mock_redis): + """Test checking login attempts over the limit""" + with patch('app.core.security.redis_client', mock_redis): + mock_redis.get.return_value = "5" # At limit + + result = await SecurityManager.check_login_attempts("test@bakery.es") + assert result is False + + @pytest.mark.asyncio + async def test_increment_login_attempts(self, mock_redis): + """Test incrementing login attempts""" + with patch('app.core.security.redis_client', mock_redis): + await SecurityManager.increment_login_attempts("test@bakery.es") + + mock_redis.incr.assert_called_once() + mock_redis.expire.assert_called_once() + + @pytest.mark.asyncio + async def test_clear_login_attempts(self, mock_redis): + """Test clearing login attempts""" + with patch('app.core.security.redis_client', mock_redis): + await SecurityManager.clear_login_attempts("test@bakery.es") + + mock_redis.delete.assert_called_once() + +# ================================================================ +# AUTH SERVICE TESTS +# ================================================================ + +class TestAuthService: + """Test the AuthService business logic""" + + @pytest.mark.asyncio + async def test_create_user_success(self, test_db): + """Test successful user creation""" + email = "newuser@bakery.es" + password = "TestPassword123" + full_name = "New User" + + user = await AuthService.create_user(email, password, full_name, test_db) + + assert user.email == email + assert user.full_name == full_name + assert user.is_active is True + assert user.is_verified is False + assert user.id is not None + assert user.created_at is not None + + # Verify password was hashed + assert user.hashed_password != password + assert SecurityManager.verify_password(password, user.hashed_password) + + @pytest.mark.asyncio + async def test_create_user_duplicate_email(self, test_db, test_user): + """Test user creation with duplicate email""" + email = test_user.email # Use existing user's email + password = "TestPassword123" + full_name = "Duplicate User" + + with pytest.raises(Exception) as exc_info: + await AuthService.create_user(email, password, full_name, test_db) + + # Should raise HTTPException for duplicate email + assert "already registered" in str(exc_info.value).lower() + + @pytest.mark.asyncio + async def test_authenticate_user_success(self, test_db, test_user): + """Test successful user authentication""" + email = test_user.email + password = "TestPassword123" # Original password + + authenticated_user = await AuthService.authenticate_user(email, password, test_db) + + assert authenticated_user is not None + assert authenticated_user.id == test_user.id + assert authenticated_user.email == email + + @pytest.mark.asyncio + async def test_authenticate_user_wrong_password(self, test_db, test_user): + """Test authentication with wrong password""" + email = test_user.email + wrong_password = "WrongPassword123" + + authenticated_user = await AuthService.authenticate_user(email, wrong_password, test_db) + + assert authenticated_user is None + + @pytest.mark.asyncio + async def test_authenticate_user_nonexistent(self, test_db): + """Test authentication with non-existent email""" + email = "nonexistent@bakery.es" + password = "TestPassword123" + + authenticated_user = await AuthService.authenticate_user(email, password, test_db) + + assert authenticated_user is None + + @pytest.mark.asyncio + async def test_authenticate_user_inactive(self, test_db): + """Test authentication with inactive user""" + # Create inactive user + user = await AuthService.create_user( + "inactive@bakery.es", + "TestPassword123", + "Inactive User", + test_db + ) + + # Make user inactive + user.is_active = False + await test_db.commit() + + authenticated_user = await AuthService.authenticate_user( + "inactive@bakery.es", + "TestPassword123", + test_db + ) + + assert authenticated_user is None + + @pytest.mark.asyncio + async def test_login_user_success(self, test_db, test_user, mock_redis): + """Test successful login""" + with patch('app.core.security.redis_client', mock_redis): + with patch('app.services.auth_service.AuthService._get_user_tenants') as mock_tenants: + mock_tenants.return_value = [] + + result = await AuthService.login( + test_user.email, + "TestPassword123", + test_db + ) + + assert isinstance(result, dict) + assert "access_token" in result + assert "refresh_token" in result + assert "user" in result + assert result["user"]["email"] == test_user.email + + @pytest.mark.asyncio + async def test_login_user_invalid_credentials(self, test_db, test_user): + """Test login with invalid credentials""" + with pytest.raises(Exception) as exc_info: + await AuthService.login( + test_user.email, + "WrongPassword123", + test_db + ) + + assert "invalid credentials" in str(exc_info.value).lower() + +# ================================================================ +# API ENDPOINT TESTS +# ================================================================ + +class TestAuthenticationAPI: + """Test the authentication API endpoints""" + + def test_register_success(self, client, valid_user_data): + """Test successful user registration via API""" + response = client.post("/auth/register", json=valid_user_data) + + assert response.status_code == status.HTTP_200_OK + + data = response.json() + assert data["email"] == valid_user_data["email"] + assert data["full_name"] == valid_user_data["full_name"] + assert data["is_active"] is True + assert data["is_verified"] is False + assert "id" in data + assert "created_at" in data + + def test_register_weak_password(self, client, weak_password_user_data): + """Test registration with weak password""" + response = client.post("/auth/register", json=weak_password_user_data) + + assert response.status_code == status.HTTP_400_BAD_REQUEST + + data = response.json() + assert "password" in data["detail"].lower() + + def test_register_invalid_email(self, client, invalid_email_user_data): + """Test registration with invalid email""" + response = client.post("/auth/register", json=invalid_email_user_data) + + assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY + + def test_register_missing_fields(self, client): + """Test registration with missing required fields""" + incomplete_data = { + "email": "test@bakery.es" + # Missing password and full_name + } + + response = client.post("/auth/register", json=incomplete_data) + + assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY + + def test_register_duplicate_email(self, client, valid_user_data): + """Test registration with duplicate email""" + # First registration + response1 = client.post("/auth/register", json=valid_user_data) + assert response1.status_code == status.HTTP_200_OK + + # Second registration with same email + response2 = client.post("/auth/register", json=valid_user_data) + assert response2.status_code == status.HTTP_400_BAD_REQUEST + + data = response2.json() + assert "already registered" in data["detail"].lower() + + def test_login_success(self, client, valid_user_data): + """Test successful login via API""" + # First register a user + client.post("/auth/register", json=valid_user_data) + + # Then login + login_data = { + "email": valid_user_data["email"], + "password": valid_user_data["password"] + } + + response = client.post("/auth/login", json=login_data) + + assert response.status_code == status.HTTP_200_OK + + data = response.json() + assert "access_token" in data + assert "refresh_token" in data + assert data["token_type"] == "bearer" + assert "expires_in" in data + assert "user" in data + assert data["user"]["email"] == valid_user_data["email"] + + def test_login_wrong_password(self, client, valid_user_data): + """Test login with wrong password""" + # First register a user + client.post("/auth/register", json=valid_user_data) + + # Then login with wrong password + login_data = { + "email": valid_user_data["email"], + "password": "WrongPassword123" + } + + response = client.post("/auth/login", json=login_data) + + assert response.status_code == status.HTTP_401_UNAUTHORIZED + + data = response.json() + assert "invalid credentials" in data["detail"].lower() + + def test_login_nonexistent_user(self, client): + """Test login with non-existent user""" + login_data = { + "email": "nonexistent@bakery.es", + "password": "TestPassword123" + } + + response = client.post("/auth/login", json=login_data) + + assert response.status_code == status.HTTP_401_UNAUTHORIZED + + def test_login_missing_fields(self, client): + """Test login with missing fields""" + incomplete_data = { + "email": "test@bakery.es" + # Missing password + } + + response = client.post("/auth/login", json=incomplete_data) + + assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY + + def test_login_invalid_email_format(self, client): + """Test login with invalid email format""" + login_data = { + "email": "invalid-email", + "password": "TestPassword123" + } + + response = client.post("/auth/login", json=login_data) + + assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY + +# ================================================================ +# INTEGRATION TESTS +# ================================================================ + +class TestAuthenticationFlow: + """Test complete authentication flows""" + + def test_complete_registration_login_flow(self, client, valid_user_data): + """Test complete flow: register -> login -> access protected endpoint""" + # Step 1: Register + register_response = client.post("/auth/register", json=valid_user_data) + assert register_response.status_code == status.HTTP_200_OK + + user_data = register_response.json() + user_id = user_data["id"] + + # Step 2: Login + login_data = { + "email": valid_user_data["email"], + "password": valid_user_data["password"] + } + login_response = client.post("/auth/login", json=login_data) + assert login_response.status_code == status.HTTP_200_OK + + token_data = login_response.json() + access_token = token_data["access_token"] + + # Step 3: Access protected endpoint + headers = {"Authorization": f"Bearer {access_token}"} + profile_response = client.get("/users/me", headers=headers) + assert profile_response.status_code == status.HTTP_200_OK + + profile_data = profile_response.json() + assert profile_data["id"] == user_id + assert profile_data["email"] == valid_user_data["email"] + + def test_token_refresh_flow(self, client, valid_user_data): + """Test token refresh flow""" + # Register and login + client.post("/auth/register", json=valid_user_data) + + login_data = { + "email": valid_user_data["email"], + "password": valid_user_data["password"] + } + login_response = client.post("/auth/login", json=login_data) + token_data = login_response.json() + + refresh_token = token_data["refresh_token"] + + # Refresh token + refresh_data = {"refresh_token": refresh_token} + refresh_response = client.post("/auth/refresh", json=refresh_data) + + assert refresh_response.status_code == status.HTTP_200_OK + + new_token_data = refresh_response.json() + assert "access_token" in new_token_data + assert "refresh_token" in new_token_data + + # Verify new token works + headers = {"Authorization": f"Bearer {new_token_data['access_token']}"} + profile_response = client.get("/users/me", headers=headers) + assert profile_response.status_code == status.HTTP_200_OK + + def test_logout_flow(self, client, valid_user_data): + """Test logout flow""" + # Register and login + client.post("/auth/register", json=valid_user_data) + + login_data = { + "email": valid_user_data["email"], + "password": valid_user_data["password"] + } + login_response = client.post("/auth/login", json=login_data) + token_data = login_response.json() + + access_token = token_data["access_token"] + + # Logout + headers = {"Authorization": f"Bearer {access_token}"} + logout_response = client.post("/auth/logout", headers=headers) + + assert logout_response.status_code == status.HTTP_200_OK + + # Verify token is invalidated (if logout invalidates tokens) + # This depends on implementation - some systems don't invalidate JWT tokens + # profile_response = client.get("/users/me", headers=headers) + # assert profile_response.status_code == status.HTTP_401_UNAUTHORIZED + +# ================================================================ +# ERROR HANDLING TESTS +# ================================================================ + +class TestErrorHandling: + """Test error handling scenarios""" + + @pytest.mark.asyncio + async def test_database_error_during_registration(self, test_db): + """Test handling of database errors during registration""" + with patch.object(test_db, 'commit', side_effect=Exception("Database error")): + with pytest.raises(Exception): + await AuthService.create_user( + "test@bakery.es", + "TestPassword123", + "Test User", + test_db + ) + + @pytest.mark.asyncio + async def test_database_error_during_authentication(self, test_db): + """Test handling of database errors during authentication""" + with patch.object(test_db, 'execute', side_effect=Exception("Database error")): + result = await AuthService.authenticate_user( + "test@bakery.es", + "TestPassword123", + test_db + ) + assert result is None + + def test_malformed_json_request(self, client): + """Test handling of malformed JSON in requests""" + response = client.post( + "/auth/register", + data="invalid json", + headers={"Content-Type": "application/json"} + ) + + assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY + + def test_empty_request_body(self, client): + """Test handling of empty request body""" + response = client.post("/auth/register", json={}) + + assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY + +# ================================================================ +# SECURITY TESTS +# ================================================================ + +class TestSecurity: + """Test security-related functionality""" + + def test_password_requirements_enforced(self, client): + """Test that password requirements are enforced""" + test_cases = [ + ("weak", "Too short"), + ("nouppercasehere123", "No uppercase"), + ("NOLOWERCASEHERE123", "No lowercase"), + ("NoNumbersHere", "No numbers"), + ] + + for password, description in test_cases: + user_data = { + "email": f"test_{password}@bakery.es", + "password": password, + "full_name": "Test User" + } + + response = client.post("/auth/register", json=user_data) + assert response.status_code == status.HTTP_400_BAD_REQUEST, f"Failed for {description}" + + def test_email_validation(self, client): + """Test email validation""" + invalid_emails = [ + "invalid", + "@bakery.es", + "test@", + "test..test@bakery.es", + "test@bakery", + ] + + for email in invalid_emails: + user_data = { + "email": email, + "password": "TestPassword123", + "full_name": "Test User" + } + + response = client.post("/auth/register", json=user_data) + assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY + + def test_sql_injection_prevention(self, client): + """Test SQL injection prevention""" + malicious_email = "test@bakery.es'; DROP TABLE users; --" + + user_data = { + "email": malicious_email, + "password": "TestPassword123", + "full_name": "Test User" + } + + # Should handle gracefully without SQL injection + response = client.post("/auth/register", json=user_data) + # Depending on email validation, this might be 422 or 400 + assert response.status_code in [status.HTTP_400_BAD_REQUEST, status.HTTP_422_UNPROCESSABLE_ENTITY] + + def test_password_not_logged(self, client, valid_user_data, caplog): + """Test that passwords are not logged""" + with caplog.at_level("DEBUG"): + client.post("/auth/register", json=valid_user_data) + + # Check that password is not in any log messages + for record in caplog.records: + assert valid_user_data["password"] not in record.getMessage() + +# ================================================================ +# RATE LIMITING TESTS (if implemented) +# ================================================================ + +class TestRateLimiting: + """Test rate limiting functionality""" + + @pytest.mark.asyncio + async def test_login_rate_limiting(self, mock_redis): + """Test login rate limiting""" + with patch('app.core.security.redis_client', mock_redis): + # Simulate multiple failed attempts + email = "test@bakery.es" + + # First few attempts should be allowed + mock_redis.get.return_value = "3" + result = await SecurityManager.check_login_attempts(email) + assert result is True + + # After max attempts, should be blocked + mock_redis.get.return_value = str(settings.MAX_LOGIN_ATTEMPTS) + result = await SecurityManager.check_login_attempts(email) + assert result is False + + def test_multiple_registration_attempts(self, client, valid_user_data): + """Test handling of multiple registration attempts""" + # This test depends on rate limiting implementation + # For now, just ensure system handles multiple requests gracefully + + for i in range(3): + user_data = valid_user_data.copy() + user_data["email"] = f"test{i}@bakery.es" + + response = client.post("/auth/register", json=user_data) + assert response.status_code == status.HTTP_200_OK + +# ================================================================ +# PERFORMANCE TESTS +# ================================================================ + +class TestPerformance: + """Basic performance tests""" + + def test_registration_performance(self, client): + """Test registration performance with multiple users""" + import time + + start_time = time.time() + + for i in range(10): + user_data = { + "email": f"perftest{i}@bakery.es", + "password": "TestPassword123", + "full_name": f"Performance Test User {i}" + } + + response = client.post("/auth/register", json=user_data) + assert response.status_code == status.HTTP_200_OK + + end_time = time.time() + duration = end_time - start_time + + # Should complete 10 logins in reasonable time + assert duration < 5.0, f"Login took too long: {duration}s" + +# ================================================================ +# EDGE CASES AND BOUNDARY TESTS +# ================================================================ + +class TestEdgeCases: + """Test edge cases and boundary conditions""" + + def test_very_long_email(self, client): + """Test registration with very long email""" + long_email = "a" * 250 + "@bakery.es" + + user_data = { + "email": long_email, + "password": "TestPassword123", + "full_name": "Test User" + } + + response = client.post("/auth/register", json=user_data) + # Should handle gracefully (either accept if within limits or reject) + assert response.status_code in [status.HTTP_200_OK, status.HTTP_400_BAD_REQUEST, status.HTTP_422_UNPROCESSABLE_ENTITY] + + def test_very_long_full_name(self, client): + """Test registration with very long full name""" + long_name = "A" * 300 + + user_data = { + "email": "test@bakery.es", + "password": "TestPassword123", + "full_name": long_name + } + + response = client.post("/auth/register", json=user_data) + # Should handle gracefully + assert response.status_code in [status.HTTP_200_OK, status.HTTP_400_BAD_REQUEST, status.HTTP_422_UNPROCESSABLE_ENTITY] + + def test_very_long_password(self, client): + """Test registration with very long password""" + long_password = "A1" + "a" * 1000 # Starts with valid pattern + + user_data = { + "email": "test@bakery.es", + "password": long_password, + "full_name": "Test User" + } + + response = client.post("/auth/register", json=user_data) + # Should handle gracefully + assert response.status_code in [status.HTTP_200_OK, status.HTTP_400_BAD_REQUEST] + + def test_unicode_characters_in_name(self, client): + """Test registration with unicode characters in name""" + user_data = { + "email": "test@bakery.es", + "password": "TestPassword123", + "full_name": "JosΓ© MarΓ­a GarcΓ­a-HernΓ‘ndez ζ΅‹θ―•η”¨ζˆ· πŸ₯–" + } + + response = client.post("/auth/register", json=user_data) + assert response.status_code == status.HTTP_200_OK + + data = response.json() + assert data["full_name"] == user_data["full_name"] + + def test_special_characters_in_email(self, client): + """Test registration with special characters in email""" + # These are valid email characters + special_emails = [ + "test+tag@bakery.es", + "test.dot@bakery.es", + "test_underscore@bakery.es", + "test-dash@bakery.es" + ] + + for email in special_emails: + user_data = { + "email": email, + "password": "TestPassword123", + "full_name": "Test User" + } + + response = client.post("/auth/register", json=user_data) + assert response.status_code == status.HTTP_200_OK, f"Failed for email: {email}" + + def test_empty_strings(self, client): + """Test registration with empty strings""" + user_data = { + "email": "", + "password": "", + "full_name": "" + } + + response = client.post("/auth/register", json=user_data) + assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY + + def test_null_values(self, client): + """Test registration with null values""" + user_data = { + "email": None, + "password": None, + "full_name": None + } + + response = client.post("/auth/register", json=user_data) + assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY + + def test_whitespace_only_fields(self, client): + """Test registration with whitespace-only fields""" + user_data = { + "email": " ", + "password": " ", + "full_name": " " + } + + response = client.post("/auth/register", json=user_data) + assert response.status_code in [status.HTTP_400_BAD_REQUEST, status.HTTP_422_UNPROCESSABLE_ENTITY] + + def test_case_sensitivity_email(self, client): + """Test email case sensitivity""" + user_data1 = { + "email": "Test@Bakery.ES", + "password": "TestPassword123", + "full_name": "Test User 1" + } + + user_data2 = { + "email": "test@bakery.es", + "password": "TestPassword123", + "full_name": "Test User 2" + } + + # Register first user + response1 = client.post("/auth/register", json=user_data1) + assert response1.status_code == status.HTTP_200_OK + + # Try to register second user with different case + response2 = client.post("/auth/register", json=user_data2) + # Depending on implementation, might be treated as same email + # Most systems normalize email case + if response2.status_code == status.HTTP_400_BAD_REQUEST: + assert "already registered" in response2.json()["detail"].lower() + +# ================================================================ +# CONCURRENT ACCESS TESTS +# ================================================================ + +class TestConcurrency: + """Test concurrent access scenarios""" + + @pytest.mark.asyncio + async def test_concurrent_registration_same_email(self, test_db): + """Test concurrent registration with same email""" + import asyncio + + email = "concurrent@bakery.es" + password = "TestPassword123" + full_name = "Concurrent User" + + # Create multiple concurrent registration tasks + tasks = [ + AuthService.create_user(email, password, f"{full_name} {i}", test_db) + for i in range(3) + ] + + results = await asyncio.gather(*tasks, return_exceptions=True) + + # Only one should succeed, others should fail + successes = [r for r in results if not isinstance(r, Exception)] + failures = [r for r in results if isinstance(r, Exception)] + + assert len(successes) == 1, "Only one registration should succeed" + assert len(failures) >= 2, "Other registrations should fail" + + @pytest.mark.asyncio + async def test_concurrent_login_attempts(self, test_db, test_user): + """Test concurrent login attempts for same user""" + import asyncio + + email = test_user.email + password = "TestPassword123" + + # Create multiple concurrent login tasks + tasks = [ + AuthService.authenticate_user(email, password, test_db) + for _ in range(5) + ] + + results = await asyncio.gather(*tasks, return_exceptions=True) + + # All should succeed (assuming no rate limiting in this test) + successes = [r for r in results if r is not None and not isinstance(r, Exception)] + assert len(successes) >= 3, "Most login attempts should succeed" + +# ================================================================ +# DATA INTEGRITY TESTS +# ================================================================ + +class TestDataIntegrity: + """Test data integrity and consistency""" + + @pytest.mark.asyncio + async def test_user_data_integrity_after_creation(self, test_db): + """Test that user data remains intact after creation""" + email = "integrity@bakery.es" + password = "TestPassword123" + full_name = "Integrity Test User" + + user = await AuthService.create_user(email, password, full_name, test_db) + + # Verify all fields are correctly set + assert user.email == email + assert user.full_name == full_name + assert user.is_active is True + assert user.is_verified is False + assert user.created_at is not None + assert isinstance(user.created_at, datetime) + assert user.created_at.tzinfo is not None # Should be timezone-aware + + # Verify password is hashed and verifiable + assert user.hashed_password != password + assert SecurityManager.verify_password(password, user.hashed_password) + + @pytest.mark.asyncio + async def test_last_login_update(self, test_db, test_user): + """Test that last_login is updated on authentication""" + original_last_login = test_user.last_login + + # Authenticate user + authenticated_user = await AuthService.authenticate_user( + test_user.email, + "TestPassword123", + test_db + ) + + assert authenticated_user is not None + assert authenticated_user.last_login is not None + + if original_last_login: + assert authenticated_user.last_login > original_last_login + else: + assert authenticated_user.last_login is not None + + @pytest.mark.asyncio + async def test_database_rollback_on_error(self, test_db): + """Test that database is rolled back on errors""" + # Get initial user count + from sqlalchemy import select, func + result = await test_db.execute(select(func.count(User.id))) + initial_count = result.scalar() + + # Try to create user with invalid data that should cause rollback + with patch.object(test_db, 'commit', side_effect=Exception("Simulated error")): + try: + await AuthService.create_user( + "rollback@bakery.es", + "TestPassword123", + "Rollback Test User", + test_db + ) + except Exception: + pass # Expected to fail + + # Verify user count hasn't changed + result = await test_db.execute(select(func.count(User.id))) + final_count = result.scalar() + + assert final_count == initial_count, "Database should be rolled back on error" + +# ================================================================ +# TOKEN MANAGEMENT TESTS +# ================================================================ + +class TestTokenManagement: + """Test JWT token management""" + + def test_token_expiration_validation(self): + """Test that tokens expire correctly""" + user_data = { + "user_id": str(uuid.uuid4()), + "email": "test@bakery.es" + } + + # Create token with very short expiration + with patch('app.core.config.settings.JWT_ACCESS_TOKEN_EXPIRE_MINUTES', 0): + token = SecurityManager.create_access_token(user_data) + + # Wait a moment and verify token is expired + import time + time.sleep(1) + + decoded = SecurityManager.verify_token(token) + assert decoded is None, "Expired token should not be valid" + + def test_token_contains_correct_claims(self): + """Test that tokens contain correct claims""" + user_data = { + "user_id": str(uuid.uuid4()), + "email": "test@bakery.es", + "full_name": "Test User" + } + + token = SecurityManager.create_access_token(user_data) + decoded = SecurityManager.verify_token(token) + + assert decoded is not None + assert decoded["user_id"] == user_data["user_id"] + assert decoded["email"] == user_data["email"] + assert decoded["full_name"] == user_data["full_name"] + assert "exp" in decoded # Expiration claim + assert "iat" in decoded # Issued at claim + + def test_refresh_token_different_from_access_token(self): + """Test that refresh tokens are different from access tokens""" + user_data = { + "user_id": str(uuid.uuid4()), + "email": "test@bakery.es" + } + + access_token = SecurityManager.create_access_token(user_data) + refresh_token = SecurityManager.create_refresh_token(user_data) + + assert access_token != refresh_token + + # Both should be valid but have different expiration times + access_decoded = SecurityManager.verify_token(access_token) + refresh_decoded = SecurityManager.verify_token(refresh_token) + + assert access_decoded is not None + assert refresh_decoded is not None + assert access_decoded["exp"] < refresh_decoded["exp"] # Refresh token expires later + +# ================================================================ +# COMPREHENSIVE INTEGRATION TESTS +# ================================================================ + +class TestCompleteAuthenticationFlows: + """Comprehensive end-to-end authentication flow tests""" + + def test_full_user_lifecycle(self, client): + """Test complete user lifecycle: register -> login -> use -> logout""" + user_data = { + "email": "lifecycle@bakery.es", + "password": "TestPassword123", + "full_name": "Lifecycle Test User" + } + + # 1. Registration + register_response = client.post("/auth/register", json=user_data) + assert register_response.status_code == status.HTTP_200_OK + + user_info = register_response.json() + assert user_info["email"] == user_data["email"] + assert user_info["is_active"] is True + + # 2. Login + login_data = { + "email": user_data["email"], + "password": user_data["password"] + } + login_response = client.post("/auth/login", json=login_data) + assert login_response.status_code == status.HTTP_200_OK + + token_info = login_response.json() + access_token = token_info["access_token"] + refresh_token = token_info["refresh_token"] + + # 3. Use access token to access protected endpoint + headers = {"Authorization": f"Bearer {access_token}"} + profile_response = client.get("/users/me", headers=headers) + assert profile_response.status_code == status.HTTP_200_OK + + profile_data = profile_response.json() + assert profile_data["email"] == user_data["email"] + + # 4. Refresh token + refresh_data = {"refresh_token": refresh_token} + refresh_response = client.post("/auth/refresh", json=refresh_data) + assert refresh_response.status_code == status.HTTP_200_OK + + new_token_info = refresh_response.json() + new_access_token = new_token_info["access_token"] + + # 5. Use new access token + new_headers = {"Authorization": f"Bearer {new_access_token}"} + new_profile_response = client.get("/users/me", headers=new_headers) + assert new_profile_response.status_code == status.HTTP_200_OK + + # 6. Logout + logout_response = client.post("/auth/logout", headers=new_headers) + assert logout_response.status_code == status.HTTP_200_OK + + def test_multiple_sessions_same_user(self, client, valid_user_data): + """Test multiple simultaneous sessions for same user""" + # Register user + client.post("/auth/register", json=valid_user_data) + + login_data = { + "email": valid_user_data["email"], + "password": valid_user_data["password"] + } + + # Create multiple sessions + session1_response = client.post("/auth/login", json=login_data) + session2_response = client.post("/auth/login", json=login_data) + + assert session1_response.status_code == status.HTTP_200_OK + assert session2_response.status_code == status.HTTP_200_OK + + session1_token = session1_response.json()["access_token"] + session2_token = session2_response.json()["access_token"] + + # Both tokens should work + headers1 = {"Authorization": f"Bearer {session1_token}"} + headers2 = {"Authorization": f"Bearer {session2_token}"} + + response1 = client.get("/users/me", headers=headers1) + response2 = client.get("/users/me", headers=headers2) + + assert response1.status_code == status.HTTP_200_OK + assert response2.status_code == status.HTTP_200_OK + +# ================================================================ +# CONFIGURATION TESTS +# ================================================================ + +class TestConfiguration: + """Test configuration and environment-dependent behavior""" + + def test_password_requirements_configurable(self): + """Test that password requirements respect configuration""" + # Test with different configurations + with patch('app.core.config.settings.PASSWORD_MIN_LENGTH', 12): + assert not SecurityManager.validate_password("Short1") # 6 chars + assert SecurityManager.validate_password("LongerPassword123") # 17 chars + + with patch('app.core.config.settings.PASSWORD_REQUIRE_SYMBOLS', True): + assert not SecurityManager.validate_password("NoSymbols123") + assert SecurityManager.validate_password("WithSymbol123!") + + def test_jwt_expiration_configurable(self): + """Test that JWT expiration respects configuration""" + user_data = {"user_id": str(uuid.uuid4()), "email": "test@bakery.es"} + + with patch('app.core.config.settings.JWT_ACCESS_TOKEN_EXPIRE_MINUTES', 60): + token = SecurityManager.create_access_token(user_data) + decoded = SecurityManager.verify_token(token) + + assert decoded is not None + # Check that expiration is approximately 60 minutes from now + exp_time = datetime.fromtimestamp(decoded['exp']) + now = datetime.utcnow() + diff = exp_time - now + + # Should be close to 60 minutes (allow 1 minute tolerance) + assert 59 * 60 <= diff.total_seconds() <= 61 * 60 + +# ================================================================ +# TEST RUNNER AND HELPERS +# ================================================================ + +def run_auth_tests(): + """Convenience function to run all auth tests""" + import subprocess + import sys + + # Run pytest with coverage + cmd = [ + sys.executable, "-m", "pytest", + __file__, + "-v", + "--tb=short", + "--cov=app", + "--cov-report=term-missing", + "--cov-report=html:htmlcov", + "-x" # Stop on first failure + ] + + result = subprocess.run(cmd, capture_output=True, text=True) + print(result.stdout) + if result.stderr: + print("STDERR:", result.stderr) + + return result.returncode == 0 + +if __name__ == "__main__": + # Run tests if script is executed directly + import sys + success = run_auth_tests() + sys.exit(0 if success else 1) + +# ================================================================ +# ADDITIONAL TEST UTILITIES +# ================================================================ + +class AuthTestUtils: + """Utility functions for authentication testing""" + + @staticmethod + def create_test_user_data(email_suffix="", **overrides): + """Create valid test user data with optional customization""" + default_data = { + "email": f"test{email_suffix}@bakery.es", + "password": "TestPassword123", + "full_name": f"Test User{email_suffix}" + } + default_data.update(overrides) + return default_data + + @staticmethod + def extract_token_claims(token): + """Extract claims from JWT token without verification""" + import base64 + import json + + # Split token and decode payload + parts = token.split('.') + if len(parts) != 3: + return None + + # Add padding if needed + payload = parts[1] + padding = 4 - len(payload) % 4 + if padding != 4: + payload += '=' * padding + + try: + decoded_bytes = base64.urlsafe_b64decode(payload) + return json.loads(decoded_bytes) + except Exception: + return None + + @staticmethod + async def create_authenticated_user(client, user_data=None): + """Create and authenticate a user, return user info and tokens""" + if user_data is None: + user_data = AuthTestUtils.create_test_user_data() + + # Register + register_response = client.post("/auth/register", json=user_data) + assert register_response.status_code == status.HTTP_200_OK + + # Login + login_data = { + "email": user_data["email"], + "password": user_data["password"] + } + login_response = client.post("/auth/login", json=login_data) + assert login_response.status_code == status.HTTP_200_OK + + return { + "user": register_response.json(), + "tokens": login_response.json(), + "headers": {"Authorization": f"Bearer {login_response.json()['access_token']}"} + } + +# ================================================================ +# PYTEST CONFIGURATION +# ================================================================ + +# pytest configuration for the test file +pytest_plugins = ["pytest_asyncio"] + +# Markers for different test categories +pytestmark = [ + pytest.mark.asyncio, + pytest.mark.auth, +] + +# Test configuration +def pytest_configure(config): + """Configure pytest markers""" + config.addinivalue_line( + "markers", "auth: marks tests as authentication tests" + ) + config.addinivalue_line( + "markers", "integration: marks tests as integration tests" + ) + config.addinivalue_line( + "markers", "performance: marks tests as performance tests" + ) + config.addinivalue_line( + "markers", "security: marks tests as security tests" + ) + diff --git a/services/auth/tests/test_users.py b/services/auth/tests/test_users.py deleted file mode 100644 index cdc29ad2..00000000 --- a/services/auth/tests/test_users.py +++ /dev/null @@ -1,74 +0,0 @@ -# ================================================================ -# services/auth/tests/test_users.py -# ================================================================ -"""User management tests""" - -import pytest -from fastapi.testclient import TestClient -from sqlalchemy.ext.asyncio import AsyncSession - -from app.services.user_service import UserService -from app.services.auth_service import AuthService -from app.schemas.auth import UserRegistration - -@pytest.mark.asyncio -async def test_get_user_by_email(db: AsyncSession): - """Test getting user by email""" - # Create a user first - user_data = UserRegistration( - email="test@bakery.es", - password="TestPass123", - full_name="Test User", - language="es" - ) - created_user = await AuthService.register_user(user_data, db) - - # Get user by email - user = await UserService.get_user_by_email("test@bakery.es", db) - - assert user is not None - assert user.email == "test@bakery.es" - assert str(user.id) == created_user.id - -@pytest.mark.asyncio -async def test_update_user(db: AsyncSession): - """Test updating user""" - # Create a user first - user_data = UserRegistration( - email="test@bakery.es", - password="TestPass123", - full_name="Test User", - language="es" - ) - created_user = await AuthService.register_user(user_data, db) - - # Update user - update_data = { - "full_name": "Updated User", - "phone": "+34987654321" - } - - updated_user = await UserService.update_user(created_user.id, update_data, db) - - assert updated_user.full_name == "Updated User" - assert updated_user.phone == "+34987654321" - -def test_get_current_user_endpoint(client: TestClient, test_user_data): - """Test get current user endpoint""" - # Register and login first - client.post("/auth/register", json=test_user_data) - - login_response = client.post("/auth/login", json={ - "email": test_user_data["email"], - "password": test_user_data["password"] - }) - token = login_response.json()["access_token"] - - # Get current user - headers = {"Authorization": f"Bearer {token}"} - response = client.get("/users/me", headers=headers) - - assert response.status_code == 200 - data = response.json() - assert data["email"] == test_user_data["email"] - assert data["full_name"] == test_user_data["full_name"] \ No newline at end of file