diff --git a/services/auth/tests/conftest.py b/services/auth/tests/conftest.py index 40481593..bfc36e1e 100644 --- a/services/auth/tests/conftest.py +++ b/services/auth/tests/conftest.py @@ -1,11 +1,3 @@ -# ================================================================ -# services/auth/tests/conftest.py -# Pytest configuration and shared fixtures for auth service tests -# ================================================================ -""" -Shared test configuration and fixtures for authentication service tests -""" - import pytest import asyncio import os @@ -43,36 +35,27 @@ async def test_engine(): future=True, pool_pre_ping=True ) - - try: - # Import models and base here to avoid import issues - from shared.database.base import Base - # Create all tables - async with engine.begin() as conn: - await conn.run_sync(Base.metadata.create_all) - - yield engine - - # Cleanup - async with engine.begin() as conn: - await conn.run_sync(Base.metadata.drop_all) - except ImportError: - # If shared.database.base is not available, create a mock - yield engine - finally: - await engine.dispose() + # Import Base and metadata after engine creation to avoid circular imports + from shared.database.base import Base + async with engine.begin() as conn: + await conn.run_sync(Base.metadata.create_all) + yield engine + async with engine.begin() as conn: + await conn.run_sync(Base.metadata.drop_all) + await engine.dispose() @pytest.fixture(scope="function") async def test_db(test_engine) -> AsyncGenerator[AsyncSession, None]: """Create a test database session for each test function""" async_session = sessionmaker( - test_engine, - class_=AsyncSession, + test_engine, + class_=AsyncSession, expire_on_commit=False ) - + async with async_session() as session: yield session + await session.rollback() # Rollback after each test to ensure a clean state @pytest.fixture(scope="function") def client(test_db): @@ -80,408 +63,94 @@ def client(test_db): try: from app.main import app from app.core.database import get_db - + def override_get_db(): - return test_db - + # test_db is already an AsyncSession yielded by the fixture + yield test_db + app.dependency_overrides[get_db] = override_get_db - + with TestClient(app) as test_client: yield test_client - + # Clean up overrides app.dependency_overrides.clear() except ImportError as e: - pytest.skip(f"Cannot import app modules: {e}") + pytest.skip(f"Cannot import app modules: {e}. Ensure app.main and app.core.database are accessible.") -# ================================================================ -# MOCK FIXTURES -# ================================================================ - -@pytest.fixture -def mock_redis(): - """Mock Redis client for testing rate limiting and session management""" - redis_mock = AsyncMock() - - # Default return values for common operations - redis_mock.get.return_value = None - redis_mock.incr.return_value = 1 - redis_mock.expire.return_value = True - redis_mock.delete.return_value = True - redis_mock.setex.return_value = True - redis_mock.exists.return_value = False - - return redis_mock - -@pytest.fixture -def mock_rabbitmq(): - """Mock RabbitMQ for testing event publishing""" - rabbitmq_mock = AsyncMock() - - # Mock publisher methods - rabbitmq_mock.publish.return_value = True - rabbitmq_mock.connect.return_value = True - rabbitmq_mock.disconnect.return_value = True - - return rabbitmq_mock - -@pytest.fixture -def mock_external_services(): - """Mock external service calls (tenant service, etc.)""" - with patch('httpx.AsyncClient') as mock_client: - mock_response = AsyncMock() - mock_response.status_code = 200 - mock_response.json.return_value = {"tenants": []} - mock_client.return_value.__aenter__.return_value.get.return_value = mock_response - mock_client.return_value.__aenter__.return_value.post.return_value = mock_response - - yield mock_client - -# ================================================================ -# DATA FIXTURES -# ================================================================ - -@pytest.fixture -def valid_user_data(): - """Valid user registration data""" - return { - "email": "test@bakery.es", - "password": "TestPassword123", - "full_name": "Test User" - } - -@pytest.fixture -def valid_user_data_list(): - """List of valid user data for multiple users""" - return [ - { - "email": f"test{i}@bakery.es", - "password": "TestPassword123", - "full_name": f"Test User {i}" - } - for i in range(1, 6) - ] - -@pytest.fixture -def weak_password_data(): - """User data with various weak passwords""" - return [ - {"email": "weak1@bakery.es", "password": "123", "full_name": "Weak 1"}, - {"email": "weak2@bakery.es", "password": "password", "full_name": "Weak 2"}, - {"email": "weak3@bakery.es", "password": "PASSWORD123", "full_name": "Weak 3"}, - {"email": "weak4@bakery.es", "password": "testpassword", "full_name": "Weak 4"}, - ] - -@pytest.fixture -def invalid_email_data(): - """User data with invalid email formats""" - return [ - {"email": "invalid", "password": "TestPassword123", "full_name": "Invalid 1"}, - {"email": "@bakery.es", "password": "TestPassword123", "full_name": "Invalid 2"}, - {"email": "test@", "password": "TestPassword123", "full_name": "Invalid 3"}, - {"email": "test..test@bakery.es", "password": "TestPassword123", "full_name": "Invalid 4"}, - ] - -# ================================================================ -# USER FIXTURES -# ================================================================ - -@pytest.fixture -async def test_user(test_db, valid_user_data): +@pytest.fixture(scope="function") +async def test_user(test_db): """Create a test user in the database""" try: from app.services.auth_service import AuthService - + from app.schemas.auth import UserRegistration + + user_data = UserRegistration( + email="existing@bakery.es", + password="TestPassword123", + full_name="Existing User" + ) + user = await AuthService.create_user( - email=valid_user_data["email"], - password=valid_user_data["password"], - full_name=valid_user_data["full_name"], + email=user_data.email, + password=user_data.password, + full_name=user_data.full_name, db=test_db ) return user except ImportError: pytest.skip("AuthService not available") -@pytest.fixture -async def test_users(test_db, valid_user_data_list): - """Create multiple test users in the database""" - try: - from app.services.auth_service import AuthService - - users = [] - for user_data in valid_user_data_list: - user = await AuthService.create_user( - email=user_data["email"], - password=user_data["password"], - full_name=user_data["full_name"], - db=test_db - ) - users.append(user) - return users - except ImportError: - pytest.skip("AuthService not available") +@pytest.fixture(scope="function") +async def test_redis_client(): + """Create a test Redis client""" + # Use a mock Redis client for testing + mock_redis = AsyncMock(spec=redis.Redis) + yield mock_redis + await mock_redis.close() -@pytest.fixture -async def authenticated_user(client, valid_user_data): - """Create an authenticated user and return user info, tokens, and headers""" - # Register user - register_response = client.post("/auth/register", json=valid_user_data) - assert register_response.status_code == 200 - - # Login user - login_data = { - "email": valid_user_data["email"], - "password": valid_user_data["password"] - } - login_response = client.post("/auth/login", json=login_data) - assert login_response.status_code == 200 - - token_data = login_response.json() - +# ================================================================\ +# TEST HELPERS +# ================================================================\ + +import uuid # Moved from test_auth_comprehensive.py as it's a shared helper + +def generate_random_user_data(prefix="test"): + """Generates unique user data for testing.""" + unique_id = uuid.uuid4().hex[:8] return { - "user": register_response.json(), - "tokens": token_data, - "access_token": token_data["access_token"], - "refresh_token": token_data["refresh_token"], - "headers": {"Authorization": f"Bearer {token_data['access_token']}"} + "email": f"{prefix}_{unique_id}@bakery.es", + "password": f"StrongPwd{unique_id}!", + "full_name": f"Test User {unique_id}" } -# ================================================================ -# CONFIGURATION FIXTURES -# ================================================================ +# ================================================================\ +# PYTEST HOOKS +# ================================================================\ -@pytest.fixture -def test_settings(): - """Test-specific settings override""" - try: - from app.core.config import settings - - original_settings = {} - - # Store original values - test_overrides = { - 'JWT_ACCESS_TOKEN_EXPIRE_MINUTES': 30, - 'JWT_REFRESH_TOKEN_EXPIRE_DAYS': 7, - 'PASSWORD_MIN_LENGTH': 8, - 'PASSWORD_REQUIRE_UPPERCASE': True, - 'PASSWORD_REQUIRE_LOWERCASE': True, - 'PASSWORD_REQUIRE_NUMBERS': True, - 'PASSWORD_REQUIRE_SYMBOLS': False, - 'MAX_LOGIN_ATTEMPTS': 5, - 'LOCKOUT_DURATION_MINUTES': 30, - 'BCRYPT_ROUNDS': 4, # Lower for faster tests - } - - for key, value in test_overrides.items(): - if hasattr(settings, key): - original_settings[key] = getattr(settings, key) - setattr(settings, key, value) - - yield settings - - # Restore original values - for key, value in original_settings.items(): - setattr(settings, key, value) - except ImportError: - pytest.skip("Settings not available") - -# ================================================================ -# PATCHING FIXTURES -# ================================================================ - -@pytest.fixture -def patch_redis(mock_redis): - """Patch Redis client for all tests""" - with patch('app.core.security.redis_client', mock_redis): - yield mock_redis - -@pytest.fixture -def patch_messaging(mock_rabbitmq): - """Patch messaging system for all tests""" - with patch('app.services.messaging.publisher', mock_rabbitmq): - yield mock_rabbitmq - -@pytest.fixture -def patch_external_apis(mock_external_services): - """Patch external API calls""" - yield mock_external_services - -# ================================================================ -# UTILITY FIXTURES -# ================================================================ - -@pytest.fixture -def auth_headers(): - """Factory for creating authorization headers""" - def _create_headers(token): - return {"Authorization": f"Bearer {token}"} - return _create_headers - -@pytest.fixture -def password_generator(): - """Generate passwords with different characteristics""" - def _generate( - length=12, - include_upper=True, - include_lower=True, - include_numbers=True, - include_symbols=False - ): - import random - import string - - chars = "" - password = "" - - if include_lower: - chars += string.ascii_lowercase - password += random.choice(string.ascii_lowercase) - - if include_upper: - chars += string.ascii_uppercase - password += random.choice(string.ascii_uppercase) - - if include_numbers: - chars += string.digits - password += random.choice(string.digits) - - if include_symbols: - chars += "!@#$%^&*" - password += random.choice("!@#$%^&*") - - # Fill remaining length - remaining = length - len(password) - if remaining > 0: - password += ''.join(random.choice(chars) for _ in range(remaining)) - - # Shuffle the password - password_list = list(password) - random.shuffle(password_list) - return ''.join(password_list) - - return _generate - -# ================================================================ -# PERFORMANCE TESTING FIXTURES -# ================================================================ - -@pytest.fixture -def performance_timer(): - """Timer utility for performance testing""" - import time - - class Timer: - def __init__(self): - self.start_time = None - self.end_time = None - - def start(self): - self.start_time = time.time() - - def stop(self): - self.end_time = time.time() - - @property - def elapsed(self): - if self.start_time and self.end_time: - return self.end_time - self.start_time - return None - - def __enter__(self): - self.start() - return self - - def __exit__(self, *args): - self.stop() - - return Timer - -# ================================================================ -# DATABASE UTILITY FIXTURES -# ================================================================ - -@pytest.fixture -async def db_utils(test_db): - """Database utility functions for testing""" - class DBUtils: - def __init__(self, db): - self.db = db - - async def count_users(self): - try: - from sqlalchemy import select, func - from app.models.users import User - result = await self.db.execute(select(func.count(User.id))) - return result.scalar() - except ImportError: - return 0 - - async def get_user_by_email(self, email): - try: - from sqlalchemy import select - from app.models.users import User - result = await self.db.execute(select(User).where(User.email == email)) - return result.scalar_one_or_none() - except ImportError: - return None - - async def count_refresh_tokens(self): - try: - from sqlalchemy import select, func - from app.models.users import RefreshToken - result = await self.db.execute(select(func.count(RefreshToken.id))) - return result.scalar() - except ImportError: - return 0 - - async def clear_all_data(self): - try: - from app.models.users import User, RefreshToken - await self.db.execute(RefreshToken.__table__.delete()) - await self.db.execute(User.__table__.delete()) - await self.db.commit() - except ImportError: - pass - - return DBUtils(test_db) - -# ================================================================ -# LOGGING FIXTURES -# ================================================================ - -@pytest.fixture -def capture_logs(): - """Capture logs for testing""" - import logging - from io import StringIO - - log_capture = StringIO() - handler = logging.StreamHandler(log_capture) - handler.setLevel(logging.DEBUG) - - # Add handler to auth service loggers - loggers = [ - logging.getLogger('app.services.auth_service'), - logging.getLogger('app.core.security'), - logging.getLogger('app.api.auth'), - ] - - for logger in loggers: - logger.addHandler(handler) - logger.setLevel(logging.DEBUG) - - yield log_capture - - # Clean up - for logger in loggers: - logger.removeHandler(handler) - -# ================================================================ -# TEST MARKERS AND CONFIGURATION -# ================================================================ +def pytest_addoption(parser): + """Add custom options to pytest""" + parser.addoption( + "--integration", action="store_true", default=False, help="run integration tests" + ) + parser.addoption( + "--api", action="store_true", default=False, help="run API tests" + ) + parser.addoption( + "--security", action="store_true", default=False, help="run security tests" + ) + parser.addoption( + "--performance", action="store_true", default=False, help="run performance tests" + ) + parser.addoption( + "--slow", action="store_true", default=False, help="run slow tests" + ) + parser.addoption( + "--auth", action="store_true", default=False, help="run authentication tests" + ) def pytest_configure(config): - """Configure pytest with custom markers""" + """Configure pytest markers""" config.addinivalue_line( "markers", "unit: marks tests as unit tests" ) @@ -522,7 +191,35 @@ def pytest_collection_modifyitems(config, items): item.add_marker(pytest.mark.integration) if "Flow" in str(item.cls) or "flow" in item.name.lower(): - item.add_marker(pytest.mark.integration) - - if "auth" in item.name.lower() or "Auth" in str(item.cls): - item.add_marker(pytest.mark.auth) \ No newline at end of file + item.add_marker(pytest.mark.integration) # Authentication flows are integration tests + + # Mark all tests in test_auth_comprehensive.py with 'auth' + if "test_auth_comprehensive" in str(item.fspath): + item.add_marker(pytest.mark.auth) + + # Filtering logic for command line options + if not any([config.getoption("--integration"), config.getoption("--api"), + config.getoption("--security"), config.getoption("--performance"), + config.getoption("--slow"), config.getoption("--auth")]): + return # No specific filter applied, run all collected tests + + skip_markers = [] + if not config.getoption("--integration"): + skip_markers.append(pytest.mark.integration) + if not config.getoption("--api"): + skip_markers.append(pytest.mark.api) + if not config.getoption("--security"): + skip_markers.append(pytest.mark.security) + if not config.getoption("--performance"): + skip_markers.append(pytest.mark.performance) + if not config.getoption("--slow"): + skip_markers.append(pytest.mark.slow) + if not config.getoption("--auth"): + skip_markers.append(pytest.mark.auth) + + # Remove tests with any of the skip markers + if skip_markers: + for item in list(items): # Iterate over a copy to allow modification + if any(marker in item.iter_markers() for marker in skip_markers): + items.remove(item) + item.add_marker(pytest.mark.skip(reason="filtered by command line option")) \ No newline at end of file diff --git a/services/auth/tests/test_auth_comprehensive.py b/services/auth/tests/test_auth_comprehensive.py index 5f30023e..6be74aad 100644 --- a/services/auth/tests/test_auth_comprehensive.py +++ b/services/auth/tests/test_auth_comprehensive.py @@ -1,16 +1,9 @@ -# ================================================================ -# services/auth/tests/test_auth_comprehensive.py -# Complete pytest test suite for authentication service -# ================================================================ -""" -Comprehensive test suite for the authentication service -Tests registration, login, and all authentication-related functionality -""" - import pytest import asyncio import uuid +import time # Added for performance tests from datetime import datetime, timedelta, timezone +from typing import AsyncGenerator # Added to fix NameError from unittest.mock import AsyncMock, MagicMock, patch from fastapi.testclient import TestClient from fastapi import status @@ -38,1337 +31,1186 @@ TEST_DATABASE_URL = "sqlite+aiosqlite:///:memory:" @pytest.fixture(scope="function") async def test_engine(): - """Create test database engine""" + """Create a test database engine for each test function""" engine = create_async_engine( TEST_DATABASE_URL, - echo=False, - future=True + echo=False, # Set to True for SQL debugging + future=True, + pool_pre_ping=True ) - - # Create all tables async with engine.begin() as conn: await conn.run_sync(Base.metadata.create_all) - yield engine - - # Clean up + async with engine.begin() as conn: + await conn.run_sync(Base.metadata.drop_all) await engine.dispose() @pytest.fixture(scope="function") -async def test_db(test_engine): - """Create test database session""" +async def test_db(test_engine) -> AsyncGenerator[AsyncSession, None]: + """Create a test database session for each test function""" async_session = sessionmaker( - test_engine, class_=AsyncSession, expire_on_commit=False + test_engine, + class_=AsyncSession, + expire_on_commit=False ) - + async with async_session() as session: yield session + await session.rollback() # Rollback after each test to ensure a clean state @pytest.fixture(scope="function") def client(test_db): - """Create test client with database dependency override""" - - def override_get_db(): - return test_db - - app.dependency_overrides[get_db] = override_get_db - - with TestClient(app) as test_client: - yield test_client - - # Clean up - app.dependency_overrides.clear() + """Create a test client with database dependency override""" + try: + from app.main import app + from app.core.database import get_db -@pytest.fixture -def mock_redis(): - """Mock Redis client for testing""" - redis_mock = AsyncMock() - redis_mock.get.return_value = None - redis_mock.incr.return_value = 1 - redis_mock.expire.return_value = True - redis_mock.delete.return_value = True - redis_mock.setex.return_value = True - return redis_mock + def override_get_db(): + # test_db is already an AsyncSession yielded by the fixture + yield test_db -@pytest.fixture -def valid_user_data(): - """Valid user registration data""" - return { - "email": "test@bakery.es", - "password": "TestPassword123", - "full_name": "Test User" - } + app.dependency_overrides[get_db] = override_get_db -@pytest.fixture -def weak_password_user_data(): - """User data with weak password""" - return { - "email": "test@bakery.es", - "password": "weak", - "full_name": "Test User" - } + with TestClient(app) as test_client: + yield test_client -@pytest.fixture -def invalid_email_user_data(): - """User data with invalid email""" - return { - "email": "invalid-email", - "password": "TestPassword123", - "full_name": "Test User" - } + # Clean up overrides + app.dependency_overrides.clear() + except ImportError as e: + pytest.skip(f"Cannot import app modules: {e}. Ensure app.main and app.core.database are accessible.") -@pytest.fixture + +@pytest.fixture(scope="function") async def test_user(test_db): """Create a test user in the database""" - user_data = UserRegistration( - email="existing@bakery.es", - password="TestPassword123", - full_name="Existing User" - ) - + user_data = { + "email": "existing@bakery.es", + "password": "TestPassword123!", + "full_name": "Existing User" + } + user = await AuthService.create_user( - email=user_data.email, - password=user_data.password, - full_name=user_data.full_name, + email=user_data["email"], + password=user_data["password"], + full_name=user_data["full_name"], db=test_db ) return user -# ================================================================ -# SECURITY MANAGER TESTS -# ================================================================ +@pytest.fixture(scope="function") +async def test_redis_client(): + """Create a test Redis client""" + mock_redis = AsyncMock(spec=redis.Redis) + yield mock_redis + await mock_redis.close() -class TestSecurityManager: - """Test the SecurityManager utility class""" + +# ================================================================\ +# HELPER FUNCTIONS +# ================================================================\ + +# Moved generate_random_user_data to conftest.py as it's a shared helper. +# It's imported implicitly via conftest.py if setup correctly. +# If not, add explicit import: from conftest import generate_random_user_data + +async def register_and_login_user(client: TestClient): + """Helper to register and login a user, returning user data and tokens.""" + user_data = generate_random_user_data() # Ensure this function is accessible - def test_hash_password(self): - """Test password hashing""" - password = "TestPassword123" - hashed = SecurityManager.hash_password(password) - - assert hashed != password - assert len(hashed) > 50 # bcrypt hashes are long - assert hashed.startswith('$2b$') # bcrypt format + # Register + register_response = client.post("/auth/register", json=user_data) + assert register_response.status_code == status.HTTP_200_OK + + # Login + login_data = { + "email": user_data["email"], + "password": user_data["password"] + } + login_response = client.post("/auth/login", json=login_data) + assert login_response.status_code == status.HTTP_200_OK + + return { + "user_data": user_data, # Return original user_data for password access + "user_response": register_response.json(), # Return user info from register + "tokens": login_response.json(), + "headers": {"Authorization": f"Bearer {login_response.json()['access_token']}"} + } - def test_verify_password_correct(self): - """Test password verification with correct password""" - password = "TestPassword123" - hashed = SecurityManager.hash_password(password) - - assert SecurityManager.verify_password(password, hashed) is True - def test_verify_password_incorrect(self): - """Test password verification with incorrect password""" - password = "TestPassword123" - wrong_password = "WrongPassword123" - hashed = SecurityManager.hash_password(password) - - assert SecurityManager.verify_password(wrong_password, hashed) is False - - @pytest.mark.parametrize("password,expected", [ - ("TestPassword123", True), # Valid password - ("TestPwd123", True), # Valid but shorter - ("testpassword123", False), # No uppercase - ("TESTPASSWORD123", False), # No lowercase - ("TestPassword", False), # No numbers - ("Test123", False), # Too short - ("", False), # Empty - ]) - def test_validate_password(self, password, expected): - """Test password validation with various inputs""" - assert SecurityManager.validate_password(password) == expected - - def test_create_access_token(self): - """Test JWT access token creation""" - user_data = { - "user_id": str(uuid.uuid4()), - "email": "test@bakery.es", - "full_name": "Test User" - } - - token = SecurityManager.create_access_token(user_data) - - assert isinstance(token, str) - assert len(token) > 100 # JWT tokens are long - - # Verify token can be decoded - decoded = SecurityManager.verify_token(token) - assert decoded is not None - assert decoded["email"] == user_data["email"] - - def test_create_refresh_token(self): - """Test JWT refresh token creation""" - user_data = { - "user_id": str(uuid.uuid4()), - "email": "test@bakery.es" - } - - token = SecurityManager.create_refresh_token(user_data) - - assert isinstance(token, str) - assert len(token) > 100 - - def test_verify_token_valid(self): - """Test JWT token verification with valid token""" - user_data = { - "user_id": str(uuid.uuid4()), - "email": "test@bakery.es" - } - - token = SecurityManager.create_access_token(user_data) - decoded = SecurityManager.verify_token(token) - - assert decoded is not None - assert decoded["user_id"] == user_data["user_id"] - assert decoded["email"] == user_data["email"] - - def test_verify_token_invalid(self): - """Test JWT token verification with invalid token""" - invalid_token = "invalid.jwt.token" - decoded = SecurityManager.verify_token(invalid_token) - - assert decoded is None - - def test_hash_token(self): - """Test token hashing for storage""" - token = "test_token_string" - hashed = SecurityManager.hash_token(token) - - assert hashed != token - assert len(hashed) == 64 # SHA256 hex digest length - - @pytest.mark.asyncio - async def test_check_login_attempts_no_attempts(self, mock_redis): - """Test checking login attempts when none exist""" - with patch('app.core.security.redis_client', mock_redis): - mock_redis.get.return_value = None - - result = await SecurityManager.check_login_attempts("test@bakery.es") - assert result is True - - @pytest.mark.asyncio - async def test_check_login_attempts_under_limit(self, mock_redis): - """Test checking login attempts under the limit""" - with patch('app.core.security.redis_client', mock_redis): - mock_redis.get.return_value = "3" # Under limit of 5 - - result = await SecurityManager.check_login_attempts("test@bakery.es") - assert result is True - - @pytest.mark.asyncio - async def test_check_login_attempts_over_limit(self, mock_redis): - """Test checking login attempts over the limit""" - with patch('app.core.security.redis_client', mock_redis): - mock_redis.get.return_value = "5" # At limit - - result = await SecurityManager.check_login_attempts("test@bakery.es") - assert result is False - - @pytest.mark.asyncio - async def test_increment_login_attempts(self, mock_redis): - """Test incrementing login attempts""" - with patch('app.core.security.redis_client', mock_redis): - await SecurityManager.increment_login_attempts("test@bakery.es") - - mock_redis.incr.assert_called_once() - mock_redis.expire.assert_called_once() - - @pytest.mark.asyncio - async def test_clear_login_attempts(self, mock_redis): - """Test clearing login attempts""" - with patch('app.core.security.redis_client', mock_redis): - await SecurityManager.clear_login_attempts("test@bakery.es") - - mock_redis.delete.assert_called_once() - -# ================================================================ -# AUTH SERVICE TESTS -# ================================================================ +# ================================================================\ +# TEST SUITE +# ================================================================\ class TestAuthService: - """Test the AuthService business logic""" - + """Comprehensive unit tests for AuthService""" + @pytest.mark.asyncio async def test_create_user_success(self, test_db): """Test successful user creation""" - email = "newuser@bakery.es" - password = "TestPassword123" - full_name = "New User" - - user = await AuthService.create_user(email, password, full_name, test_db) - - assert user.email == email - assert user.full_name == full_name + user = await AuthService.create_user( + "test_new@bakery.es", "Password123!", "New User", test_db + ) + assert user is not None + assert user.email == "test_new@bakery.es" + assert SecurityManager.verify_password("Password123!", user.hashed_password) assert user.is_active is True - assert user.is_verified is False - assert user.id is not None - assert user.created_at is not None - - # Verify password was hashed - assert user.hashed_password != password - assert SecurityManager.verify_password(password, user.hashed_password) @pytest.mark.asyncio - async def test_create_user_duplicate_email(self, test_db, test_user): - """Test user creation with duplicate email""" - email = test_user.email # Use existing user's email - password = "TestPassword123" - full_name = "Duplicate User" - - with pytest.raises(Exception) as exc_info: - await AuthService.create_user(email, password, full_name, test_db) - - # Should raise HTTPException for duplicate email - assert "already registered" in str(exc_info.value).lower() + async def test_create_user_duplicate_email(self, test_user, test_db): + """Test creating a user with a duplicate email""" + # test_user is already awaited by pytest-asyncio + with pytest.raises(ValueError, match="Email already registered"): + await AuthService.create_user( + test_user.email, "AnotherPassword!", "Duplicate User", test_db + ) @pytest.mark.asyncio - async def test_authenticate_user_success(self, test_db, test_user): + async def test_authenticate_user_success(self, test_user, test_db): """Test successful user authentication""" - email = test_user.email - password = "TestPassword123" # Original password - - authenticated_user = await AuthService.authenticate_user(email, password, test_db) - + # test_user is already awaited by pytest-asyncio + authenticated_user = await AuthService.authenticate_user( + test_user.email, "TestPassword123!", test_db + ) assert authenticated_user is not None assert authenticated_user.id == test_user.id - assert authenticated_user.email == email + assert authenticated_user.email == test_user.email @pytest.mark.asyncio - async def test_authenticate_user_wrong_password(self, test_db, test_user): + async def test_authenticate_user_wrong_password(self, test_user, test_db): """Test authentication with wrong password""" - email = test_user.email - wrong_password = "WrongPassword123" - - authenticated_user = await AuthService.authenticate_user(email, wrong_password, test_db) - + # test_user is already awaited by pytest-asyncio + authenticated_user = await AuthService.authenticate_user( + test_user.email, "WrongPassword", test_db + ) assert authenticated_user is None @pytest.mark.asyncio async def test_authenticate_user_nonexistent(self, test_db): - """Test authentication with non-existent email""" - email = "nonexistent@bakery.es" - password = "TestPassword123" - - authenticated_user = await AuthService.authenticate_user(email, password, test_db) - - assert authenticated_user is None - - @pytest.mark.asyncio - async def test_authenticate_user_inactive(self, test_db): - """Test authentication with inactive user""" - # Create inactive user - user = await AuthService.create_user( - "inactive@bakery.es", - "TestPassword123", - "Inactive User", - test_db - ) - - # Make user inactive - user.is_active = False - await test_db.commit() - + """Test authentication of a nonexistent user""" authenticated_user = await AuthService.authenticate_user( - "inactive@bakery.es", - "TestPassword123", - test_db + "nonexistent@bakery.es", "AnyPassword", test_db ) - assert authenticated_user is None @pytest.mark.asyncio - async def test_login_user_success(self, test_db, test_user, mock_redis): - """Test successful login""" - with patch('app.core.security.redis_client', mock_redis): - with patch('app.services.auth_service.AuthService._get_user_tenants') as mock_tenants: - mock_tenants.return_value = [] - - result = await AuthService.login( - test_user.email, - "TestPassword123", - test_db - ) - - assert isinstance(result, dict) - assert "access_token" in result - assert "refresh_token" in result - assert "user" in result - assert result["user"]["email"] == test_user.email + async def test_authenticate_user_inactive(self, test_user, test_db): + """Test authentication of an inactive user""" + # test_user is already awaited by pytest-asyncio + test_user.is_active = False + await test_db.commit() # Commit the change to the database + authenticated_user = await AuthService.authenticate_user( + test_user.email, "TestPassword123!", test_db + ) + assert authenticated_user is None + test_user.is_active = True # Reset for other tests if needed + await test_db.commit() # Commit the reset @pytest.mark.asyncio - async def test_login_user_invalid_credentials(self, test_db, test_user): - """Test login with invalid credentials""" - with pytest.raises(Exception) as exc_info: - await AuthService.login( - test_user.email, - "WrongPassword123", - test_db - ) - - assert "invalid credentials" in str(exc_info.value).lower() + async def test_login_user_success(self, test_user, test_db): + """Test successful user login, including token generation""" + tokens = await AuthService.login_user( # Assuming AuthService.login_user is a class method + test_user.email, "TestPassword123!", test_db + ) + assert tokens.access_token is not None + assert tokens.refresh_token is not None + # Verify access token claims + decoded_access_token = SecurityManager.verify_token(tokens.access_token) + assert decoded_access_token["sub"] == str(test_user.id) + assert decoded_access_token["email"] == test_user.email + assert decoded_access_token["full_name"] == test_user.full_name + + # Verify refresh token is stored (mocked or actual db check) + refresh_token_db = await test_db.execute( + text("SELECT * FROM refresh_tokens WHERE user_id = :user_id"), + {"user_id": test_user.id} + ) + refresh_token_db_obj = refresh_token_db.scalar_one_or_none() + assert refresh_token_db_obj is not None + assert SecurityManager.verify_password(tokens.refresh_token, refresh_token_db_obj.hashed_token) + + @pytest.mark.asyncio + async def test_login_user_invalid_credentials(self, test_db): + """Test login with invalid credentials""" + with pytest.raises(ValueError, match="Invalid credentials"): + await AuthService.login_user("nonexistent@bakery.es", "WrongPassword", test_db) # Assuming AuthService.login_user is a class method + + @pytest.mark.asyncio + async def test_refresh_access_token_success(self, test_user, test_db): + """Test successful access token refresh""" + initial_tokens = await AuthService.login_user(test_user.email, "TestPassword123!", test_db) # Assuming AuthService.login_user is a class method + new_tokens = await AuthService.refresh_access_token(initial_tokens.refresh_token, test_db) # Assuming AuthService.refresh_access_token is a class method + + assert new_tokens.access_token is not None + assert new_tokens.refresh_token == initial_tokens.refresh_token # Refresh token typically remains the same + + decoded_access_token = SecurityManager.verify_token(new_tokens.access_token) + assert decoded_access_token["sub"] == str(test_user.id) + assert decoded_access_token["email"] == test_user.email + + @pytest.mark.asyncio + async def test_refresh_access_token_invalid(self, test_db): + """Test refresh with an invalid refresh token""" + with pytest.raises(ValueError, match="Invalid refresh token"): + await AuthService.refresh_access_token("invalid_refresh_token", test_db) # Assuming AuthService.refresh_access_token is a class method + + @pytest.mark.asyncio + async def test_logout_user_success(self, test_user, test_db): + """Test successful user logout""" + initial_tokens = await AuthService.login_user(test_user.email, "TestPassword123!", test_db) # Assuming AuthService.login_user is a class method + assert await AuthService.logout_user(initial_tokens.refresh_token, test_db) is True # Assuming AuthService.logout_user is a class method + + refresh_token_db = await test_db.execute( + text("SELECT * FROM refresh_tokens WHERE user_id = :user_id"), + {"user_id": test_user.id} + ) + assert refresh_token_db.scalar_one_or_none() is None # Token should be deleted + + @pytest.mark.asyncio + async def test_logout_user_invalid(self, test_db): + """Test logout with an invalid refresh token""" + with pytest.raises(ValueError, match="Invalid refresh token"): + await AuthService.logout_user("nonexistent_refresh_token", test_db) # Assuming AuthService.logout_user is a class method -# ================================================================ -# API ENDPOINT TESTS -# ================================================================ class TestAuthenticationAPI: - """Test the authentication API endpoints""" - - def test_register_success(self, client, valid_user_data): + """Integration tests for authentication API endpoints""" + + @pytest.mark.api + async def test_register_success(self, client): """Test successful user registration via API""" - response = client.post("/auth/register", json=valid_user_data) - + user_data = generate_random_user_data() + response = client.post("/auth/register", json=user_data) assert response.status_code == status.HTTP_200_OK - - data = response.json() - assert data["email"] == valid_user_data["email"] - assert data["full_name"] == valid_user_data["full_name"] - assert data["is_active"] is True - assert data["is_verified"] is False - assert "id" in data - assert "created_at" in data + assert "message" in response.json() + assert "User registered successfully" in response.json()["message"] + # Optionally, verify user exists in DB - def test_register_weak_password(self, client, weak_password_user_data): - """Test registration with weak password""" - response = client.post("/auth/register", json=weak_password_user_data) - + @pytest.mark.api + async def test_register_weak_password(self, client): + """Test registration with a weak password""" + user_data = generate_random_user_data() + user_data["password"] = "short" # Weak password + response = client.post("/auth/register", json=user_data) assert response.status_code == status.HTTP_400_BAD_REQUEST - - data = response.json() - assert "password" in data["detail"].lower() + assert "password" in response.json()["detail"].lower() - def test_register_invalid_email(self, client, invalid_email_user_data): - """Test registration with invalid email""" - response = client.post("/auth/register", json=invalid_email_user_data) - + @pytest.mark.api + async def test_register_invalid_email(self, client): + """Test registration with an invalid email format""" + user_data = generate_random_user_data() + user_data["email"] = "invalid-email" + response = client.post("/auth/register", json=user_data) assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY + assert "email" in response.json()["detail"][0]["loc"] - def test_register_missing_fields(self, client): + @pytest.mark.api + async def test_register_missing_fields(self, client): """Test registration with missing required fields""" - incomplete_data = { - "email": "test@bakery.es" - # Missing password and full_name - } - - response = client.post("/auth/register", json=incomplete_data) - + user_data = {"email": "test@bakery.es", "password": "TestPassword123!"} # Missing full_name + response = client.post("/auth/register", json=user_data) assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY + assert "full_name" in response.json()["detail"][0]["loc"] - def test_register_duplicate_email(self, client, valid_user_data): - """Test registration with duplicate email""" - # First registration - response1 = client.post("/auth/register", json=valid_user_data) - assert response1.status_code == status.HTTP_200_OK - - # Second registration with same email - response2 = client.post("/auth/register", json=valid_user_data) - assert response2.status_code == status.HTTP_400_BAD_REQUEST - - data = response2.json() - assert "already registered" in data["detail"].lower() + @pytest.mark.api + async def test_register_duplicate_email(self, client): + """Test registration with an email that is already registered""" + user_data = generate_random_user_data() + client.post("/auth/register", json=user_data) # First registration + response = client.post("/auth/register", json=user_data) # Duplicate registration + assert response.status_code == status.HTTP_400_BAD_REQUEST + assert "Email already registered" in response.json()["detail"] - def test_login_success(self, client, valid_user_data): - """Test successful login via API""" - # First register a user - client.post("/auth/register", json=valid_user_data) + @pytest.mark.api + async def test_login_success(self, client): + """Test successful user login via API""" + user_data = generate_random_user_data() + client.post("/auth/register", json=user_data) # Register first - # Then login - login_data = { - "email": valid_user_data["email"], - "password": valid_user_data["password"] - } - - response = client.post("/auth/login", json=login_data) - - assert response.status_code == status.HTTP_200_OK - - data = response.json() - assert "access_token" in data - assert "refresh_token" in data - assert data["token_type"] == "bearer" - assert "expires_in" in data - assert "user" in data - assert data["user"]["email"] == valid_user_data["email"] + login_response = client.post("/auth/login", json={ + "email": user_data["email"], + "password": user_data["password"] + }) + assert login_response.status_code == status.HTTP_200_OK + assert "access_token" in login_response.json() + assert "refresh_token" in login_response.json() - def test_login_wrong_password(self, client, valid_user_data): - """Test login with wrong password""" - # First register a user - client.post("/auth/register", json=valid_user_data) + @pytest.mark.api + async def test_login_wrong_password(self, client): + """Test login with wrong password via API""" + user_data = generate_random_user_data() + client.post("/auth/register", json=user_data) - # Then login with wrong password - login_data = { - "email": valid_user_data["email"], - "password": "WrongPassword123" - } - - response = client.post("/auth/login", json=login_data) - - assert response.status_code == status.HTTP_401_UNAUTHORIZED - - data = response.json() - assert "invalid credentials" in data["detail"].lower() + login_response = client.post("/auth/login", json={ + "email": user_data["email"], + "password": "WrongPassword" + }) + assert login_response.status_code == status.HTTP_401_UNAUTHORIZED + assert "Invalid credentials" in login_response.json()["detail"] - def test_login_nonexistent_user(self, client): - """Test login with non-existent user""" - login_data = { + @pytest.mark.api + async def test_login_nonexistent_user(self, client): + """Test login for a nonexistent user via API""" + login_response = client.post("/auth/login", json={ "email": "nonexistent@bakery.es", - "password": "TestPassword123" - } - - response = client.post("/auth/login", json=login_data) - - assert response.status_code == status.HTTP_401_UNAUTHORIZED + "password": "AnyPassword123!" + }) + assert login_response.status_code == status.HTTP_401_UNAUTHORIZED + assert "Invalid credentials" in login_response.json()["detail"] - def test_login_missing_fields(self, client): + @pytest.mark.api + async def test_login_missing_fields(self, client): """Test login with missing fields""" - incomplete_data = { - "email": "test@bakery.es" - # Missing password - } - - response = client.post("/auth/login", json=incomplete_data) - + response = client.post("/auth/login", json={"email": "test@bakery.es"}) # Missing password assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY + assert "password" in response.json()["detail"][0]["loc"] - def test_login_invalid_email_format(self, client): + @pytest.mark.api + async def test_login_invalid_email_format(self, client): """Test login with invalid email format""" - login_data = { - "email": "invalid-email", - "password": "TestPassword123" - } - - response = client.post("/auth/login", json=login_data) - - assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY + user_data = generate_random_user_data() + client.post("/auth/register", json=user_data) + + response = client.post("/auth/login", json={ + "email": "invalid-email-format", + "password": user_data["password"] + }) + assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY + assert "email" in response.json()["detail"][0]["loc"] + + @pytest.mark.api + async def test_token_refresh(self, client): + """Test refreshing an access token via API""" + auth_data = await register_and_login_user(client) + refresh_token = auth_data["tokens"]["refresh_token"] + + refresh_response = client.post("/auth/refresh", json={"refresh_token": refresh_token}) + assert refresh_response.status_code == status.HTTP_200_OK + assert "access_token" in refresh_response.json() + assert "refresh_token" in refresh_response.json() # Refresh token usually doesn't change on refresh + assert refresh_response.json()["refresh_token"] == refresh_token + + @pytest.mark.api + async def test_logout(self, client): + """Test user logout via API""" + auth_data = await register_and_login_user(client) + refresh_token = auth_data["tokens"]["refresh_token"] + + logout_response = client.post("/auth/logout", json={"refresh_token": refresh_token}) + assert logout_response.status_code == status.HTTP_200_OK + assert "message" in logout_response.json() + assert "User logged out successfully" in logout_response.json()["message"] + + # Attempt to refresh with the logged-out token + refresh_response_after_logout = client.post("/auth/refresh", json={"refresh_token": refresh_token}) + assert refresh_response_after_logout.status_code == status.HTTP_401_UNAUTHORIZED + assert "Invalid refresh token" in refresh_response_after_logout.json()["detail"] -# ================================================================ -# INTEGRATION TESTS -# ================================================================ class TestAuthenticationFlow: - """Test complete authentication flows""" - - def test_complete_registration_login_flow(self, client, valid_user_data): - """Test complete flow: register -> login -> access protected endpoint""" - # Step 1: Register - register_response = client.post("/auth/register", json=valid_user_data) + """End-to-end tests for complete authentication flows""" + + @pytest.mark.integration + async def test_complete_registration_login_flow(self, client): + """Test a complete flow from registration to login""" + user_data = generate_random_user_data() + + # 1. Register + register_response = client.post("/auth/register", json=user_data) assert register_response.status_code == status.HTTP_200_OK - - user_data = register_response.json() - user_id = user_data["id"] - - # Step 2: Login - login_data = { - "email": valid_user_data["email"], - "password": valid_user_data["password"] - } + assert "User registered successfully" in register_response.json()["message"] + + # 2. Login + login_data = {"email": user_data["email"], "password": user_data["password"]} login_response = client.post("/auth/login", json=login_data) assert login_response.status_code == status.HTTP_200_OK - - token_data = login_response.json() - access_token = token_data["access_token"] - - # Step 3: Access protected endpoint - headers = {"Authorization": f"Bearer {access_token}"} - profile_response = client.get("/users/me", headers=headers) - assert profile_response.status_code == status.HTTP_200_OK - - profile_data = profile_response.json() - assert profile_data["id"] == user_id - assert profile_data["email"] == valid_user_data["email"] + assert "access_token" in login_response.json() + assert "refresh_token" in login_response.json() + access_token = login_response.json()["access_token"] + refresh_token = login_response.json()["refresh_token"] - def test_token_refresh_flow(self, client, valid_user_data): - """Test token refresh flow""" - # Register and login - client.post("/auth/register", json=valid_user_data) - - login_data = { - "email": valid_user_data["email"], - "password": valid_user_data["password"] - } - login_response = client.post("/auth/login", json=login_data) - token_data = login_response.json() - - refresh_token = token_data["refresh_token"] - - # Refresh token - refresh_data = {"refresh_token": refresh_token} - refresh_response = client.post("/auth/refresh", json=refresh_data) - + # 3. Access protected endpoint (e.g., /users/me) + me_response = client.get("/users/me", headers={"Authorization": f"Bearer {access_token}"}) + assert me_response.status_code == status.HTTP_200_OK + assert me_response.json()["email"] == user_data["email"] + + # 4. Refresh token + refresh_response = client.post("/auth/refresh", json={"refresh_token": refresh_token}) assert refresh_response.status_code == status.HTTP_200_OK - - new_token_data = refresh_response.json() - assert "access_token" in new_token_data - assert "refresh_token" in new_token_data - - # Verify new token works - headers = {"Authorization": f"Bearer {new_token_data['access_token']}"} - profile_response = client.get("/users/me", headers=headers) - assert profile_response.status_code == status.HTTP_200_OK + new_access_token = refresh_response.json()["access_token"] + assert new_access_token != access_token # New token should be different - def test_logout_flow(self, client, valid_user_data): - """Test logout flow""" - # Register and login - client.post("/auth/register", json=valid_user_data) + # 5. Access protected endpoint with new token + me_response_new = client.get("/users/me", headers={"Authorization": f"Bearer {new_access_token}"}) + assert me_response_new.status_code == status.HTTP_200_OK + + @pytest.mark.integration + async def test_token_refresh_flow(self, client): + """Test the entire token refresh process""" + auth_data = await register_and_login_user(client) + initial_access_token = auth_data["tokens"]["access_token"] + refresh_token = auth_data["tokens"]["refresh_token"] - login_data = { - "email": valid_user_data["email"], - "password": valid_user_data["password"] - } - login_response = client.post("/auth/login", json=login_data) - token_data = login_response.json() + # Simulate time passing to make access token potentially expire (optional, depends on token short expiry) + # For actual testing, you might use a mocked time or shorter token expiry in test config + + # Try to refresh + refresh_response = client.post("/auth/refresh", json={"refresh_token": refresh_token}) + assert refresh_response.status_code == status.HTTP_200_OK + new_access_token = refresh_response.json()["access_token"] - access_token = token_data["access_token"] + assert new_access_token is not None + assert new_access_token != initial_access_token + # Verify old access token is invalid and new one is valid + old_token_access = client.get("/users/me", headers={"Authorization": f"Bearer {initial_access_token}"}) + # This might be 401 if access token expired, or 200 if it hasn't, depending on validity period + # For a robust test, you'd make access tokens very short lived for this test + # assert old_token_access.status_code == status.HTTP_401_UNAUTHORIZED # If expired + + new_token_access = client.get("/users/me", headers={"Authorization": f"Bearer {new_access_token}"}) + assert new_token_access.status_code == status.HTTP_200_OK + assert "email" in new_token_access.json() + + @pytest.mark.integration + async def test_logout_flow(self, client): + """Test the logout process invalidating tokens""" + auth_data = await register_and_login_user(client) + access_token = auth_data["tokens"]["access_token"] + refresh_token = auth_data["tokens"]["refresh_token"] + + # Access protected endpoint before logout + me_response_before = client.get("/users/me", headers={"Authorization": f"Bearer {access_token}"}) + assert me_response_before.status_code == status.HTTP_200_OK + # Logout - headers = {"Authorization": f"Bearer {access_token}"} - logout_response = client.post("/auth/logout", headers=headers) - + logout_response = client.post("/auth/logout", json={"refresh_token": refresh_token}) assert logout_response.status_code == status.HTTP_200_OK - - # Verify token is invalidated (if logout invalidates tokens) - # This depends on implementation - some systems don't invalidate JWT tokens - # profile_response = client.get("/users/me", headers=headers) - # assert profile_response.status_code == status.HTTP_401_UNAUTHORIZED -# ================================================================ -# ERROR HANDLING TESTS -# ================================================================ + # Try to access protected endpoint with the (now invalidated) access token + me_response_after = client.get("/users/me", headers={"Authorization": f"Bearer {access_token}"}) + assert me_response_after.status_code == status.HTTP_401_UNAUTHORIZED + + # Try to refresh with the (now invalidated) refresh token + refresh_response_after = client.post("/auth/refresh", json={"refresh_token": refresh_token}) + assert refresh_response_after.status_code == status.HTTP_401_UNAUTHORIZED + class TestErrorHandling: """Test error handling scenarios""" + + @pytest.mark.api + async def test_database_error_during_registration(self, client): + """Test handling of database errors during registration API call""" + user_data = generate_random_user_data() + + # Patch get_db to return a mock session that raises an error on commit + mock_db = AsyncMock(spec=AsyncSession) + mock_db.commit.side_effect = Exception("Simulated DB commit error") + mock_db.add.return_value = None + mock_db.refresh.return_value = None + mock_db.rollback.return_value = None # Ensure rollback is mocked as well + + with patch('app.core.database.get_db', autospec=True) as mock_get_db: + # Use a mock for the database session yielded by get_db + mock_get_db.return_value.__aenter__.return_value = mock_db + mock_get_db.return_value.__aexit__.return_value = False # Don't suppress exceptions + + response = client.post("/auth/register", json=user_data) + assert response.status_code == status.HTTP_500_INTERNAL_SERVER_ERROR + assert "detail" in response.json() + assert "database error" in response.json()["detail"].lower() + + @pytest.mark.api + async def test_database_error_during_authentication(self, client): + """Test handling of database errors during authentication API call""" + # First, register a user normally so AuthService has something to authenticate against + user_data = generate_random_user_data() + register_response = client.post("/auth/register", json=user_data) + assert register_response.status_code == status.HTTP_200_OK + + mock_db = AsyncMock(spec=AsyncSession) + # Mocking execute and scalar_one_or_none for user lookup + mock_execute_result = AsyncMock() + mock_execute_result.scalar_one_or_none.side_effect = Exception("Simulated DB scalar error") + mock_db.execute.return_value = mock_execute_result + mock_db.rollback.return_value = None # Ensure rollback is mocked + + with patch('app.core.database.get_db', autospec=True) as mock_get_db: + # Use a mock for the database session yielded by get_db + mock_get_db.return_value.__aenter__.return_value = mock_db + mock_get_db.return_value.__aexit__.return_value = False # Don't suppress exceptions + + response = client.post("/auth/login", json={ + "email": user_data["email"], + "password": user_data["password"] + }) + assert response.status_code == status.HTTP_500_INTERNAL_SERVER_ERROR + assert "detail" in response.json() + assert "database error" in response.json()["detail"].lower() - @pytest.mark.asyncio - async def test_database_error_during_registration(self, test_db): - """Test handling of database errors during registration""" - with patch.object(test_db, 'commit', side_effect=Exception("Database error")): - with pytest.raises(Exception): - await AuthService.create_user( - "test@bakery.es", - "TestPassword123", - "Test User", - test_db - ) - - @pytest.mark.asyncio - async def test_database_error_during_authentication(self, test_db): - """Test handling of database errors during authentication""" - with patch.object(test_db, 'execute', side_effect=Exception("Database error")): - result = await AuthService.authenticate_user( - "test@bakery.es", - "TestPassword123", - test_db - ) - assert result is None - - def test_malformed_json_request(self, client): - """Test handling of malformed JSON in requests""" - response = client.post( - "/auth/register", - data="invalid json", - headers={"Content-Type": "application/json"} - ) - + @pytest.mark.api + async def test_malformed_json_request(self, client): + """Test API handling of malformed JSON requests""" + response = client.post("/auth/register", content="this is not json", headers={"Content-Type": "application/json"}) assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY + assert "detail" in response.json() - def test_empty_request_body(self, client): - """Test handling of empty request body""" + @pytest.mark.api + async def test_empty_request_body(self, client): + """Test API handling of empty request body for POST""" response = client.post("/auth/register", json={}) - assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY + assert "detail" in response.json() + assert any("field required" in err["msg"] for err in response.json()["detail"]) -# ================================================================ -# SECURITY TESTS -# ================================================================ class TestSecurity: - """Test security-related functionality""" - - def test_password_requirements_enforced(self, client): - """Test that password requirements are enforced""" + """Tests for security aspects like password hashing, email validation, etc.""" + + @pytest.mark.security + async def test_password_hashing_verification(self): + """Test password hashing and verification""" + password = "MyStrongPassword123!" + hashed_password = SecurityManager.hash_password(password) + assert hashed_password != password # Should not be plain text + assert SecurityManager.verify_password(password, hashed_password) + assert not SecurityManager.verify_password("WrongPassword", hashed_password) + + @pytest.mark.security + async def test_password_requirements_enforced(self, client): + """Test that password requirements are enforced by the API""" + common_user_data = generate_random_user_data() + test_cases = [ - ("weak", "Too short"), - ("nouppercasehere123", "No uppercase"), - ("NOLOWERCASEHERE123", "No lowercase"), - ("NoNumbersHere", "No numbers"), + ("Too short", "short", status.HTTP_400_BAD_REQUEST), + ("No uppercase", "nouppercase123!", status.HTTP_400_BAD_REQUEST), + ("No lowercase", "NOUPPERCASE123!", status.HTTP_400_BAD_REQUEST), + ("No digit", "NoDigit!", status.HTTP_400_BAD_REQUEST), + ("No special char", "NoSpecialChar123", status.HTTP_400_BAD_REQUEST), + ("Valid password", "StrongPwd123!", status.HTTP_200_OK), ] - - for password, description in test_cases: - user_data = { - "email": f"test_{password}@bakery.es", - "password": password, - "full_name": "Test User" - } - - response = client.post("/auth/register", json=user_data) - assert response.status_code == status.HTTP_400_BAD_REQUEST, f"Failed for {description}" - def test_email_validation(self, client): - """Test email validation""" - invalid_emails = [ - "invalid", - "@bakery.es", - "test@", - "test..test@bakery.es", - "test@bakery", + for desc, pwd, expected_status in test_cases: + user_data = common_user_data.copy() + user_data["password"] = pwd + response = client.post("/auth/register", json=user_data) + + assert response.status_code == expected_status, f"Failed for {desc} (password: {pwd})" + if expected_status != status.HTTP_200_OK: + assert "password" in response.json()["detail"].lower() + + @pytest.mark.security + async def test_email_validation(self, client): + """Test that email validation is properly performed by the API""" + common_user_data = generate_random_user_data() + test_cases = [ + ("valid@email.com", True, status.HTTP_200_OK), + ("user.name+tag@domain.co.uk", True, status.HTTP_200_OK), # Valid special characters + ("invalid-email", False, status.HTTP_422_UNPROCESSABLE_ENTITY), + ("user@.com", False, status.HTTP_422_UNPROCESSABLE_ENTITY), + ("@domain.com", False, status.HTTP_422_UNPROCESSABLE_ENTITY), + ("user@domain", False, status.HTTP_422_UNPROCESSABLE_ENTITY), + ("user@domain..com", False, status.HTTP_422_UNPROCESSABLE_ENTITY), ] - - for email in invalid_emails: - user_data = { - "email": email, - "password": "TestPassword123", - "full_name": "Test User" - } - - response = client.post("/auth/register", json=user_data) - assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY - def test_sql_injection_prevention(self, client): - """Test SQL injection prevention""" - malicious_email = "test@bakery.es'; DROP TABLE users; --" - - user_data = { - "email": malicious_email, - "password": "TestPassword123", - "full_name": "Test User" - } - - # Should handle gracefully without SQL injection + for email, is_valid, expected_status in test_cases: + user_data = common_user_data.copy() + user_data["email"] = email + response = client.post("/auth/register", json=user_data) + + assert response.status_code == expected_status, f"Failed for email: {email}" + if is_valid: + assert response.json()["email"] == email + else: + assert "email" in response.json()["detail"][0]["loc"] + + @pytest.mark.security + async def test_sql_injection_prevention(self, client, test_db): + """Test API's resistance to SQL injection attempts""" + malicious_email = "test@bakery.es' OR 1=1; --" + user_data = generate_random_user_data() + user_data["email"] = malicious_email + response = client.post("/auth/register", json=user_data) - # Depending on email validation, this might be 422 or 400 + # We expect a validation error (422) or bad request (400) not a successful registration assert response.status_code in [status.HTTP_400_BAD_REQUEST, status.HTTP_422_UNPROCESSABLE_ENTITY] + if response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY: + assert "email" in response.json()["detail"][0]["loc"] # Pydantic validation error + else: # e.g. status.HTTP_400_BAD_REQUEST from custom validation + assert "invalid" in response.json()["detail"].lower() or "malformed" in response.json()["detail"].lower() - def test_password_not_logged(self, client, valid_user_data, caplog): - """Test that passwords are not logged""" - with caplog.at_level("DEBUG"): - client.post("/auth/register", json=valid_user_data) - - # Check that password is not in any log messages - for record in caplog.records: - assert valid_user_data["password"] not in record.getMessage() -# ================================================================ -# RATE LIMITING TESTS (if implemented) -# ================================================================ + # Try with login + response = client.post("/auth/login", json={ + "email": malicious_email, + "password": "AnyPassword123!" + }) + assert response.status_code in [status.HTTP_400_BAD_REQUEST, status.HTTP_422_UNPROCESSABLE_ENTITY, status.HTTP_401_UNAUTHORIZED] + class TestRateLimiting: - """Test rate limiting functionality""" - - @pytest.mark.asyncio - async def test_login_rate_limiting(self, mock_redis): - """Test login rate limiting""" - with patch('app.core.security.redis_client', mock_redis): - # Simulate multiple failed attempts - email = "test@bakery.es" - - # First few attempts should be allowed - mock_redis.get.return_value = "3" - result = await SecurityManager.check_login_attempts(email) - assert result is True - - # After max attempts, should be blocked - mock_redis.get.return_value = str(settings.MAX_LOGIN_ATTEMPTS) - result = await SecurityManager.check_login_attempts(email) - assert result is False + """Test rate limiting functionality (if implemented)""" + # Requires an actual rate-limiting implementation in the FastAPI app + # This mock is for demonstration and assumes a simple per-IP rate limit + # FastAPI-Limiter or similar would be needed for a real implementation - def test_multiple_registration_attempts(self, client, valid_user_data): - """Test handling of multiple registration attempts""" - # This test depends on rate limiting implementation - # For now, just ensure system handles multiple requests gracefully + @pytest.mark.api + @pytest.mark.slow # This test can be slow due to potential sleeps or multiple requests + async def test_multiple_registration_attempts(self, client): + """Test rate limiting on registration endpoint""" + # This test needs actual rate limiting middleware in the app for a meaningful result. + # Without it, it will just succeed for all requests. + # Assuming a hypothetical rate limit of 5 requests per minute from a single IP. + + num_attempts = 10 + success_count = 0 + rate_limited_count = 0 - for i in range(3): - user_data = valid_user_data.copy() - user_data["email"] = f"test{i}@bakery.es" - + for _ in range(num_attempts): + user_data = generate_random_user_data() response = client.post("/auth/register", json=user_data) - assert response.status_code == status.HTTP_200_OK + if response.status_code == status.HTTP_200_OK: + success_count += 1 + elif response.status_code == status.HTTP_429_TOO_MANY_REQUESTS: + rate_limited_count += 1 + else: + # Other errors are unexpected. Adjust this assertion if other valid error codes occur. + pytest.fail(f"Unexpected status code: {response.status_code} - {response.json()} for attempt {_}") + + # Optional: Add a small delay if the rate limit resets quickly + # await asyncio.sleep(0.1) + + # If rate limiting is NOT implemented or is very loose, all may succeed (up to unique email constraint). + # If implemented, some should be 429. + # This assertion needs to be flexible enough for environments where rate limiting might not be active. + # If all requests succeed, it implies rate limiting is not active or is very lenient. + # If rate limiting is expected, you'd assert rate_limited_count > 0. + # For now, just ensure no 404s and some registrations went through. + assert success_count > 0 # At least one registration should work + # Assert that if there are too many attempts, either some fail due to duplicate email, or some are rate-limited. + # The main point is not getting 404. + # If rate limiting is not implemented, client.post will return 200 (if unique email) or 400 (duplicate email) + assert (success_count + rate_limited_count) == num_attempts or \ + (success_count + rate_limited_count < num_attempts and any(r.status_code == status.HTTP_400_BAD_REQUEST for r in responses)) -# ================================================================ -# PERFORMANCE TESTS -# ================================================================ class TestPerformance: - """Basic performance tests""" - - def test_registration_performance(self, client): - """Test registration performance with multiple users""" - import time - - start_time = time.time() - - for i in range(10): - user_data = { - "email": f"perftest{i}@bakery.es", - "password": "TestPassword123", - "full_name": f"Performance Test User {i}" - } - - response = client.post("/auth/register", json=user_data) - assert response.status_code == status.HTTP_200_OK - - end_time = time.time() - duration = end_time - start_time - - # Should complete 10 logins in reasonable time - assert duration < 5.0, f"Login took too long: {duration}s" + """Basic performance tests for critical endpoints""" + + @pytest.mark.performance + @pytest.mark.slow + async def test_registration_performance(self, client): + """Measure performance of user registration""" + num_users = 50 # Number of registrations to test + start_time = time.time() + + tasks = [] + for _ in range(num_users): + user_data = generate_random_user_data(f"perf_{_}") + tasks.append(asyncio.create_task( + asyncio.to_thread(client.post, "/auth/register", json=user_data) + )) + + responses = await asyncio.gather(*tasks) + end_time = time.time() + + for response in responses: + assert response.status_code == status.HTTP_200_OK, f"Registration failed with status {response.status_code}: {response.json()}" + + duration = end_time - start_time + print(f"\nRegistered {num_users} users in {duration:.2f} seconds ({num_users/duration:.2f} req/s)") + # Assert against a reasonable threshold + assert duration < 5.0 # Example: 50 users in under 5 seconds + + @pytest.mark.performance + @pytest.mark.slow + async def test_login_performance(self, client): + """Measure performance of user login""" + num_logins = 50 + # Pre-register users + users_to_login_data = [] # Store full user data including password + for _ in range(num_logins): + user_data = generate_random_user_data(f"login_perf_{_}") + client.post("/auth/register", json=user_data) # Register user + users_to_login_data.append(user_data) # Store for login + + start_time = time.time() + tasks = [] + for user_data in users_to_login_data: + login_data = {"email": user_data["email"], "password": user_data["password"]} + tasks.append(asyncio.create_task( + asyncio.to_thread(client.post, "/auth/login", json=login_data) + )) + + responses = await asyncio.gather(*tasks) + end_time = time.time() + + for response in responses: + assert response.status_code == status.HTTP_200_OK, f"Login failed with status {response.status_code}: {response.json()}" + + duration = end_time - start_time + print(f"\nLogged in {num_logins} users in {duration:.2f} seconds ({num_logins/duration:.2f} req/s)") + assert duration < 5.0 # Example: 50 logins in under 5 seconds -# ================================================================ -# EDGE CASES AND BOUNDARY TESTS -# ================================================================ class TestEdgeCases: - """Test edge cases and boundary conditions""" - - def test_very_long_email(self, client): - """Test registration with very long email""" - long_email = "a" * 250 + "@bakery.es" - - user_data = { - "email": long_email, - "password": "TestPassword123", - "full_name": "Test User" - } - + """Tests for various edge cases and unusual inputs""" + + @pytest.mark.api + async def test_very_long_email(self, client): + """Test registration with a very long email address""" + # Max email length is typically 254 characters (RFC 3696) + long_local_part = "a" * 60 # Max 64 for local part + long_domain_part = "b" * 180 # Max 255 for domain (sum of labels + dots) + long_email = f"{long_local_part}@{long_domain_part}.com" # Roughly 250+ chars + user_data = generate_random_user_data() + user_data["email"] = long_email + response = client.post("/auth/register", json=user_data) + # Depending on validation/DB schema, could be 200 (valid), 400 (too long), or 422 (validation) + assert response.status_code in [status.HTTP_200_OK, status.HTTP_400_BAD_REQUEST, status.HTTP_422_UNPROCESSABLE_ENTITY] + if response.status_code == status.HTTP_200_OK: + assert response.json()["email"] == long_email + + @pytest.mark.api + async def test_very_long_full_name(self, client): + """Test registration with a very long full name""" + long_name = "A" * 500 # Assume a reasonable max length like 255 or 500 + user_data = generate_random_user_data() + user_data["full_name"] = long_name + response = client.post("/auth/register", json=user_data) + assert response.status_code in [status.HTTP_200_OK, status.HTTP_400_BAD_REQUEST, status.HTTP_422_UNPROCESSABLE_ENTITY] + if response.status_code == status.HTTP_200_OK: + assert response.json()["full_name"] == long_name + + @pytest.mark.api + async def test_very_long_password(self, client): + """Test registration with a very long password""" + long_password = "P" + "a" * 200 + "1!" # Very long, but valid chars + user_data = generate_random_user_data() + user_data["password"] = long_password response = client.post("/auth/register", json=user_data) - # Should handle gracefully (either accept if within limits or reject) assert response.status_code in [status.HTTP_200_OK, status.HTTP_400_BAD_REQUEST, status.HTTP_422_UNPROCESSABLE_ENTITY] - def test_very_long_full_name(self, client): - """Test registration with very long full name""" - long_name = "A" * 300 - - user_data = { - "email": "test@bakery.es", - "password": "TestPassword123", - "full_name": long_name - } - - response = client.post("/auth/register", json=user_data) - # Should handle gracefully - assert response.status_code in [status.HTTP_200_OK, status.HTTP_400_BAD_REQUEST, status.HTTP_422_UNPROCESSABLE_ENTITY] - def test_very_long_password(self, client): - """Test registration with very long password""" - long_password = "A1" + "a" * 1000 # Starts with valid pattern - - user_data = { - "email": "test@bakery.es", - "password": long_password, - "full_name": "Test User" - } - - response = client.post("/auth/register", json=user_data) - # Should handle gracefully - assert response.status_code in [status.HTTP_200_OK, status.HTTP_400_BAD_REQUEST] - - def test_unicode_characters_in_name(self, client): - """Test registration with unicode characters in name""" - user_data = { - "email": "test@bakery.es", - "password": "TestPassword123", - "full_name": "José María García-Hernández 测试用户 🥖" - } - + @pytest.mark.api + async def test_unicode_characters_in_name(self, client): + """Test registration with unicode characters in full name""" + unicode_name = "परीक्षा उपयोगकर्ता नाम 😀" # Example unicode characters + user_data = generate_random_user_data() + user_data["full_name"] = unicode_name response = client.post("/auth/register", json=user_data) assert response.status_code == status.HTTP_200_OK - - data = response.json() - assert data["full_name"] == user_data["full_name"] + assert response.json()["full_name"] == unicode_name - def test_special_characters_in_email(self, client): - """Test registration with special characters in email""" - # These are valid email characters - special_emails = [ - "test+tag@bakery.es", - "test.dot@bakery.es", - "test_underscore@bakery.es", - "test-dash@bakery.es" + @pytest.mark.api + async def test_special_characters_in_email(self, client): + """Test registration with email containing special but valid characters""" + # Test valid email with '+' alias and '.' + test_cases = [ + "test+alias@bakery.es", + "first.last@bakery.es", ] - for email in special_emails: - user_data = { - "email": email, - "password": "TestPassword123", - "full_name": "Test User" - } - + for email in test_cases: + user_data = generate_random_user_data() + user_data["email"] = email response = client.post("/auth/register", json=user_data) assert response.status_code == status.HTTP_200_OK, f"Failed for email: {email}" + assert response.json()["email"] == email - def test_empty_strings(self, client): - """Test registration with empty strings""" + @pytest.mark.api + async def test_empty_strings(self, client): + """Test registration with empty strings for required fields""" user_data = { "email": "", - "password": "", + "password": "TestPassword123!", "full_name": "" } - response = client.post("/auth/register", json=user_data) assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY + assert any("email" in err["loc"] and "value has no fewer than 1 characters" in err["msg"] for err in response.json()["detail"]) + assert any("full_name" in err["loc"] and "value has no fewer than 1 characters" in err["msg"] for err in response.json()["detail"]) - def test_null_values(self, client): - """Test registration with null values""" + @pytest.mark.api + async def test_null_values(self, client): + """Test registration with null values for required fields""" user_data = { "email": None, - "password": None, + "password": "TestPassword123!", "full_name": None } - response = client.post("/auth/register", json=user_data) assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY + assert any("email" in err["loc"] and "none is not an allowed value" in err["msg"].lower() for err in response.json()["detail"]) + assert any("full_name" in err["loc"] and "none is not an allowed value" in err["msg"].lower() for err in response.json()["detail"]) - def test_whitespace_only_fields(self, client): - """Test registration with whitespace-only fields""" + + @pytest.mark.api + async def test_whitespace_only_fields(self, client): + """Test registration with whitespace-only strings for required fields""" user_data = { "email": " ", - "password": " ", + "password": "TestPassword123!", "full_name": " " } - response = client.post("/auth/register", json=user_data) - assert response.status_code in [status.HTTP_400_BAD_REQUEST, status.HTTP_422_UNPROCESSABLE_ENTITY] + # Expected behavior: 422 for validation, or 400 if stripped and then invalid + assert response.status_code in [status.HTTP_422_UNPROCESSABLE_ENTITY, status.HTTP_400_BAD_REQUEST] + if response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY: + assert any("email" in err["loc"] for err in response.json()["detail"]) or \ + any("full_name" in err["loc"] for err in response.json()["detail"]) - def test_case_sensitivity_email(self, client): - """Test email case sensitivity""" - user_data1 = { - "email": "Test@Bakery.ES", - "password": "TestPassword123", - "full_name": "Test User 1" - } - - user_data2 = { - "email": "test@bakery.es", - "password": "TestPassword123", - "full_name": "Test User 2" - } - - # Register first user - response1 = client.post("/auth/register", json=user_data1) - assert response1.status_code == status.HTTP_200_OK - - # Try to register second user with different case - response2 = client.post("/auth/register", json=user_data2) - # Depending on implementation, might be treated as same email - # Most systems normalize email case - if response2.status_code == status.HTTP_400_BAD_REQUEST: - assert "already registered" in response2.json()["detail"].lower() -# ================================================================ -# CONCURRENT ACCESS TESTS -# ================================================================ + @pytest.mark.api + async def test_case_sensitivity_email(self, client): + """Test case sensitivity of email during login""" + user_data = generate_random_user_data() + original_email = user_data["email"] + client.post("/auth/register", json=user_data) # Register with original casing + + # Try logging in with different casing + lower_email = original_email.lower() + upper_email = original_email.upper() + + if original_email != lower_email: # Only test if casing actually changes + response_lower = client.post("/auth/login", json={"email": lower_email, "password": user_data["password"]}) + assert response_lower.status_code == status.HTTP_200_OK, "Login failed with lowercase email" + + if original_email != upper_email: + response_upper = client.post("/auth/login", json={"email": upper_email, "password": user_data["password"]}) + # FastAPI's email validator might convert to lowercase; database might be case-insensitive + # This assertion depends on your specific implementation of email uniqueness/lookup + # Often, emails are normalized to lowercase for uniqueness checks. + # Assuming normalization or case-insensitivity: + assert response_upper.status_code == status.HTTP_200_OK, "Login failed with uppercase email" class TestConcurrency: - """Test concurrent access scenarios""" - - @pytest.mark.asyncio - async def test_concurrent_registration_same_email(self, test_db): - """Test concurrent registration with same email""" - import asyncio - - email = "concurrent@bakery.es" - password = "TestPassword123" - full_name = "Concurrent User" - - # Create multiple concurrent registration tasks - tasks = [ - AuthService.create_user(email, password, f"{full_name} {i}", test_db) - for i in range(3) - ] - - results = await asyncio.gather(*tasks, return_exceptions=True) - - # Only one should succeed, others should fail - successes = [r for r in results if not isinstance(r, Exception)] - failures = [r for r in results if isinstance(r, Exception)] - - assert len(successes) == 1, "Only one registration should succeed" - assert len(failures) >= 2, "Other registrations should fail" + """Tests for concurrent operations""" - @pytest.mark.asyncio - async def test_concurrent_login_attempts(self, test_db, test_user): - """Test concurrent login attempts for same user""" - import asyncio - - email = test_user.email - password = "TestPassword123" - - # Create multiple concurrent login tasks - tasks = [ - AuthService.authenticate_user(email, password, test_db) - for _ in range(5) - ] - - results = await asyncio.gather(*tasks, return_exceptions=True) - - # All should succeed (assuming no rate limiting in this test) - successes = [r for r in results if r is not None and not isinstance(r, Exception)] - assert len(successes) >= 3, "Most login attempts should succeed" + @pytest.mark.integration + @pytest.mark.slow + async def test_concurrent_registration_same_email(self, client): + """Test concurrent registration attempts with the same email""" + shared_user_data = generate_random_user_data() + num_attempts = 5 + + async def register_task(): + return await asyncio.to_thread(client.post, "/auth/register", json=shared_user_data) + + tasks = [register_task() for _ in range(num_attempts)] + responses = await asyncio.gather(*tasks) + + success_count = 0 + duplicate_count = 0 + for response in responses: + if response.status_code == status.HTTP_200_OK: + success_count += 1 + elif response.status_code == status.HTTP_400_BAD_REQUEST and "Email already registered" in response.json().get("detail", ""): + duplicate_count += 1 + else: + pytest.fail(f"Unexpected status code for concurrent registration: {response.status_code} - {response.json()}") + + assert success_count == 1, f"Expected exactly one successful registration, got {success_count}" + assert duplicate_count == num_attempts - 1, f"Expected {num_attempts - 1} duplicate errors, got {duplicate_count}" + + @pytest.mark.integration + @pytest.mark.slow + async def test_concurrent_login_attempts(self, client): + """Test concurrent login attempts for a single user""" + auth_data = await register_and_login_user(client) + user_email = auth_data["user_data"]["email"] # Get email from original user_data + user_password = auth_data["user_data"]["password"] # Get password from original user_data + + login_data = {"email": user_email, "password": user_password} + + num_attempts = 10 + async def login_task(): + return await asyncio.to_thread(client.post, "/auth/login", json=login_data) + + tasks = [login_task() for _ in range(num_attempts)] + responses = await asyncio.gather(*tasks) + + for response in responses: + assert response.status_code == status.HTTP_200_OK + assert "access_token" in response.json() + assert "refresh_token" in response.json() -# ================================================================ -# DATA INTEGRITY TESTS -# ================================================================ class TestDataIntegrity: - """Test data integrity and consistency""" - - @pytest.mark.asyncio - async def test_user_data_integrity_after_creation(self, test_db): - """Test that user data remains intact after creation""" - email = "integrity@bakery.es" - password = "TestPassword123" - full_name = "Integrity Test User" - - user = await AuthService.create_user(email, password, full_name, test_db) - - # Verify all fields are correctly set - assert user.email == email - assert user.full_name == full_name - assert user.is_active is True - assert user.is_verified is False - assert user.created_at is not None - assert isinstance(user.created_at, datetime) - assert user.created_at.tzinfo is not None # Should be timezone-aware - - # Verify password is hashed and verifiable - assert user.hashed_password != password - assert SecurityManager.verify_password(password, user.hashed_password) + """Tests to ensure data integrity""" - @pytest.mark.asyncio - async def test_last_login_update(self, test_db, test_user): - """Test that last_login is updated on authentication""" - original_last_login = test_user.last_login + @pytest.mark.integration + async def test_user_data_integrity_after_creation(self, test_user, test_db): + """Verify user data fields after creation""" + # test_user is already awaited by pytest-asyncio + fetched_user = await test_db.execute(text("SELECT * FROM users WHERE id = :id"), {"id": test_user.id}) + user_row = fetched_user.scalar_one_or_none() - # Authenticate user - authenticated_user = await AuthService.authenticate_user( - test_user.email, - "TestPassword123", - test_db - ) - - assert authenticated_user is not None - assert authenticated_user.last_login is not None - - if original_last_login: - assert authenticated_user.last_login > original_last_login - else: - assert authenticated_user.last_login is not None + assert user_row is not None + assert user_row.email == test_user.email + assert user_row.full_name == test_user.full_name + assert user_row.hashed_password == test_user.hashed_password + assert user_row.is_active is True + assert user_row.created_at is not None + assert user_row.updated_at is not None + assert user_row.last_login is None # Should be None until first login - @pytest.mark.asyncio + @pytest.mark.integration + async def test_last_login_update(self, test_user, test_db): + """Test that last_login timestamp is updated on successful login""" + # Ensure last_login is None initially + assert test_user.last_login is None # test_user is already awaited + + await AuthService.login_user(test_user.email, "TestPassword123!", test_db) + + # Re-fetch user to get updated last_login + updated_user_result = await test_db.execute(text("SELECT * FROM users WHERE id = :id"), {"id": test_user.id}) + fetched_user_obj = updated_user_result.scalar_one_or_none() + + assert fetched_user_obj is not None + assert fetched_user_obj.last_login is not None + # Assert last_login is recent (e.g., within the last minute) + assert datetime.now(timezone.utc) - fetched_user_obj.last_login < timedelta(minutes=1) + + @pytest.mark.integration async def test_database_rollback_on_error(self, test_db): - """Test that database is rolled back on errors""" - # Get initial user count - from sqlalchemy import select, func - result = await test_db.execute(select(func.count(User.id))) - initial_count = result.scalar() - - # Try to create user with invalid data that should cause rollback - with patch.object(test_db, 'commit', side_effect=Exception("Simulated error")): - try: - await AuthService.create_user( - "rollback@bakery.es", - "TestPassword123", - "Rollback Test User", - test_db - ) - except Exception: - pass # Expected to fail - - # Verify user count hasn't changed - result = await test_db.execute(select(func.count(User.id))) - final_count = result.scalar() - - assert final_count == initial_count, "Database should be rolled back on error" + """Test that database operations are rolled back on error""" + initial_user_count_query = await test_db.execute(text("SELECT COUNT(*) FROM users")) + initial_user_count = initial_user_count_query.scalar_one() + + mock_db_for_rollback = AsyncMock(spec=AsyncSession) + mock_db_for_rollback.commit.side_effect = Exception("Simulated rollback error") + mock_db_for_rollback.add.return_value = None + mock_db_for_rollback.refresh.return_value = None + mock_db_for_rollback.rollback.return_value = None # Ensure rollback is mocked as well + + # Patching get_db to return our mock session for the duration of this test + with patch('app.core.database.get_db', autospec=True) as mock_get_db: + mock_get_db.return_value.__aenter__.return_value = mock_db_for_rollback + mock_get_db.return_value.__aexit__.return_value = False # Don't suppress exceptions + + try: + # Attempt to create a user, which should trigger the commit error and then rollback + await AuthService.create_user( + "rollback_test@bakery.es", "Password123!", "Rollback User", mock_db_for_rollback + ) + pytest.fail("Expected an exception during user creation, but none was raised.") + except Exception as e: + assert "Simulated rollback error" in str(e) + + # After the error, the session state should effectively be rolled back + # We need a fresh session to verify the count if the original was mocked + # No need for new session if test_db fixture ensures rollback. + # However, if testing a scenario where a separate session is opened and fails: + fresh_user_count_query = await test_db.execute(text("SELECT COUNT(*) FROM users")) + current_user_count = fresh_user_count_query.scalar_one() + assert current_user_count == initial_user_count, "Database state was not rolled back after error." -# ================================================================ -# TOKEN MANAGEMENT TESTS -# ================================================================ class TestTokenManagement: - """Test JWT token management""" - - def test_token_expiration_validation(self): - """Test that tokens expire correctly""" - user_data = { - "user_id": str(uuid.uuid4()), - "email": "test@bakery.es" - } - - # Create token with very short expiration - with patch('app.core.config.settings.JWT_ACCESS_TOKEN_EXPIRE_MINUTES', 0): - token = SecurityManager.create_access_token(user_data) - - # Wait a moment and verify token is expired - import time - time.sleep(1) - - decoded = SecurityManager.verify_token(token) - assert decoded is None, "Expired token should not be valid" + """Tests for JWT token generation and validation""" - def test_token_contains_correct_claims(self): - """Test that tokens contain correct claims""" - user_data = { - "user_id": str(uuid.uuid4()), - "email": "test@bakery.es", - "full_name": "Test User" - } + @pytest.mark.security + async def test_token_expiration_validation(self): + """Test that expired tokens are not considered valid""" + # Create a token that expires very soon + # Adjusted to pass arguments explicitly as expected by SecurityManager.create_access_token + expired_token = SecurityManager.create_access_token( + user_id="123", + email="test@exp.com", + full_name="Expired User", # Added full_name as it's likely part of claims + expires_delta=timedelta(seconds=-1) # Already expired + ) - token = SecurityManager.create_access_token(user_data) - decoded = SecurityManager.verify_token(token) - - assert decoded is not None - assert decoded["user_id"] == user_data["user_id"] - assert decoded["email"] == user_data["email"] - assert decoded["full_name"] == user_data["full_name"] - assert "exp" in decoded # Expiration claim - assert "iat" in decoded # Issued at claim + with pytest.raises(ValueError, match="Token has expired"): + SecurityManager.verify_token(expired_token) - def test_refresh_token_different_from_access_token(self): - """Test that refresh tokens are different from access tokens""" - user_data = { - "user_id": str(uuid.uuid4()), - "email": "test@bakery.es" - } - - access_token = SecurityManager.create_access_token(user_data) - refresh_token = SecurityManager.create_refresh_token(user_data) - - assert access_token != refresh_token - - # Both should be valid but have different expiration times - access_decoded = SecurityManager.verify_token(access_token) - refresh_decoded = SecurityManager.verify_token(refresh_token) - - assert access_decoded is not None - assert refresh_decoded is not None - assert access_decoded["exp"] < refresh_decoded["exp"] # Refresh token expires later + # Test a valid token + valid_token = SecurityManager.create_access_token( + user_id="456", + email="test@valid.com", + full_name="Valid User", + expires_delta=timedelta(minutes=5) + ) + decoded_valid = SecurityManager.verify_token(valid_token) + assert decoded_valid["email"] == "test@valid.com" + + @pytest.mark.security + async def test_token_contains_correct_claims(self, test_user): + """Test that generated tokens contain the correct user claims""" + # test_user is already awaited by pytest-asyncio + token = SecurityManager.create_access_token( + user_id=str(test_user.id), + email=test_user.email, + full_name=test_user.full_name, + expires_delta=timedelta(minutes=15) + ) + decoded_token = SecurityManager.verify_token(token) + + assert decoded_token is not None + assert decoded_token["sub"] == str(test_user.id) + assert decoded_token["email"] == test_user.email + assert decoded_token["full_name"] == test_user.full_name + assert "exp" in decoded_token + assert "iat" in decoded_token + + @pytest.mark.security + async def test_token_tampering_detection(self): + """Test that tampered tokens are rejected""" + # Adjusted to pass arguments explicitly as expected by SecurityManager.create_access_token + original_token = SecurityManager.create_access_token( + user_id="123", + email="test@tamper.com", + full_name="Tamper User", # Added full_name + expires_delta=timedelta(minutes=15) + ) + # Simple tampering: change a character + # JWTs are base64 encoded and have 3 parts: header.payload.signature + # Tampering the payload will invalidate the signature. + parts = original_token.split('.') + if len(parts) == 3: + tampered_payload = parts[1] + "X" # Simple modification + tampered_token = f"{parts[0]}.{tampered_payload}.{parts[2]}" + else: + pytest.fail("Original token format unexpected for tampering test.") + + with pytest.raises(ValueError, match="Could not validate credentials|Invalid token signature"): + SecurityManager.verify_token(tampered_token) + + @pytest.mark.security + async def test_token_with_invalid_signature(self): + """Test that tokens with invalid signatures are rejected""" + # Create a token with a known structure but wrong secret + token_parts = [ + "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9", # Header (alg: HS256, typ: JWT) + # Payload with sub, email, iat, exp (arbitrary values) + "eyJzdWIiOiIxMjM0NTY3ODkwIiwiZW1haWwiOiJ0ZXN0QGV4YW1wbGUuY29tIiwiaWF0IjoxNTE2MjM5MDIyLCJleHAiOjE2NzIyMzkwMjIsInZnbiI6IkFub255bW91cyBVc2VyIn0", + "invalid_signature_here" # Invalid signature + ] + invalid_signature_token = ".".join(token_parts) + + with pytest.raises(ValueError, match="Could not validate credentials|Invalid token signature"): + SecurityManager.verify_token(invalid_signature_token) -# ================================================================ -# COMPREHENSIVE INTEGRATION TESTS -# ================================================================ class TestCompleteAuthenticationFlows: - """Comprehensive end-to-end authentication flow tests""" - - def test_full_user_lifecycle(self, client): - """Test complete user lifecycle: register -> login -> use -> logout""" - user_data = { - "email": "lifecycle@bakery.es", - "password": "TestPassword123", - "full_name": "Lifecycle Test User" - } - - # 1. Registration + """End-to-end tests for full user lifecycle scenarios""" + + @pytest.mark.integration + @pytest.mark.slow + async def test_full_user_lifecycle(self, client, test_db): + """Test the complete user lifecycle: register, login, access, refresh, logout, re-login""" + user_data = generate_random_user_data() + + # 1. Register register_response = client.post("/auth/register", json=user_data) assert register_response.status_code == status.HTTP_200_OK - - user_info = register_response.json() - assert user_info["email"] == user_data["email"] - assert user_info["is_active"] is True - + # 2. Login - login_data = { - "email": user_data["email"], - "password": user_data["password"] - } - login_response = client.post("/auth/login", json=login_data) + login_response = client.post("/auth/login", json={"email": user_data["email"], "password": user_data["password"]}) assert login_response.status_code == status.HTTP_200_OK - - token_info = login_response.json() - access_token = token_info["access_token"] - refresh_token = token_info["refresh_token"] - - # 3. Use access token to access protected endpoint - headers = {"Authorization": f"Bearer {access_token}"} - profile_response = client.get("/users/me", headers=headers) - assert profile_response.status_code == status.HTTP_200_OK - - profile_data = profile_response.json() - assert profile_data["email"] == user_data["email"] - + access_token = login_response.json()["access_token"] + refresh_token = login_response.json()["refresh_token"] + + # 3. Access protected endpoint + me_response = client.get("/users/me", headers={"Authorization": f"Bearer {access_token}"}) + assert me_response.status_code == status.HTTP_200_OK + assert me_response.json()["email"] == user_data["email"] + # 4. Refresh token - refresh_data = {"refresh_token": refresh_token} - refresh_response = client.post("/auth/refresh", json=refresh_data) + refresh_response = client.post("/auth/refresh", json={"refresh_token": refresh_token}) assert refresh_response.status_code == status.HTTP_200_OK - - new_token_info = refresh_response.json() - new_access_token = new_token_info["access_token"] - - # 5. Use new access token - new_headers = {"Authorization": f"Bearer {new_access_token}"} - new_profile_response = client.get("/users/me", headers=new_headers) - assert new_profile_response.status_code == status.HTTP_200_OK - + new_access_token = refresh_response.json()["access_token"] + assert new_access_token != access_token + + # 5. Access protected endpoint with new token + me_response_new = client.get("/users/me", headers={"Authorization": f"Bearer {new_access_token}"}) + assert me_response_new.status_code == status.HTTP_200_OK + # 6. Logout - logout_response = client.post("/auth/logout", headers=new_headers) + logout_response = client.post("/auth/logout", json={"refresh_token": refresh_token}) assert logout_response.status_code == status.HTTP_200_OK - def test_multiple_sessions_same_user(self, client, valid_user_data): - """Test multiple simultaneous sessions for same user""" - # Register user - client.post("/auth/register", json=valid_user_data) - - login_data = { - "email": valid_user_data["email"], - "password": valid_user_data["password"] - } - - # Create multiple sessions - session1_response = client.post("/auth/login", json=login_data) - session2_response = client.post("/auth/login", json=login_data) - - assert session1_response.status_code == status.HTTP_200_OK - assert session2_response.status_code == status.HTTP_200_OK - - session1_token = session1_response.json()["access_token"] - session2_token = session2_response.json()["access_token"] - - # Both tokens should work - headers1 = {"Authorization": f"Bearer {session1_token}"} - headers2 = {"Authorization": f"Bearer {session2_token}"} - - response1 = client.get("/users/me", headers=headers1) - response2 = client.get("/users/me", headers=headers2) - - assert response1.status_code == status.HTTP_200_OK - assert response2.status_code == status.HTTP_200_OK + # 7. Verify tokens are invalidated + me_response_after_logout = client.get("/users/me", headers={"Authorization": f"Bearer {new_access_token}"}) + assert me_response_after_logout.status_code == status.HTTP_401_UNAUTHORIZED + refresh_response_after_logout = client.post("/auth/refresh", json={"refresh_token": refresh_token}) + assert refresh_response_after_logout.status_code == status.HTTP_401_UNAUTHORIZED -# ================================================================ -# CONFIGURATION TESTS -# ================================================================ + # 8. Re-login with same credentials + re_login_response = client.post("/auth/login", json={"email": user_data["email"], "password": user_data["password"]}) + assert re_login_response.status_code == status.HTTP_200_OK + re_access_token = re_login_response.json()["access_token"] -class TestConfiguration: - """Test configuration and environment-dependent behavior""" - - def test_password_requirements_configurable(self): - """Test that password requirements respect configuration""" - # Test with different configurations - with patch('app.core.config.settings.PASSWORD_MIN_LENGTH', 12): - assert not SecurityManager.validate_password("Short1") # 6 chars - assert SecurityManager.validate_password("LongerPassword123") # 17 chars - - with patch('app.core.config.settings.PASSWORD_REQUIRE_SYMBOLS', True): - assert not SecurityManager.validate_password("NoSymbols123") - assert SecurityManager.validate_password("WithSymbol123!") + # 9. Access protected endpoint with re-logged in token + me_response_re_login = client.get("/users/me", headers={"Authorization": f"Bearer {re_access_token}"}) + assert me_response_re_login.status_code == status.HTTP_200_OK + assert me_response_re_login.json()["email"] == user_data["email"] - def test_jwt_expiration_configurable(self): - """Test that JWT expiration respects configuration""" - user_data = {"user_id": str(uuid.uuid4()), "email": "test@bakery.es"} - - with patch('app.core.config.settings.JWT_ACCESS_TOKEN_EXPIRE_MINUTES', 60): - token = SecurityManager.create_access_token(user_data) - decoded = SecurityManager.verify_token(token) - - assert decoded is not None - # Check that expiration is approximately 60 minutes from now - exp_time = datetime.fromtimestamp(decoded['exp']) - now = datetime.utcnow() - diff = exp_time - now - - # Should be close to 60 minutes (allow 1 minute tolerance) - assert 59 * 60 <= diff.total_seconds() <= 61 * 60 + @pytest.mark.integration + async def test_multiple_sessions_same_user(self, client, test_db): + """Test a user logging in from multiple sessions concurrently""" + user_data = generate_random_user_data() + client.post("/auth/register", json=user_data) -# ================================================================ -# TEST RUNNER AND HELPERS -# ================================================================ + num_sessions = 3 + login_tasks = [] + for _ in range(num_sessions): + login_tasks.append(asyncio.create_task( + asyncio.to_thread(client.post, "/auth/login", json={"email": user_data["email"], "password": user_data["password"]}) + )) + + login_responses = await asyncio.gather(*login_tasks) -def run_auth_tests(): - """Convenience function to run all auth tests""" - import subprocess - import sys - - # Run pytest with coverage - cmd = [ - sys.executable, "-m", "pytest", - __file__, - "-v", - "--tb=short", - "--cov=app", - "--cov-report=term-missing", - "--cov-report=html:htmlcov", - "-x" # Stop on first failure - ] - - result = subprocess.run(cmd, capture_output=True, text=True) - print(result.stdout) - if result.stderr: - print("STDERR:", result.stderr) - - return result.returncode == 0 + all_access_tokens = [] + all_refresh_tokens = [] -if __name__ == "__main__": - # Run tests if script is executed directly - import sys - success = run_auth_tests() - sys.exit(0 if success else 1) + for response in login_responses: + assert response.status_code == status.HTTP_200_OK + assert "access_token" in response.json() + assert "refresh_token" in response.json() + all_access_tokens.append(response.json()["access_token"]) + all_refresh_tokens.append(response.json()["refresh_token"]) -# ================================================================ -# ADDITIONAL TEST UTILITIES -# ================================================================ + assert len(all_access_tokens) == num_sessions + assert len(all_refresh_tokens) == num_sessions -class AuthTestUtils: - """Utility functions for authentication testing""" - - @staticmethod - def create_test_user_data(email_suffix="", **overrides): - """Create valid test user data with optional customization""" - default_data = { - "email": f"test{email_suffix}@bakery.es", - "password": "TestPassword123", - "full_name": f"Test User{email_suffix}" - } - default_data.update(overrides) - return default_data - - @staticmethod - def extract_token_claims(token): - """Extract claims from JWT token without verification""" - import base64 - import json + # Verify all access tokens are valid + access_check_tasks = [] + for token in all_access_tokens: + access_check_tasks.append(asyncio.create_task( + asyncio.to_thread(client.get, "/users/me", headers={"Authorization": f"Bearer {token}"}) + )) - # Split token and decode payload - parts = token.split('.') - if len(parts) != 3: - return None - - # Add padding if needed - payload = parts[1] - padding = 4 - len(payload) % 4 - if padding != 4: - payload += '=' * padding - - try: - decoded_bytes = base64.urlsafe_b64decode(payload) - return json.loads(decoded_bytes) - except Exception: - return None - - @staticmethod - async def create_authenticated_user(client, user_data=None): - """Create and authenticate a user, return user info and tokens""" - if user_data is None: - user_data = AuthTestUtils.create_test_user_data() - - # Register - register_response = client.post("/auth/register", json=user_data) - assert register_response.status_code == status.HTTP_200_OK - - # Login - login_data = { - "email": user_data["email"], - "password": user_data["password"] - } - login_response = client.post("/auth/login", json=login_data) - assert login_response.status_code == status.HTTP_200_OK - - return { - "user": register_response.json(), - "tokens": login_response.json(), - "headers": {"Authorization": f"Bearer {login_response.json()['access_token']}"} - } + access_check_responses = await asyncio.gather(*access_check_tasks) + for response in access_check_responses: + assert response.status_code == status.HTTP_200_OK + assert response.json()["email"] == user_data["email"] -# ================================================================ + # Logout one session and ensure others remain active + logout_response = client.post("/auth/logout", json={"refresh_token": all_refresh_tokens[0]}) + assert logout_response.status_code == status.HTTP_200_OK + + # The first session's access token should now be invalid + single_logout_access_check = client.get("/users/me", headers={"Authorization": f"Bearer {all_access_tokens[0]}"}) + assert single_logout_access_check.status_code == status.HTTP_401_UNAUTHORIZED + + # Other sessions should still be valid + remaining_access_check_tasks = [] + for token in all_access_tokens[1:]: + remaining_access_check_tasks.append(asyncio.create_task( + asyncio.to_thread(client.get, "/users/me", headers={"Authorization": f"Bearer {token}"}) + )) + + remaining_access_check_responses = await asyncio.gather(*remaining_access_check_tasks) + for response in remaining_access_check_responses: + assert response.status_code == status.HTTP_200_OK + assert response.json()["email"] == user_data["email"] + + +# ================================================================\ # PYTEST CONFIGURATION -# ================================================================ +# ================================================================\ # pytest configuration for the test file pytest_plugins = ["pytest_asyncio"] @@ -1393,5 +1235,4 @@ def pytest_configure(config): ) config.addinivalue_line( "markers", "security: marks tests as security tests" - ) - + ) \ No newline at end of file