Files
bakery-ia/services/auth/tests/test_auth_comprehensive.py

1238 lines
58 KiB
Python
Raw Normal View History

import asyncio
import time  # Added for performance tests
import uuid
from datetime import datetime, timedelta, timezone
from typing import AsyncGenerator  # Added to fix NameError
from unittest.mock import AsyncMock, MagicMock, patch

import pytest
import redis.asyncio as redis
from fastapi import status
from fastapi.testclient import TestClient
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker

# Import the authentication service modules
from app.main import app
from app.models.users import User, RefreshToken
from app.schemas.auth import UserRegistration, UserLogin, TokenResponse
from app.services.auth_service import AuthService
from app.core.security import SecurityManager
from app.core.config import settings
from app.core.database import get_db
from shared.database.base import Base
# ================================================================
# TEST CONFIGURATION AND FIXTURES
# ================================================================
# Test database configuration: an in-memory SQLite database reached through
# the aiosqlite async driver, used for speed (nothing is written to disk).
TEST_DATABASE_URL = "sqlite+aiosqlite:///:memory:"
@pytest.fixture(scope="function")
async def test_engine():
    """Create a fresh async database engine for each test function.

    The schema described by ``Base.metadata`` is created before the engine is
    handed to the test and dropped again afterwards. The engine is disposed
    even when schema creation or teardown raises.

    NOTE(review): this async fixture is declared with plain
    ``@pytest.fixture``; that presumably relies on pytest-asyncio running
    with ``asyncio_mode = auto`` -- confirm against the project's config.
    """
    engine = create_async_engine(
        TEST_DATABASE_URL,
        echo=False,  # Set to True for SQL debugging
        future=True,
        pool_pre_ping=True,
    )
    try:
        async with engine.begin() as conn:
            await conn.run_sync(Base.metadata.create_all)

        yield engine

        async with engine.begin() as conn:
            await conn.run_sync(Base.metadata.drop_all)
    finally:
        # Always release the engine's connections, even on a failed teardown.
        await engine.dispose()
@pytest.fixture(scope="function")
async def test_db(test_engine) -> AsyncGenerator[AsyncSession, None]:
    """Yield an ``AsyncSession`` bound to ``test_engine`` for one test.

    ``expire_on_commit=False`` keeps ORM objects usable after a commit, so
    tests can keep reading attributes of objects they created. Whatever is
    still pending when the test returns is rolled back.
    """
    session_factory = sessionmaker(
        test_engine,
        class_=AsyncSession,
        expire_on_commit=False,
    )

    async with session_factory() as session:
        yield session
        # Roll back after each test to ensure a clean state.
        await session.rollback()
@pytest.fixture(scope="function")
def client(test_db):
    """Create a ``TestClient`` whose ``get_db`` dependency yields ``test_db``.

    The test is skipped when the application modules cannot be imported.
    Dependency overrides are cleared when the fixture is torn down, including
    when entering or leaving the ``TestClient`` context raises.
    """
    # Only the imports are guarded: an ImportError raised later (for example
    # during application startup) should surface as an error, not as a skip.
    try:
        from app.main import app
        from app.core.database import get_db
    except ImportError as e:
        pytest.skip(f"Cannot import app modules: {e}. Ensure app.main and app.core.database are accessible.")

    async def override_get_db():
        # test_db is already an AsyncSession yielded by the fixture; an async
        # generator keeps the dependency on the event loop instead of having
        # FastAPI drive a sync generator in its threadpool.
        yield test_db

    app.dependency_overrides[get_db] = override_get_db
    try:
        with TestClient(app) as test_client:
            yield test_client
    finally:
        # Clean up overrides
        app.dependency_overrides.clear()
@pytest.fixture(scope="function")
async def test_user(test_db):
    """Create a known user in the test database and return it.

    The user is created through ``AuthService.create_user`` with the email
    ``existing@bakery.es`` and the password ``TestPassword123!``; several
    tests in this module log in with exactly these credentials.
    """
    return await AuthService.create_user(
        email="existing@bakery.es",
        password="TestPassword123!",
        full_name="Existing User",
        db=test_db,
    )
@pytest.fixture(scope="function")
async def test_redis_client():
    """Provide an ``AsyncMock`` standing in for a Redis client.

    The mock is spec'd on ``redis.Redis`` (``redis.asyncio``), so accessing an
    attribute the real client does not have raises ``AttributeError``. No
    real Redis server is contacted.
    """
    mock_redis = AsyncMock(spec=redis.Redis)
    yield mock_redis
    await mock_redis.close()
# ================================================================
# HELPER FUNCTIONS
# ================================================================

# generate_random_user_data was moved to conftest.py because it is a shared helper.
# NOTE: pytest only auto-discovers fixtures and hooks from conftest.py; plain
# helper functions are NOT injected into test modules. This module therefore
# needs an explicit import for the helper to resolve, e.g.:
#     from conftest import generate_random_user_data
async def register_and_login_user(client: TestClient):
    """Register a random user through the API and log them in.

    The function is ``async`` only because its callers ``await`` it; nothing
    inside is awaited.

    Args:
        client: Test client used to call ``/auth/register`` and ``/auth/login``.

    Returns:
        A dict with the keys ``user_data`` (the payload that was registered,
        including the plain password), ``user_response`` (JSON body of the
        register call), ``tokens`` (JSON body of the login call) and
        ``headers`` (a ready-to-use ``Authorization: Bearer ...`` header).

    Raises:
        AssertionError: If registration or login does not return HTTP 200.
    """
    # generate_random_user_data is not defined in this module; it must be
    # importable here (see the note in the HELPER FUNCTIONS section).
    user_data = generate_random_user_data()

    # Register
    register_response = client.post("/auth/register", json=user_data)
    assert register_response.status_code == status.HTTP_200_OK

    # Login
    login_response = client.post(
        "/auth/login",
        json={"email": user_data["email"], "password": user_data["password"]},
    )
    assert login_response.status_code == status.HTTP_200_OK

    # Parse the login body once and reuse it for both return entries.
    tokens = login_response.json()
    return {
        "user_data": user_data,  # Original payload, kept for password access
        "user_response": register_response.json(),  # User info from register
        "tokens": tokens,
        "headers": {"Authorization": f"Bearer {tokens['access_token']}"},
    }
# ================================================================
# TEST SUITE
# ================================================================
class TestAuthService:
    """Comprehensive unit tests for AuthService.

    Every test calls ``AuthService`` directly with the ``test_db`` session.
    Tests that need an existing account use the ``test_user`` fixture, whose
    password is ``TestPassword123!``.
    """

    @pytest.mark.asyncio
    async def test_create_user_success(self, test_db):
        """Test successful user creation"""
        user = await AuthService.create_user(
            "test_new@bakery.es", "Password123!", "New User", test_db
        )
        assert user is not None
        assert user.email == "test_new@bakery.es"
        # The stored value must be a hash that verifies against the password.
        assert SecurityManager.verify_password("Password123!", user.hashed_password)
        assert user.is_active is True

    @pytest.mark.asyncio
    async def test_create_user_duplicate_email(self, test_user, test_db):
        """Test creating a user with a duplicate email"""
        with pytest.raises(ValueError, match="Email already registered"):
            await AuthService.create_user(
                test_user.email, "AnotherPassword!", "Duplicate User", test_db
            )

    @pytest.mark.asyncio
    async def test_authenticate_user_success(self, test_user, test_db):
        """Test successful user authentication"""
        authenticated_user = await AuthService.authenticate_user(
            test_user.email, "TestPassword123!", test_db
        )
        assert authenticated_user is not None
        assert authenticated_user.id == test_user.id
        assert authenticated_user.email == test_user.email

    @pytest.mark.asyncio
    async def test_authenticate_user_wrong_password(self, test_user, test_db):
        """Test authentication with wrong password"""
        authenticated_user = await AuthService.authenticate_user(
            test_user.email, "WrongPassword", test_db
        )
        assert authenticated_user is None

    @pytest.mark.asyncio
    async def test_authenticate_user_nonexistent(self, test_db):
        """Test authentication of a nonexistent user"""
        authenticated_user = await AuthService.authenticate_user(
            "nonexistent@bakery.es", "AnyPassword", test_db
        )
        assert authenticated_user is None

    @pytest.mark.asyncio
    async def test_authenticate_user_inactive(self, test_user, test_db):
        """Test authentication of an inactive user"""
        test_user.is_active = False
        await test_db.commit()  # Persist the deactivation before authenticating

        authenticated_user = await AuthService.authenticate_user(
            test_user.email, "TestPassword123!", test_db
        )
        assert authenticated_user is None

        test_user.is_active = True  # Reset for other tests if needed
        await test_db.commit()  # Commit the reset

    @pytest.mark.asyncio
    async def test_login_user_success(self, test_user, test_db):
        """Test successful user login, including token generation"""
        tokens = await AuthService.login_user(
            test_user.email, "TestPassword123!", test_db
        )
        assert tokens.access_token is not None
        assert tokens.refresh_token is not None

        # Verify access token claims
        decoded_access_token = SecurityManager.verify_token(tokens.access_token)
        assert decoded_access_token["sub"] == str(test_user.id)
        assert decoded_access_token["email"] == test_user.email
        assert decoded_access_token["full_name"] == test_user.full_name

        # Verify the refresh token was stored in the database.
        result = await test_db.execute(
            text("SELECT * FROM refresh_tokens WHERE user_id = :user_id"),
            {"user_id": test_user.id},
        )
        # one_or_none() returns the whole row. scalar_one_or_none() would
        # return only the first column of "SELECT *", which has no
        # ``hashed_token`` attribute.
        stored_token = result.one_or_none()
        assert stored_token is not None
        assert SecurityManager.verify_password(tokens.refresh_token, stored_token.hashed_token)

    @pytest.mark.asyncio
    async def test_login_user_invalid_credentials(self, test_db):
        """Test login with invalid credentials"""
        with pytest.raises(ValueError, match="Invalid credentials"):
            await AuthService.login_user("nonexistent@bakery.es", "WrongPassword", test_db)

    @pytest.mark.asyncio
    async def test_refresh_access_token_success(self, test_user, test_db):
        """Test successful access token refresh"""
        initial_tokens = await AuthService.login_user(test_user.email, "TestPassword123!", test_db)
        new_tokens = await AuthService.refresh_access_token(initial_tokens.refresh_token, test_db)

        assert new_tokens.access_token is not None
        # The refresh token is expected to stay the same across a refresh.
        assert new_tokens.refresh_token == initial_tokens.refresh_token

        decoded_access_token = SecurityManager.verify_token(new_tokens.access_token)
        assert decoded_access_token["sub"] == str(test_user.id)
        assert decoded_access_token["email"] == test_user.email

    @pytest.mark.asyncio
    async def test_refresh_access_token_invalid(self, test_db):
        """Test refresh with an invalid refresh token"""
        with pytest.raises(ValueError, match="Invalid refresh token"):
            await AuthService.refresh_access_token("invalid_refresh_token", test_db)

    @pytest.mark.asyncio
    async def test_logout_user_success(self, test_user, test_db):
        """Test successful user logout"""
        initial_tokens = await AuthService.login_user(test_user.email, "TestPassword123!", test_db)
        assert await AuthService.logout_user(initial_tokens.refresh_token, test_db) is True

        result = await test_db.execute(
            text("SELECT * FROM refresh_tokens WHERE user_id = :user_id"),
            {"user_id": test_user.id},
        )
        assert result.first() is None  # Token should be deleted

    @pytest.mark.asyncio
    async def test_logout_user_invalid(self, test_db):
        """Test logout with an invalid refresh token"""
        with pytest.raises(ValueError, match="Invalid refresh token"):
            await AuthService.logout_user("nonexistent_refresh_token", test_db)
class TestAuthenticationAPI:
    """Integration tests for authentication API endpoints.

    The tests are ``async def`` and therefore carry ``@pytest.mark.asyncio``
    like the tests in ``TestAuthService``; without the marker pytest-asyncio
    only runs them when it is configured in ``auto`` mode.
    """

    @pytest.mark.api
    @pytest.mark.asyncio
    async def test_register_success(self, client):
        """Test successful user registration via API"""
        user_data = generate_random_user_data()
        response = client.post("/auth/register", json=user_data)
        assert response.status_code == status.HTTP_200_OK
        body = response.json()
        assert "message" in body
        assert "User registered successfully" in body["message"]
        # Optionally, verify user exists in DB

    @pytest.mark.api
    @pytest.mark.asyncio
    async def test_register_weak_password(self, client):
        """Test registration with a weak password"""
        user_data = generate_random_user_data()
        user_data["password"] = "short"  # Weak password
        response = client.post("/auth/register", json=user_data)
        assert response.status_code == status.HTTP_400_BAD_REQUEST
        assert "password" in response.json()["detail"].lower()

    @pytest.mark.api
    @pytest.mark.asyncio
    async def test_register_invalid_email(self, client):
        """Test registration with an invalid email format"""
        user_data = generate_random_user_data()
        user_data["email"] = "invalid-email"
        response = client.post("/auth/register", json=user_data)
        assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY
        assert "email" in response.json()["detail"][0]["loc"]

    @pytest.mark.api
    @pytest.mark.asyncio
    async def test_register_missing_fields(self, client):
        """Test registration with missing required fields"""
        # full_name is deliberately left out.
        user_data = {"email": "test@bakery.es", "password": "TestPassword123!"}
        response = client.post("/auth/register", json=user_data)
        assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY
        assert "full_name" in response.json()["detail"][0]["loc"]

    @pytest.mark.api
    @pytest.mark.asyncio
    async def test_register_duplicate_email(self, client):
        """Test registration with an email that is already registered"""
        user_data = generate_random_user_data()
        client.post("/auth/register", json=user_data)  # First registration
        response = client.post("/auth/register", json=user_data)  # Duplicate registration
        assert response.status_code == status.HTTP_400_BAD_REQUEST
        assert "Email already registered" in response.json()["detail"]

    @pytest.mark.api
    @pytest.mark.asyncio
    async def test_login_success(self, client):
        """Test successful user login via API"""
        user_data = generate_random_user_data()
        client.post("/auth/register", json=user_data)  # Register first

        login_response = client.post("/auth/login", json={
            "email": user_data["email"],
            "password": user_data["password"]
        })
        assert login_response.status_code == status.HTTP_200_OK
        body = login_response.json()
        assert "access_token" in body
        assert "refresh_token" in body

    @pytest.mark.api
    @pytest.mark.asyncio
    async def test_login_wrong_password(self, client):
        """Test login with wrong password via API"""
        user_data = generate_random_user_data()
        client.post("/auth/register", json=user_data)

        login_response = client.post("/auth/login", json={
            "email": user_data["email"],
            "password": "WrongPassword"
        })
        assert login_response.status_code == status.HTTP_401_UNAUTHORIZED
        assert "Invalid credentials" in login_response.json()["detail"]

    @pytest.mark.api
    @pytest.mark.asyncio
    async def test_login_nonexistent_user(self, client):
        """Test login for a nonexistent user via API"""
        login_response = client.post("/auth/login", json={
            "email": "nonexistent@bakery.es",
            "password": "AnyPassword123!"
        })
        assert login_response.status_code == status.HTTP_401_UNAUTHORIZED
        assert "Invalid credentials" in login_response.json()["detail"]

    @pytest.mark.api
    @pytest.mark.asyncio
    async def test_login_missing_fields(self, client):
        """Test login with missing fields"""
        response = client.post("/auth/login", json={"email": "test@bakery.es"})  # Missing password
        assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY
        assert "password" in response.json()["detail"][0]["loc"]

    @pytest.mark.api
    @pytest.mark.asyncio
    async def test_login_invalid_email_format(self, client):
        """Test login with invalid email format"""
        user_data = generate_random_user_data()
        client.post("/auth/register", json=user_data)
        response = client.post("/auth/login", json={
            "email": "invalid-email-format",
            "password": user_data["password"]
        })
        assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY
        assert "email" in response.json()["detail"][0]["loc"]

    @pytest.mark.api
    @pytest.mark.asyncio
    async def test_token_refresh(self, client):
        """Test refreshing an access token via API"""
        auth_data = await register_and_login_user(client)
        refresh_token = auth_data["tokens"]["refresh_token"]

        refresh_response = client.post("/auth/refresh", json={"refresh_token": refresh_token})
        assert refresh_response.status_code == status.HTTP_200_OK
        body = refresh_response.json()
        assert "access_token" in body
        # The refresh token is expected to be returned unchanged.
        assert "refresh_token" in body
        assert body["refresh_token"] == refresh_token

    @pytest.mark.api
    @pytest.mark.asyncio
    async def test_logout(self, client):
        """Test user logout via API"""
        auth_data = await register_and_login_user(client)
        refresh_token = auth_data["tokens"]["refresh_token"]

        logout_response = client.post("/auth/logout", json={"refresh_token": refresh_token})
        assert logout_response.status_code == status.HTTP_200_OK
        assert "message" in logout_response.json()
        assert "User logged out successfully" in logout_response.json()["message"]

        # Attempt to refresh with the logged-out token
        refresh_response_after_logout = client.post("/auth/refresh", json={"refresh_token": refresh_token})
        assert refresh_response_after_logout.status_code == status.HTTP_401_UNAUTHORIZED
        assert "Invalid refresh token" in refresh_response_after_logout.json()["detail"]
class TestAuthenticationFlow:
    """End-to-end tests for complete authentication flows.

    Each test drives several endpoints in sequence through the ``client``
    fixture: register, login, a protected ``/users/me`` call, refresh, logout.
    """

    @pytest.mark.integration
    @pytest.mark.asyncio
    async def test_complete_registration_login_flow(self, client):
        """Test a complete flow from registration to login"""
        user_data = generate_random_user_data()

        # 1. Register
        register_response = client.post("/auth/register", json=user_data)
        assert register_response.status_code == status.HTTP_200_OK
        assert "User registered successfully" in register_response.json()["message"]

        # 2. Login
        login_data = {"email": user_data["email"], "password": user_data["password"]}
        login_response = client.post("/auth/login", json=login_data)
        assert login_response.status_code == status.HTTP_200_OK
        tokens = login_response.json()
        assert "access_token" in tokens
        assert "refresh_token" in tokens
        access_token = tokens["access_token"]
        refresh_token = tokens["refresh_token"]

        # 3. Access protected endpoint (e.g., /users/me)
        me_response = client.get("/users/me", headers={"Authorization": f"Bearer {access_token}"})
        assert me_response.status_code == status.HTTP_200_OK
        assert me_response.json()["email"] == user_data["email"]

        # 4. Refresh token
        refresh_response = client.post("/auth/refresh", json={"refresh_token": refresh_token})
        assert refresh_response.status_code == status.HTTP_200_OK
        new_access_token = refresh_response.json()["access_token"]
        # NOTE(review): this assumes two tokens issued back-to-back differ;
        # if the token carries only second-resolution timestamps and no unique
        # id, the two could be identical -- confirm against SecurityManager.
        assert new_access_token != access_token  # New token should be different

        # 5. Access protected endpoint with new token
        me_response_new = client.get("/users/me", headers={"Authorization": f"Bearer {new_access_token}"})
        assert me_response_new.status_code == status.HTTP_200_OK

    @pytest.mark.integration
    @pytest.mark.asyncio
    async def test_token_refresh_flow(self, client):
        """Test the entire token refresh process"""
        auth_data = await register_and_login_user(client)
        initial_access_token = auth_data["tokens"]["access_token"]
        refresh_token = auth_data["tokens"]["refresh_token"]

        # No time is simulated here; for an expiry test, use a mocked clock or
        # a very short token lifetime in the test configuration.
        refresh_response = client.post("/auth/refresh", json={"refresh_token": refresh_token})
        assert refresh_response.status_code == status.HTTP_200_OK
        new_access_token = refresh_response.json()["access_token"]

        assert new_access_token is not None
        assert new_access_token != initial_access_token

        # The old access token may still be valid (200) or already expired
        # (401) depending on its lifetime; anything else is unexpected.
        old_token_access = client.get("/users/me", headers={"Authorization": f"Bearer {initial_access_token}"})
        assert old_token_access.status_code in (
            status.HTTP_200_OK,
            status.HTTP_401_UNAUTHORIZED,
        )

        new_token_access = client.get("/users/me", headers={"Authorization": f"Bearer {new_access_token}"})
        assert new_token_access.status_code == status.HTTP_200_OK
        assert "email" in new_token_access.json()

    @pytest.mark.integration
    @pytest.mark.asyncio
    async def test_logout_flow(self, client):
        """Test the logout process invalidating tokens"""
        auth_data = await register_and_login_user(client)
        access_token = auth_data["tokens"]["access_token"]
        refresh_token = auth_data["tokens"]["refresh_token"]
        auth_headers = {"Authorization": f"Bearer {access_token}"}

        # Access protected endpoint before logout
        me_response_before = client.get("/users/me", headers=auth_headers)
        assert me_response_before.status_code == status.HTTP_200_OK

        # Logout
        logout_response = client.post("/auth/logout", json={"refresh_token": refresh_token})
        assert logout_response.status_code == status.HTTP_200_OK

        # Try to access protected endpoint with the (now invalidated) access token
        # NOTE(review): this expects logout to revoke the access token too,
        # not only the refresh token -- confirm the service really does that.
        me_response_after = client.get("/users/me", headers=auth_headers)
        assert me_response_after.status_code == status.HTTP_401_UNAUTHORIZED

        # Try to refresh with the (now invalidated) refresh token
        refresh_response_after = client.post("/auth/refresh", json={"refresh_token": refresh_token})
        assert refresh_response_after.status_code == status.HTTP_401_UNAUTHORIZED
class TestErrorHandling:
    """Test error handling scenarios.

    The database-failure tests replace the ``get_db`` dependency through
    ``app.dependency_overrides``. Patching ``app.core.database.get_db`` has no
    effect on requests: FastAPI resolves the dependency through the callable
    object bound at route definition time and through the overrides mapping,
    which the ``client`` fixture has already populated.
    """

    @staticmethod
    def _post_with_db(client, mock_db, url, payload):
        """POST ``payload`` to ``url`` while ``get_db`` yields ``mock_db``.

        The override that was active before the call is restored afterwards,
        so the ``client`` fixture keeps working for the rest of the test.
        """
        async def failing_get_db():
            yield mock_db

        previous = app.dependency_overrides.get(get_db)
        app.dependency_overrides[get_db] = failing_get_db
        try:
            return client.post(url, json=payload)
        finally:
            if previous is None:
                app.dependency_overrides.pop(get_db, None)
            else:
                app.dependency_overrides[get_db] = previous

    @pytest.mark.api
    @pytest.mark.asyncio
    async def test_database_error_during_registration(self, client):
        """Test handling of database errors during registration API call"""
        user_data = generate_random_user_data()

        # A mock session whose commit fails.
        mock_db = AsyncMock(spec=AsyncSession)
        mock_db.commit.side_effect = Exception("Simulated DB commit error")
        mock_db.add.return_value = None
        mock_db.refresh.return_value = None
        mock_db.rollback.return_value = None  # Ensure rollback is mocked as well

        # NOTE(review): TestClient re-raises unhandled server exceptions by
        # default, so this only yields a 500 response if the application
        # converts the error itself -- confirm.
        response = self._post_with_db(client, mock_db, "/auth/register", user_data)

        assert response.status_code == status.HTTP_500_INTERNAL_SERVER_ERROR
        assert "detail" in response.json()
        assert "database error" in response.json()["detail"].lower()

    @pytest.mark.api
    @pytest.mark.asyncio
    async def test_database_error_during_authentication(self, client):
        """Test handling of database errors during authentication API call"""
        # First, register a user normally so there is something to authenticate against
        user_data = generate_random_user_data()
        register_response = client.post("/auth/register", json=user_data)
        assert register_response.status_code == status.HTTP_200_OK

        mock_db = AsyncMock(spec=AsyncSession)
        # The awaited execute() returns a plain MagicMock so that the
        # synchronous scalar_one_or_none() call raises immediately; on an
        # AsyncMock it would just return an un-awaited coroutine.
        mock_execute_result = MagicMock()
        mock_execute_result.scalar_one_or_none.side_effect = Exception("Simulated DB scalar error")
        mock_db.execute.return_value = mock_execute_result
        mock_db.rollback.return_value = None  # Ensure rollback is mocked

        response = self._post_with_db(
            client,
            mock_db,
            "/auth/login",
            {"email": user_data["email"], "password": user_data["password"]},
        )

        assert response.status_code == status.HTTP_500_INTERNAL_SERVER_ERROR
        assert "detail" in response.json()
        assert "database error" in response.json()["detail"].lower()

    @pytest.mark.api
    @pytest.mark.asyncio
    async def test_malformed_json_request(self, client):
        """Test API handling of malformed JSON requests"""
        response = client.post("/auth/register", content="this is not json", headers={"Content-Type": "application/json"})
        assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY
        assert "detail" in response.json()

    @pytest.mark.api
    @pytest.mark.asyncio
    async def test_empty_request_body(self, client):
        """Test API handling of empty request body for POST"""
        response = client.post("/auth/register", json={})
        assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY
        body = response.json()
        assert "detail" in body
        # Compared case-insensitively: the capitalisation of this validation
        # message differs between Pydantic major versions.
        assert any("field required" in err["msg"].lower() for err in body["detail"])
class TestSecurity:
    """Tests for security aspects like password hashing, email validation, etc."""

    @pytest.mark.security
    @pytest.mark.asyncio
    async def test_password_hashing_verification(self):
        """Test password hashing and verification"""
        password = "MyStrongPassword123!"
        hashed_password = SecurityManager.hash_password(password)
        assert hashed_password != password  # Should not be plain text
        assert SecurityManager.verify_password(password, hashed_password)
        assert not SecurityManager.verify_password("WrongPassword", hashed_password)

    @pytest.mark.security
    @pytest.mark.asyncio
    async def test_password_requirements_enforced(self, client):
        """Test that password requirements are enforced by the API"""
        common_user_data = generate_random_user_data()
        # (description, password, expected status). Every case reuses the same
        # email; only the final, valid password is expected to register.
        test_cases = [
            ("Too short", "short", status.HTTP_400_BAD_REQUEST),
            ("No uppercase", "nouppercase123!", status.HTTP_400_BAD_REQUEST),
            ("No lowercase", "NOUPPERCASE123!", status.HTTP_400_BAD_REQUEST),
            ("No digit", "NoDigit!", status.HTTP_400_BAD_REQUEST),
            ("No special char", "NoSpecialChar123", status.HTTP_400_BAD_REQUEST),
            ("Valid password", "StrongPwd123!", status.HTTP_200_OK),
        ]
        for desc, pwd, expected_status in test_cases:
            user_data = {**common_user_data, "password": pwd}
            response = client.post("/auth/register", json=user_data)
            assert response.status_code == expected_status, f"Failed for {desc} (password: {pwd})"
            if expected_status != status.HTTP_200_OK:
                assert "password" in response.json()["detail"].lower()

    @pytest.mark.security
    @pytest.mark.asyncio
    async def test_email_validation(self, client):
        """Test that email validation is properly performed by the API"""
        common_user_data = generate_random_user_data()
        # (email, is_valid, expected status)
        test_cases = [
            ("valid@email.com", True, status.HTTP_200_OK),
            ("user.name+tag@domain.co.uk", True, status.HTTP_200_OK),  # Valid special characters
            ("invalid-email", False, status.HTTP_422_UNPROCESSABLE_ENTITY),
            ("user@.com", False, status.HTTP_422_UNPROCESSABLE_ENTITY),
            ("@domain.com", False, status.HTTP_422_UNPROCESSABLE_ENTITY),
            ("user@domain", False, status.HTTP_422_UNPROCESSABLE_ENTITY),
            ("user@domain..com", False, status.HTTP_422_UNPROCESSABLE_ENTITY),
        ]
        for email, is_valid, expected_status in test_cases:
            user_data = {**common_user_data, "email": email}
            response = client.post("/auth/register", json=user_data)
            assert response.status_code == expected_status, f"Failed for email: {email}"
            if is_valid:
                # NOTE(review): other tests in this module expect the register
                # response to carry a "message" key; confirm that it also
                # echoes "email", otherwise this lookup raises KeyError.
                assert response.json()["email"] == email
            else:
                assert "email" in response.json()["detail"][0]["loc"]

    @pytest.mark.security
    @pytest.mark.asyncio
    async def test_sql_injection_prevention(self, client, test_db):
        """Test API's resistance to SQL injection attempts"""
        malicious_email = "test@bakery.es' OR 1=1; --"
        user_data = generate_random_user_data()
        user_data["email"] = malicious_email

        response = client.post("/auth/register", json=user_data)
        # We expect a validation error (422) or bad request (400), not a successful registration
        assert response.status_code in [status.HTTP_400_BAD_REQUEST, status.HTTP_422_UNPROCESSABLE_ENTITY]
        if response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY:
            assert "email" in response.json()["detail"][0]["loc"]  # Pydantic validation error
        else:  # e.g. status.HTTP_400_BAD_REQUEST from custom validation
            detail = response.json()["detail"].lower()
            assert "invalid" in detail or "malformed" in detail

        # Try with login
        response = client.post("/auth/login", json={
            "email": malicious_email,
            "password": "AnyPassword123!"
        })
        assert response.status_code in [status.HTTP_400_BAD_REQUEST, status.HTTP_422_UNPROCESSABLE_ENTITY, status.HTTP_401_UNAUTHORIZED]
class TestRateLimiting:
    """Test rate limiting functionality (if implemented).

    A meaningful result requires real rate-limiting middleware in the FastAPI
    app. Without it every request simply succeeds, which this test accepts.
    """

    @pytest.mark.api
    @pytest.mark.slow  # Issues several requests in a row
    @pytest.mark.asyncio
    async def test_multiple_registration_attempts(self, client):
        """Test rate limiting on registration endpoint"""
        num_attempts = 10
        success_count = 0
        rate_limited_count = 0

        for attempt in range(num_attempts):
            # A fresh random user per attempt, so a duplicate-email rejection
            # is not an expected outcome here.
            user_data = generate_random_user_data()
            response = client.post("/auth/register", json=user_data)
            if response.status_code == status.HTTP_200_OK:
                success_count += 1
            elif response.status_code == status.HTTP_429_TOO_MANY_REQUESTS:
                rate_limited_count += 1
            else:
                # Any other status is unexpected. Adjust if other valid codes occur.
                pytest.fail(f"Unexpected status code: {response.status_code} - {response.json()} for attempt {attempt}")

        # If rate limiting is expected to be active, additionally assert
        # rate_limited_count > 0. For now it is enough that at least one
        # registration went through...
        assert success_count > 0
        # ...and that every attempt was either accepted or rate limited. The
        # earlier form of this check fell back to an undefined ``responses``
        # name in a branch that pytest.fail() above makes unreachable.
        assert success_count + rate_limited_count == num_attempts
class TestPerformance:
    """Basic performance tests for critical endpoints."""

    # Upper bound, in seconds, accepted for one batch of 50 requests.
    MAX_BATCH_SECONDS = 5.0

    @staticmethod
    async def _timed_posts(client, url, payloads):
        """POST every payload to ``url`` concurrently and time the batch.

        Each blocking ``client.post`` call is handed to a worker thread with
        ``asyncio.to_thread`` so the requests can overlap.

        Args:
            client: Test client exposing ``post(url, json=...)``.
            url: Endpoint path to post to.
            payloads: JSON bodies, one request per item.

        Returns:
            A ``(responses, duration)`` tuple: the responses in payload
            order and the elapsed time in seconds.
        """
        # perf_counter is monotonic, so the measurement cannot be skewed
        # by system clock adjustments the way time.time() can.
        started = time.perf_counter()
        responses = await asyncio.gather(
            *(asyncio.to_thread(client.post, url, json=payload) for payload in payloads)
        )
        return responses, time.perf_counter() - started

    @pytest.mark.performance
    @pytest.mark.slow
    async def test_registration_performance(self, client):
        """Measure performance of user registration."""
        num_users = 50  # Number of registrations to test
        # Build the payloads up front so only the requests are timed.
        payloads = [
            generate_random_user_data(f"perf_{index}") for index in range(num_users)
        ]

        responses, duration = await self._timed_posts(client, "/auth/register", payloads)

        for response in responses:
            assert response.status_code == status.HTTP_200_OK, f"Registration failed with status {response.status_code}: {response.json()}"

        print(f"\nRegistered {num_users} users in {duration:.2f} seconds ({num_users/duration:.2f} req/s)")
        # Assert against a reasonable threshold
        assert duration < self.MAX_BATCH_SECONDS

    @pytest.mark.performance
    @pytest.mark.slow
    async def test_login_performance(self, client):
        """Measure performance of user login."""
        num_logins = 50

        # Pre-register users (untimed) and keep their credentials.
        login_payloads = []
        for index in range(num_logins):
            user_data = generate_random_user_data(f"login_perf_{index}")
            register_response = client.post("/auth/register", json=user_data)
            # A failed setup would otherwise surface later as a confusing
            # login failure.
            assert register_response.status_code == status.HTTP_200_OK, (
                f"Pre-registration failed with status {register_response.status_code}"
            )
            login_payloads.append(
                {"email": user_data["email"], "password": user_data["password"]}
            )

        responses, duration = await self._timed_posts(client, "/auth/login", login_payloads)

        for response in responses:
            assert response.status_code == status.HTTP_200_OK, f"Login failed with status {response.status_code}: {response.json()}"

        print(f"\nLogged in {num_logins} users in {duration:.2f} seconds ({num_logins/duration:.2f} req/s)")
        assert duration < self.MAX_BATCH_SECONDS
2025-07-20 13:48:26 +02:00
class TestEdgeCases:
    """Tests for various edge cases and unusual inputs."""

    # Outcomes tolerated when an over-long value may be either stored or
    # rejected, depending on validation rules and the database schema.
    _ACCEPT_OR_REJECT = (
        status.HTTP_200_OK,
        status.HTTP_400_BAD_REQUEST,
        status.HTTP_422_UNPROCESSABLE_ENTITY,
    )

    @staticmethod
    def _error_fields(response):
        """Return every ``loc`` component named in a validation error body.

        Expects ``response.json()["detail"]`` to be a list of error dicts
        that each carry a ``loc`` sequence, as the assertions in this class
        do for 422 responses.
        """
        return {
            part
            for error in response.json()["detail"]
            for part in error["loc"]
        }

    @pytest.mark.api
    async def test_very_long_email(self, client):
        """Test registration with a very long email address."""
        long_local_part = "a" * 60  # Max 64 for local part
        # NOTE(review): a single 180-char label exceeds the 63-char DNS
        # label limit, so a strict validator is likely to reject this.
        long_domain_part = "b" * 180
        long_email = f"{long_local_part}@{long_domain_part}.com"  # 245 chars
        user_data = generate_random_user_data()
        user_data["email"] = long_email

        response = client.post("/auth/register", json=user_data)

        # Could be 200 (valid), 400 (too long), or 422 (validation).
        assert response.status_code in self._ACCEPT_OR_REJECT
        if response.status_code == status.HTTP_200_OK:
            assert response.json()["email"] == long_email

    @pytest.mark.api
    async def test_very_long_full_name(self, client):
        """Test registration with a very long full name."""
        long_name = "A" * 500  # Assume a reasonable max length like 255 or 500
        user_data = generate_random_user_data()
        user_data["full_name"] = long_name

        response = client.post("/auth/register", json=user_data)

        assert response.status_code in self._ACCEPT_OR_REJECT
        if response.status_code == status.HTTP_200_OK:
            assert response.json()["full_name"] == long_name

    @pytest.mark.api
    async def test_very_long_password(self, client):
        """Test registration with a very long password."""
        long_password = "P" + "a" * 200 + "1!"  # Very long, but valid chars
        user_data = generate_random_user_data()
        user_data["password"] = long_password

        response = client.post("/auth/register", json=user_data)

        assert response.status_code in self._ACCEPT_OR_REJECT

    @pytest.mark.api
    async def test_unicode_characters_in_name(self, client):
        """Test registration with unicode characters in full name."""
        unicode_name = "परीक्षा उपयोगकर्ता नाम 😀"  # Example unicode characters
        user_data = generate_random_user_data()
        user_data["full_name"] = unicode_name

        response = client.post("/auth/register", json=user_data)

        assert response.status_code == status.HTTP_200_OK
        assert response.json()["full_name"] == unicode_name

    @pytest.mark.api
    async def test_special_characters_in_email(self, client):
        """Test registration with email containing special but valid characters."""
        # Valid addresses using a '+' alias and a '.' in the local part.
        test_cases = [
            "test+alias@bakery.es",
            "first.last@bakery.es",
        ]
        for email in test_cases:
            user_data = generate_random_user_data()
            user_data["email"] = email

            response = client.post("/auth/register", json=user_data)

            assert response.status_code == status.HTTP_200_OK, f"Failed for email: {email}"
            assert response.json()["email"] == email

    @pytest.mark.api
    async def test_empty_strings(self, client):
        """Test registration with empty strings for required fields."""
        user_data = {
            "email": "",
            "password": "TestPassword123!",
            "full_name": "",
        }

        response = client.post("/auth/register", json=user_data)

        assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY
        # Check which fields were rejected rather than the message text:
        # the wording of validation messages differs between validator
        # versions, while the error location is stable.
        rejected = self._error_fields(response)
        assert "email" in rejected
        assert "full_name" in rejected

    @pytest.mark.api
    async def test_null_values(self, client):
        """Test registration with null values for required fields."""
        user_data = {
            "email": None,
            "password": "TestPassword123!",
            "full_name": None,
        }

        response = client.post("/auth/register", json=user_data)

        assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY
        # Location-based check; see test_empty_strings for the rationale.
        rejected = self._error_fields(response)
        assert "email" in rejected
        assert "full_name" in rejected

    @pytest.mark.api
    async def test_whitespace_only_fields(self, client):
        """Test registration with whitespace-only strings for required fields."""
        user_data = {
            "email": " ",
            "password": "TestPassword123!",
            "full_name": " ",
        }

        response = client.post("/auth/register", json=user_data)

        # Expected behavior: 422 for validation, or 400 if stripped and then invalid
        assert response.status_code in [
            status.HTTP_422_UNPROCESSABLE_ENTITY,
            status.HTTP_400_BAD_REQUEST,
        ]
        if response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY:
            # At least one of the two blank fields must be reported.
            assert self._error_fields(response) & {"email", "full_name"}

    @pytest.mark.api
    async def test_case_sensitivity_email(self, client):
        """Test that login accepts the registered email in a different casing.

        Assumes emails are normalized or compared case-insensitively by the
        service; adjust if the implementation is intentionally case-sensitive.
        """
        user_data = generate_random_user_data()
        original_email = user_data["email"]

        # Register with original casing and make sure the setup succeeded,
        # so a later login failure really is about casing.
        register_response = client.post("/auth/register", json=user_data)
        assert register_response.status_code == status.HTTP_200_OK

        variants = (
            (original_email.lower(), "lowercase"),
            (original_email.upper(), "uppercase"),
        )
        for email_variant, label in variants:
            if email_variant == original_email:
                continue  # Only test if casing actually changes
            response = client.post(
                "/auth/login",
                json={"email": email_variant, "password": user_data["password"]},
            )
            assert response.status_code == status.HTTP_200_OK, f"Login failed with {label} email"
2025-07-20 13:48:26 +02:00
class TestConcurrency:
    """Tests for concurrent operations."""

    @pytest.mark.integration
    @pytest.mark.slow
    async def test_concurrent_registration_same_email(self, client):
        """Fire simultaneous registrations for one email; exactly one may win.

        All remaining attempts must be rejected with a 400 whose detail
        contains "Email already registered".
        """
        shared_user_data = generate_random_user_data()
        num_attempts = 5

        # Run each blocking client call in its own worker thread.
        responses = await asyncio.gather(
            *(
                asyncio.to_thread(client.post, "/auth/register", json=shared_user_data)
                for _ in range(num_attempts)
            )
        )

        success_count = 0
        duplicate_count = 0
        for response in responses:
            if response.status_code == status.HTTP_200_OK:
                success_count += 1
                continue
            rejected_as_duplicate = (
                response.status_code == status.HTTP_400_BAD_REQUEST
                and "Email already registered" in response.json().get("detail", "")
            )
            if not rejected_as_duplicate:
                pytest.fail(f"Unexpected status code for concurrent registration: {response.status_code} - {response.json()}")
            duplicate_count += 1

        assert success_count == 1, f"Expected exactly one successful registration, got {success_count}"
        assert duplicate_count == num_attempts - 1, f"Expected {num_attempts - 1} duplicate errors, got {duplicate_count}"

    @pytest.mark.integration
    @pytest.mark.slow
    async def test_concurrent_login_attempts(self, client):
        """Log one registered user in several times at once; all must succeed."""
        auth_data = await register_and_login_user(client)
        # Credentials come from the originally submitted user data.
        registered = auth_data["user_data"]
        login_data = {"email": registered["email"], "password": registered["password"]}
        num_attempts = 10

        responses = await asyncio.gather(
            *(
                asyncio.to_thread(client.post, "/auth/login", json=login_data)
                for _ in range(num_attempts)
            )
        )

        for response in responses:
            assert response.status_code == status.HTTP_200_OK
            body = response.json()
            assert "access_token" in body
            assert "refresh_token" in body
2025-07-20 13:48:26 +02:00
class TestDataIntegrity:
    """Tests to ensure data integrity."""

    @staticmethod
    async def _fetch_user_row(db, user_id):
        """Return the complete ``users`` row for ``user_id``, or ``None``.

        NOTE(review): ``user_id`` is bound as-is to a raw text query;
        confirm the driver accepts the id type used by the model.
        """
        result = await db.execute(
            text("SELECT * FROM users WHERE id = :id"), {"id": user_id}
        )
        # one_or_none() yields the whole Row. scalar_one_or_none() would
        # return only the first column, which has no .email/.last_login.
        return result.one_or_none()

    @staticmethod
    async def _count_users(db):
        """Return the number of rows currently in the ``users`` table."""
        result = await db.execute(text("SELECT COUNT(*) FROM users"))
        return result.scalar_one()

    @staticmethod
    def _as_aware_utc(value):
        """Coerce a raw timestamp column value to an aware ``datetime``.

        A raw text query skips the ORM's result typing, so the value may be
        an ISO-format string or a naive ``datetime``.
        NOTE(review): naive values are assumed to be UTC; confirm against
        how the service writes ``last_login``.
        """
        if isinstance(value, str):
            value = datetime.fromisoformat(value)
        if value.tzinfo is None:
            value = value.replace(tzinfo=timezone.utc)
        return value

    @pytest.mark.integration
    async def test_user_data_integrity_after_creation(self, test_user, test_db):
        """Verify user data fields after creation."""
        user_row = await self._fetch_user_row(test_db, test_user.id)

        assert user_row is not None
        assert user_row.email == test_user.email
        assert user_row.full_name == test_user.full_name
        assert user_row.hashed_password == test_user.hashed_password
        # Truthiness, not identity: a raw query may report booleans as 1.
        assert user_row.is_active
        assert user_row.created_at is not None
        assert user_row.updated_at is not None
        assert user_row.last_login is None  # Should be None until first login

    @pytest.mark.integration
    async def test_last_login_update(self, test_user, test_db):
        """Test that last_login timestamp is updated on successful login."""
        # Ensure last_login is None initially
        assert test_user.last_login is None

        await AuthService.login_user(test_user.email, "TestPassword123!", test_db)

        # Re-fetch the row to observe the persisted last_login.
        user_row = await self._fetch_user_row(test_db, test_user.id)
        assert user_row is not None
        assert user_row.last_login is not None

        # Assert last_login is recent (within the last minute).
        last_login = self._as_aware_utc(user_row.last_login)
        assert datetime.now(timezone.utc) - last_login < timedelta(minutes=1)

    @pytest.mark.integration
    async def test_database_rollback_on_error(self, test_db):
        """Test that a failing commit during user creation propagates.

        The session handed to ``AuthService.create_user`` is a mock whose
        ``commit`` raises, so the error path is exercised without touching
        the real database.
        """
        initial_user_count = await self._count_users(test_db)

        failing_session = AsyncMock(spec=AsyncSession)
        failing_session.commit.side_effect = Exception("Simulated rollback error")
        failing_session.add.return_value = None
        failing_session.refresh.return_value = None
        failing_session.rollback.return_value = None

        # The session is passed to create_user explicitly, so no get_db
        # patching is needed to route the call through the mock.
        with pytest.raises(Exception, match="Simulated rollback error"):
            await AuthService.create_user(
                "rollback_test@bakery.es",
                "Password123!",
                "Rollback User",
                failing_session,
            )

        # The mocked session never writes through test_db, so this only
        # confirms the real database was left untouched by the failure.
        current_user_count = await self._count_users(test_db)
        assert current_user_count == initial_user_count, "Database state was not rolled back after error."
2025-07-20 13:48:26 +02:00
class TestTokenManagement:
    """Tests for JWT token generation and validation."""

    @pytest.mark.security
    async def test_token_expiration_validation(self):
        """An already-expired token is rejected; a live one decodes."""
        # A negative lifetime produces a token that is expired on creation.
        stale_token = SecurityManager.create_access_token(
            user_id="123",
            email="test@exp.com",
            full_name="Expired User",
            expires_delta=timedelta(seconds=-1),
        )
        with pytest.raises(ValueError, match="Token has expired"):
            SecurityManager.verify_token(stale_token)

        # Control case: a token with five minutes left verifies cleanly.
        live_token = SecurityManager.create_access_token(
            user_id="456",
            email="test@valid.com",
            full_name="Valid User",
            expires_delta=timedelta(minutes=5),
        )
        claims = SecurityManager.verify_token(live_token)
        assert claims["email"] == "test@valid.com"

    @pytest.mark.security
    async def test_token_contains_correct_claims(self, test_user):
        """A token issued for ``test_user`` carries that user's claims."""
        issued = SecurityManager.create_access_token(
            user_id=str(test_user.id),
            email=test_user.email,
            full_name=test_user.full_name,
            expires_delta=timedelta(minutes=15),
        )

        claims = SecurityManager.verify_token(issued)

        assert claims is not None
        assert claims["sub"] == str(test_user.id)
        assert claims["email"] == test_user.email
        assert claims["full_name"] == test_user.full_name
        assert "exp" in claims
        assert "iat" in claims

    @pytest.mark.security
    async def test_token_tampering_detection(self):
        """A token whose payload segment was altered must be rejected."""
        original_token = SecurityManager.create_access_token(
            user_id="123",
            email="test@tamper.com",
            full_name="Tamper User",
            expires_delta=timedelta(minutes=15),
        )

        # A JWT has three dot-separated segments: header.payload.signature.
        segments = original_token.split('.')
        if len(segments) != 3:
            pytest.fail("Original token format unexpected for tampering test.")
        header, payload, signature = segments

        # Appending one character to the payload is enough to break the
        # match between the signed content and the signature.
        tampered_token = ".".join((header, payload + "X", signature))

        with pytest.raises(ValueError, match="Could not validate credentials|Invalid token signature"):
            SecurityManager.verify_token(tampered_token)

    @pytest.mark.security
    async def test_token_with_invalid_signature(self):
        """A well-formed token with a bogus signature must be rejected."""
        # Header (alg: HS256, typ: JWT)
        header = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9"
        # Payload with sub, email, iat, exp (arbitrary values)
        payload = "eyJzdWIiOiIxMjM0NTY3ODkwIiwiZW1haWwiOiJ0ZXN0QGV4YW1wbGUuY29tIiwiaWF0IjoxNTE2MjM5MDIyLCJleHAiOjE2NzIyMzkwMjIsInZnbiI6IkFub255bW91cyBVc2VyIn0"
        # Signature that was not produced with the service's secret
        signature = "invalid_signature_here"
        invalid_signature_token = f"{header}.{payload}.{signature}"

        with pytest.raises(ValueError, match="Could not validate credentials|Invalid token signature"):
            SecurityManager.verify_token(invalid_signature_token)
2025-07-20 13:48:26 +02:00
class TestCompleteAuthenticationFlows:
    """End-to-end tests for full user lifecycle scenarios."""

    @pytest.mark.integration
    @pytest.mark.slow
    async def test_full_user_lifecycle(self, client, test_db):
        """Walk one user through register, login, access, refresh, logout and re-login."""

        def bearer(token):
            # Authorization header for a bearer token.
            return {"Authorization": f"Bearer {token}"}

        user_data = generate_random_user_data()
        credentials = {"email": user_data["email"], "password": user_data["password"]}

        # 1. Register
        registered = client.post("/auth/register", json=user_data)
        assert registered.status_code == status.HTTP_200_OK

        # 2. Login
        logged_in = client.post("/auth/login", json=credentials)
        assert logged_in.status_code == status.HTTP_200_OK
        access_token = logged_in.json()["access_token"]
        refresh_token = logged_in.json()["refresh_token"]

        # 3. Access protected endpoint
        profile = client.get("/users/me", headers=bearer(access_token))
        assert profile.status_code == status.HTTP_200_OK
        assert profile.json()["email"] == user_data["email"]

        # 4. Refresh token
        refreshed = client.post("/auth/refresh", json={"refresh_token": refresh_token})
        assert refreshed.status_code == status.HTTP_200_OK
        new_access_token = refreshed.json()["access_token"]
        assert new_access_token != access_token

        # 5. Access protected endpoint with new token
        profile_with_new_token = client.get("/users/me", headers=bearer(new_access_token))
        assert profile_with_new_token.status_code == status.HTTP_200_OK

        # 6. Logout
        logged_out = client.post("/auth/logout", json={"refresh_token": refresh_token})
        assert logged_out.status_code == status.HTTP_200_OK

        # 7. Verify tokens are invalidated
        profile_after_logout = client.get("/users/me", headers=bearer(new_access_token))
        assert profile_after_logout.status_code == status.HTTP_401_UNAUTHORIZED
        refresh_after_logout = client.post("/auth/refresh", json={"refresh_token": refresh_token})
        assert refresh_after_logout.status_code == status.HTTP_401_UNAUTHORIZED

        # 8. Re-login with same credentials
        logged_in_again = client.post("/auth/login", json=credentials)
        assert logged_in_again.status_code == status.HTTP_200_OK
        re_access_token = logged_in_again.json()["access_token"]

        # 9. Access protected endpoint with re-logged in token
        profile_after_relogin = client.get("/users/me", headers=bearer(re_access_token))
        assert profile_after_relogin.status_code == status.HTTP_200_OK
        assert profile_after_relogin.json()["email"] == user_data["email"]

    @pytest.mark.integration
    async def test_multiple_sessions_same_user(self, client, test_db):
        """Open several sessions for one user; logging out one leaves the rest valid."""

        async def fetch_profiles(tokens):
            # Query /users/me once per token, each call in a worker thread.
            return await asyncio.gather(
                *(
                    asyncio.to_thread(
                        client.get,
                        "/users/me",
                        headers={"Authorization": f"Bearer {token}"},
                    )
                    for token in tokens
                )
            )

        user_data = generate_random_user_data()
        client.post("/auth/register", json=user_data)
        credentials = {"email": user_data["email"], "password": user_data["password"]}
        num_sessions = 3

        login_responses = await asyncio.gather(
            *(
                asyncio.to_thread(client.post, "/auth/login", json=credentials)
                for _ in range(num_sessions)
            )
        )

        all_access_tokens = []
        all_refresh_tokens = []
        for response in login_responses:
            assert response.status_code == status.HTTP_200_OK
            body = response.json()
            assert "access_token" in body
            assert "refresh_token" in body
            all_access_tokens.append(body["access_token"])
            all_refresh_tokens.append(body["refresh_token"])
        assert len(all_access_tokens) == num_sessions
        assert len(all_refresh_tokens) == num_sessions

        # Verify all access tokens are valid
        for response in await fetch_profiles(all_access_tokens):
            assert response.status_code == status.HTTP_200_OK
            assert response.json()["email"] == user_data["email"]

        # Logout one session and ensure others remain active
        first_access, *other_access = all_access_tokens
        logout_response = client.post("/auth/logout", json={"refresh_token": all_refresh_tokens[0]})
        assert logout_response.status_code == status.HTTP_200_OK

        # The first session's access token should now be invalid
        first_session_check = client.get("/users/me", headers={"Authorization": f"Bearer {first_access}"})
        assert first_session_check.status_code == status.HTTP_401_UNAUTHORIZED

        # Other sessions should still be valid
        for response in await fetch_profiles(other_access):
            assert response.status_code == status.HTTP_200_OK
            assert response.json()["email"] == user_data["email"]
2025-07-20 13:48:26 +02:00
2025-07-20 14:10:10 +02:00
# ================================================================
# PYTEST CONFIGURATION
# ================================================================
# Plugins this test file asks pytest to load.
pytest_plugins = ["pytest_asyncio"]

# Markers applied to every test in this module.
pytestmark = [pytest.mark.asyncio, pytest.mark.auth]
# Test configuration
def pytest_configure(config):
    """Register the custom markers used by the tests in this module.

    Args:
        config: Object exposing ``addinivalue_line(name, line)``; each
            marker is added as one line of the ``markers`` ini option.

    NOTE(review): pytest is documented to pick up hook functions from
    ``conftest.py`` files and plugins; confirm this hook actually runs
    from a test module, or move it to ``conftest.py``.
    """
    marker_descriptions = (
        "auth: marks tests as authentication tests",
        "integration: marks tests as integration tests",
        "performance: marks tests as performance tests",
        "security: marks tests as security tests",
        # api and slow are applied to tests in this module as well, so they
        # need registering to avoid unknown-marker warnings.
        "api: marks tests as API endpoint tests",
        "slow: marks tests as slow-running tests",
    )
    for description in marker_descriptions:
        config.addinivalue_line("markers", description)