Files
bakery-ia/services/auth/tests/test_auth_comprehensive.py
2025-07-20 13:48:26 +02:00

1398 lines
51 KiB
Python

# ================================================================
# services/auth/tests/test_auth_comprehensive.py
# Complete pytest test suite for authentication service
# ================================================================
"""
Comprehensive test suite for the authentication service
Tests registration, login, and all authentication-related functionality
"""
import pytest
import asyncio
import uuid
from datetime import datetime, timedelta, timezone
from unittest.mock import AsyncMock, MagicMock, patch
from fastapi.testclient import TestClient
from fastapi import status
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy import text
import redis.asyncio as redis
# Import the authentication service modules
from app.main import app
from app.models.users import User, RefreshToken
from app.schemas.auth import UserRegistration, UserLogin, TokenResponse
from app.services.auth_service import AuthService
from app.core.security import SecurityManager
from app.core.config import settings
from app.core.database import get_db
from shared.database.base import Base
# ================================================================
# TEST CONFIGURATION AND FIXTURES
# ================================================================
# Test database configuration - Use in-memory SQLite for speed
TEST_DATABASE_URL = "sqlite+aiosqlite:///:memory:"
@pytest.fixture(scope="function")
async def test_engine():
"""Create test database engine"""
engine = create_async_engine(
TEST_DATABASE_URL,
echo=False,
future=True
)
# Create all tables
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
yield engine
# Clean up
await engine.dispose()
@pytest.fixture(scope="function")
async def test_db(test_engine):
"""Create test database session"""
async_session = sessionmaker(
test_engine, class_=AsyncSession, expire_on_commit=False
)
async with async_session() as session:
yield session
@pytest.fixture(scope="function")
def client(test_db):
"""Create test client with database dependency override"""
def override_get_db():
return test_db
app.dependency_overrides[get_db] = override_get_db
with TestClient(app) as test_client:
yield test_client
# Clean up
app.dependency_overrides.clear()
@pytest.fixture
def mock_redis():
"""Mock Redis client for testing"""
redis_mock = AsyncMock()
redis_mock.get.return_value = None
redis_mock.incr.return_value = 1
redis_mock.expire.return_value = True
redis_mock.delete.return_value = True
redis_mock.setex.return_value = True
return redis_mock
@pytest.fixture
def valid_user_data():
"""Valid user registration data"""
return {
"email": "test@bakery.es",
"password": "TestPassword123",
"full_name": "Test User"
}
@pytest.fixture
def weak_password_user_data():
"""User data with weak password"""
return {
"email": "test@bakery.es",
"password": "weak",
"full_name": "Test User"
}
@pytest.fixture
def invalid_email_user_data():
"""User data with invalid email"""
return {
"email": "invalid-email",
"password": "TestPassword123",
"full_name": "Test User"
}
@pytest.fixture
async def test_user(test_db):
"""Create a test user in the database"""
user_data = UserRegistration(
email="existing@bakery.es",
password="TestPassword123",
full_name="Existing User"
)
user = await AuthService.create_user(
email=user_data.email,
password=user_data.password,
full_name=user_data.full_name,
db=test_db
)
return user
# ================================================================
# SECURITY MANAGER TESTS
# ================================================================
class TestSecurityManager:
"""Test the SecurityManager utility class"""
def test_hash_password(self):
"""Test password hashing"""
password = "TestPassword123"
hashed = SecurityManager.hash_password(password)
assert hashed != password
assert len(hashed) > 50 # bcrypt hashes are long
assert hashed.startswith('$2b$') # bcrypt format
def test_verify_password_correct(self):
"""Test password verification with correct password"""
password = "TestPassword123"
hashed = SecurityManager.hash_password(password)
assert SecurityManager.verify_password(password, hashed) is True
def test_verify_password_incorrect(self):
"""Test password verification with incorrect password"""
password = "TestPassword123"
wrong_password = "WrongPassword123"
hashed = SecurityManager.hash_password(password)
assert SecurityManager.verify_password(wrong_password, hashed) is False
@pytest.mark.parametrize("password,expected", [
("TestPassword123", True), # Valid password
("TestPwd123", True), # Valid but shorter
("testpassword123", False), # No uppercase
("TESTPASSWORD123", False), # No lowercase
("TestPassword", False), # No numbers
("Test123", False), # Too short
("", False), # Empty
])
def test_validate_password(self, password, expected):
"""Test password validation with various inputs"""
assert SecurityManager.validate_password(password) == expected
def test_create_access_token(self):
"""Test JWT access token creation"""
user_data = {
"user_id": str(uuid.uuid4()),
"email": "test@bakery.es",
"full_name": "Test User"
}
token = SecurityManager.create_access_token(user_data)
assert isinstance(token, str)
assert len(token) > 100 # JWT tokens are long
# Verify token can be decoded
decoded = SecurityManager.verify_token(token)
assert decoded is not None
assert decoded["email"] == user_data["email"]
def test_create_refresh_token(self):
"""Test JWT refresh token creation"""
user_data = {
"user_id": str(uuid.uuid4()),
"email": "test@bakery.es"
}
token = SecurityManager.create_refresh_token(user_data)
assert isinstance(token, str)
assert len(token) > 100
def test_verify_token_valid(self):
"""Test JWT token verification with valid token"""
user_data = {
"user_id": str(uuid.uuid4()),
"email": "test@bakery.es"
}
token = SecurityManager.create_access_token(user_data)
decoded = SecurityManager.verify_token(token)
assert decoded is not None
assert decoded["user_id"] == user_data["user_id"]
assert decoded["email"] == user_data["email"]
def test_verify_token_invalid(self):
"""Test JWT token verification with invalid token"""
invalid_token = "invalid.jwt.token"
decoded = SecurityManager.verify_token(invalid_token)
assert decoded is None
def test_hash_token(self):
"""Test token hashing for storage"""
token = "test_token_string"
hashed = SecurityManager.hash_token(token)
assert hashed != token
assert len(hashed) == 64 # SHA256 hex digest length
@pytest.mark.asyncio
async def test_check_login_attempts_no_attempts(self, mock_redis):
"""Test checking login attempts when none exist"""
with patch('app.core.security.redis_client', mock_redis):
mock_redis.get.return_value = None
result = await SecurityManager.check_login_attempts("test@bakery.es")
assert result is True
@pytest.mark.asyncio
async def test_check_login_attempts_under_limit(self, mock_redis):
"""Test checking login attempts under the limit"""
with patch('app.core.security.redis_client', mock_redis):
mock_redis.get.return_value = "3" # Under limit of 5
result = await SecurityManager.check_login_attempts("test@bakery.es")
assert result is True
@pytest.mark.asyncio
async def test_check_login_attempts_over_limit(self, mock_redis):
"""Test checking login attempts over the limit"""
with patch('app.core.security.redis_client', mock_redis):
mock_redis.get.return_value = "5" # At limit
result = await SecurityManager.check_login_attempts("test@bakery.es")
assert result is False
@pytest.mark.asyncio
async def test_increment_login_attempts(self, mock_redis):
"""Test incrementing login attempts"""
with patch('app.core.security.redis_client', mock_redis):
await SecurityManager.increment_login_attempts("test@bakery.es")
mock_redis.incr.assert_called_once()
mock_redis.expire.assert_called_once()
@pytest.mark.asyncio
async def test_clear_login_attempts(self, mock_redis):
"""Test clearing login attempts"""
with patch('app.core.security.redis_client', mock_redis):
await SecurityManager.clear_login_attempts("test@bakery.es")
mock_redis.delete.assert_called_once()
# ================================================================
# AUTH SERVICE TESTS
# ================================================================
class TestAuthService:
"""Test the AuthService business logic"""
@pytest.mark.asyncio
async def test_create_user_success(self, test_db):
"""Test successful user creation"""
email = "newuser@bakery.es"
password = "TestPassword123"
full_name = "New User"
user = await AuthService.create_user(email, password, full_name, test_db)
assert user.email == email
assert user.full_name == full_name
assert user.is_active is True
assert user.is_verified is False
assert user.id is not None
assert user.created_at is not None
# Verify password was hashed
assert user.hashed_password != password
assert SecurityManager.verify_password(password, user.hashed_password)
@pytest.mark.asyncio
async def test_create_user_duplicate_email(self, test_db, test_user):
"""Test user creation with duplicate email"""
email = test_user.email # Use existing user's email
password = "TestPassword123"
full_name = "Duplicate User"
with pytest.raises(Exception) as exc_info:
await AuthService.create_user(email, password, full_name, test_db)
# Should raise HTTPException for duplicate email
assert "already registered" in str(exc_info.value).lower()
@pytest.mark.asyncio
async def test_authenticate_user_success(self, test_db, test_user):
"""Test successful user authentication"""
email = test_user.email
password = "TestPassword123" # Original password
authenticated_user = await AuthService.authenticate_user(email, password, test_db)
assert authenticated_user is not None
assert authenticated_user.id == test_user.id
assert authenticated_user.email == email
@pytest.mark.asyncio
async def test_authenticate_user_wrong_password(self, test_db, test_user):
"""Test authentication with wrong password"""
email = test_user.email
wrong_password = "WrongPassword123"
authenticated_user = await AuthService.authenticate_user(email, wrong_password, test_db)
assert authenticated_user is None
@pytest.mark.asyncio
async def test_authenticate_user_nonexistent(self, test_db):
"""Test authentication with non-existent email"""
email = "nonexistent@bakery.es"
password = "TestPassword123"
authenticated_user = await AuthService.authenticate_user(email, password, test_db)
assert authenticated_user is None
@pytest.mark.asyncio
async def test_authenticate_user_inactive(self, test_db):
"""Test authentication with inactive user"""
# Create inactive user
user = await AuthService.create_user(
"inactive@bakery.es",
"TestPassword123",
"Inactive User",
test_db
)
# Make user inactive
user.is_active = False
await test_db.commit()
authenticated_user = await AuthService.authenticate_user(
"inactive@bakery.es",
"TestPassword123",
test_db
)
assert authenticated_user is None
@pytest.mark.asyncio
async def test_login_user_success(self, test_db, test_user, mock_redis):
"""Test successful login"""
with patch('app.core.security.redis_client', mock_redis):
with patch('app.services.auth_service.AuthService._get_user_tenants') as mock_tenants:
mock_tenants.return_value = []
result = await AuthService.login(
test_user.email,
"TestPassword123",
test_db
)
assert isinstance(result, dict)
assert "access_token" in result
assert "refresh_token" in result
assert "user" in result
assert result["user"]["email"] == test_user.email
@pytest.mark.asyncio
async def test_login_user_invalid_credentials(self, test_db, test_user):
"""Test login with invalid credentials"""
with pytest.raises(Exception) as exc_info:
await AuthService.login(
test_user.email,
"WrongPassword123",
test_db
)
assert "invalid credentials" in str(exc_info.value).lower()
# ================================================================
# API ENDPOINT TESTS
# ================================================================
class TestAuthenticationAPI:
"""Test the authentication API endpoints"""
def test_register_success(self, client, valid_user_data):
"""Test successful user registration via API"""
response = client.post("/auth/register", json=valid_user_data)
assert response.status_code == status.HTTP_200_OK
data = response.json()
assert data["email"] == valid_user_data["email"]
assert data["full_name"] == valid_user_data["full_name"]
assert data["is_active"] is True
assert data["is_verified"] is False
assert "id" in data
assert "created_at" in data
def test_register_weak_password(self, client, weak_password_user_data):
"""Test registration with weak password"""
response = client.post("/auth/register", json=weak_password_user_data)
assert response.status_code == status.HTTP_400_BAD_REQUEST
data = response.json()
assert "password" in data["detail"].lower()
def test_register_invalid_email(self, client, invalid_email_user_data):
"""Test registration with invalid email"""
response = client.post("/auth/register", json=invalid_email_user_data)
assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY
def test_register_missing_fields(self, client):
"""Test registration with missing required fields"""
incomplete_data = {
"email": "test@bakery.es"
# Missing password and full_name
}
response = client.post("/auth/register", json=incomplete_data)
assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY
def test_register_duplicate_email(self, client, valid_user_data):
"""Test registration with duplicate email"""
# First registration
response1 = client.post("/auth/register", json=valid_user_data)
assert response1.status_code == status.HTTP_200_OK
# Second registration with same email
response2 = client.post("/auth/register", json=valid_user_data)
assert response2.status_code == status.HTTP_400_BAD_REQUEST
data = response2.json()
assert "already registered" in data["detail"].lower()
def test_login_success(self, client, valid_user_data):
"""Test successful login via API"""
# First register a user
client.post("/auth/register", json=valid_user_data)
# Then login
login_data = {
"email": valid_user_data["email"],
"password": valid_user_data["password"]
}
response = client.post("/auth/login", json=login_data)
assert response.status_code == status.HTTP_200_OK
data = response.json()
assert "access_token" in data
assert "refresh_token" in data
assert data["token_type"] == "bearer"
assert "expires_in" in data
assert "user" in data
assert data["user"]["email"] == valid_user_data["email"]
def test_login_wrong_password(self, client, valid_user_data):
"""Test login with wrong password"""
# First register a user
client.post("/auth/register", json=valid_user_data)
# Then login with wrong password
login_data = {
"email": valid_user_data["email"],
"password": "WrongPassword123"
}
response = client.post("/auth/login", json=login_data)
assert response.status_code == status.HTTP_401_UNAUTHORIZED
data = response.json()
assert "invalid credentials" in data["detail"].lower()
def test_login_nonexistent_user(self, client):
"""Test login with non-existent user"""
login_data = {
"email": "nonexistent@bakery.es",
"password": "TestPassword123"
}
response = client.post("/auth/login", json=login_data)
assert response.status_code == status.HTTP_401_UNAUTHORIZED
def test_login_missing_fields(self, client):
"""Test login with missing fields"""
incomplete_data = {
"email": "test@bakery.es"
# Missing password
}
response = client.post("/auth/login", json=incomplete_data)
assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY
def test_login_invalid_email_format(self, client):
"""Test login with invalid email format"""
login_data = {
"email": "invalid-email",
"password": "TestPassword123"
}
response = client.post("/auth/login", json=login_data)
assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY
# ================================================================
# INTEGRATION TESTS
# ================================================================
class TestAuthenticationFlow:
"""Test complete authentication flows"""
def test_complete_registration_login_flow(self, client, valid_user_data):
"""Test complete flow: register -> login -> access protected endpoint"""
# Step 1: Register
register_response = client.post("/auth/register", json=valid_user_data)
assert register_response.status_code == status.HTTP_200_OK
user_data = register_response.json()
user_id = user_data["id"]
# Step 2: Login
login_data = {
"email": valid_user_data["email"],
"password": valid_user_data["password"]
}
login_response = client.post("/auth/login", json=login_data)
assert login_response.status_code == status.HTTP_200_OK
token_data = login_response.json()
access_token = token_data["access_token"]
# Step 3: Access protected endpoint
headers = {"Authorization": f"Bearer {access_token}"}
profile_response = client.get("/users/me", headers=headers)
assert profile_response.status_code == status.HTTP_200_OK
profile_data = profile_response.json()
assert profile_data["id"] == user_id
assert profile_data["email"] == valid_user_data["email"]
def test_token_refresh_flow(self, client, valid_user_data):
"""Test token refresh flow"""
# Register and login
client.post("/auth/register", json=valid_user_data)
login_data = {
"email": valid_user_data["email"],
"password": valid_user_data["password"]
}
login_response = client.post("/auth/login", json=login_data)
token_data = login_response.json()
refresh_token = token_data["refresh_token"]
# Refresh token
refresh_data = {"refresh_token": refresh_token}
refresh_response = client.post("/auth/refresh", json=refresh_data)
assert refresh_response.status_code == status.HTTP_200_OK
new_token_data = refresh_response.json()
assert "access_token" in new_token_data
assert "refresh_token" in new_token_data
# Verify new token works
headers = {"Authorization": f"Bearer {new_token_data['access_token']}"}
profile_response = client.get("/users/me", headers=headers)
assert profile_response.status_code == status.HTTP_200_OK
def test_logout_flow(self, client, valid_user_data):
"""Test logout flow"""
# Register and login
client.post("/auth/register", json=valid_user_data)
login_data = {
"email": valid_user_data["email"],
"password": valid_user_data["password"]
}
login_response = client.post("/auth/login", json=login_data)
token_data = login_response.json()
access_token = token_data["access_token"]
# Logout
headers = {"Authorization": f"Bearer {access_token}"}
logout_response = client.post("/auth/logout", headers=headers)
assert logout_response.status_code == status.HTTP_200_OK
# Verify token is invalidated (if logout invalidates tokens)
# This depends on implementation - some systems don't invalidate JWT tokens
# profile_response = client.get("/users/me", headers=headers)
# assert profile_response.status_code == status.HTTP_401_UNAUTHORIZED
# ================================================================
# ERROR HANDLING TESTS
# ================================================================
class TestErrorHandling:
"""Test error handling scenarios"""
@pytest.mark.asyncio
async def test_database_error_during_registration(self, test_db):
"""Test handling of database errors during registration"""
with patch.object(test_db, 'commit', side_effect=Exception("Database error")):
with pytest.raises(Exception):
await AuthService.create_user(
"test@bakery.es",
"TestPassword123",
"Test User",
test_db
)
@pytest.mark.asyncio
async def test_database_error_during_authentication(self, test_db):
"""Test handling of database errors during authentication"""
with patch.object(test_db, 'execute', side_effect=Exception("Database error")):
result = await AuthService.authenticate_user(
"test@bakery.es",
"TestPassword123",
test_db
)
assert result is None
def test_malformed_json_request(self, client):
"""Test handling of malformed JSON in requests"""
response = client.post(
"/auth/register",
data="invalid json",
headers={"Content-Type": "application/json"}
)
assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY
def test_empty_request_body(self, client):
"""Test handling of empty request body"""
response = client.post("/auth/register", json={})
assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY
# ================================================================
# SECURITY TESTS
# ================================================================
class TestSecurity:
"""Test security-related functionality"""
def test_password_requirements_enforced(self, client):
"""Test that password requirements are enforced"""
test_cases = [
("weak", "Too short"),
("nouppercasehere123", "No uppercase"),
("NOLOWERCASEHERE123", "No lowercase"),
("NoNumbersHere", "No numbers"),
]
for password, description in test_cases:
user_data = {
"email": f"test_{password}@bakery.es",
"password": password,
"full_name": "Test User"
}
response = client.post("/auth/register", json=user_data)
assert response.status_code == status.HTTP_400_BAD_REQUEST, f"Failed for {description}"
def test_email_validation(self, client):
"""Test email validation"""
invalid_emails = [
"invalid",
"@bakery.es",
"test@",
"test..test@bakery.es",
"test@bakery",
]
for email in invalid_emails:
user_data = {
"email": email,
"password": "TestPassword123",
"full_name": "Test User"
}
response = client.post("/auth/register", json=user_data)
assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY
def test_sql_injection_prevention(self, client):
"""Test SQL injection prevention"""
malicious_email = "test@bakery.es'; DROP TABLE users; --"
user_data = {
"email": malicious_email,
"password": "TestPassword123",
"full_name": "Test User"
}
# Should handle gracefully without SQL injection
response = client.post("/auth/register", json=user_data)
# Depending on email validation, this might be 422 or 400
assert response.status_code in [status.HTTP_400_BAD_REQUEST, status.HTTP_422_UNPROCESSABLE_ENTITY]
def test_password_not_logged(self, client, valid_user_data, caplog):
"""Test that passwords are not logged"""
with caplog.at_level("DEBUG"):
client.post("/auth/register", json=valid_user_data)
# Check that password is not in any log messages
for record in caplog.records:
assert valid_user_data["password"] not in record.getMessage()
# ================================================================
# RATE LIMITING TESTS (if implemented)
# ================================================================
class TestRateLimiting:
"""Test rate limiting functionality"""
@pytest.mark.asyncio
async def test_login_rate_limiting(self, mock_redis):
"""Test login rate limiting"""
with patch('app.core.security.redis_client', mock_redis):
# Simulate multiple failed attempts
email = "test@bakery.es"
# First few attempts should be allowed
mock_redis.get.return_value = "3"
result = await SecurityManager.check_login_attempts(email)
assert result is True
# After max attempts, should be blocked
mock_redis.get.return_value = str(settings.MAX_LOGIN_ATTEMPTS)
result = await SecurityManager.check_login_attempts(email)
assert result is False
def test_multiple_registration_attempts(self, client, valid_user_data):
"""Test handling of multiple registration attempts"""
# This test depends on rate limiting implementation
# For now, just ensure system handles multiple requests gracefully
for i in range(3):
user_data = valid_user_data.copy()
user_data["email"] = f"test{i}@bakery.es"
response = client.post("/auth/register", json=user_data)
assert response.status_code == status.HTTP_200_OK
# ================================================================
# PERFORMANCE TESTS
# ================================================================
class TestPerformance:
"""Basic performance tests"""
def test_registration_performance(self, client):
"""Test registration performance with multiple users"""
import time
start_time = time.time()
for i in range(10):
user_data = {
"email": f"perftest{i}@bakery.es",
"password": "TestPassword123",
"full_name": f"Performance Test User {i}"
}
response = client.post("/auth/register", json=user_data)
assert response.status_code == status.HTTP_200_OK
end_time = time.time()
duration = end_time - start_time
# Should complete 10 logins in reasonable time
assert duration < 5.0, f"Login took too long: {duration}s"
# ================================================================
# EDGE CASES AND BOUNDARY TESTS
# ================================================================
class TestEdgeCases:
"""Test edge cases and boundary conditions"""
def test_very_long_email(self, client):
"""Test registration with very long email"""
long_email = "a" * 250 + "@bakery.es"
user_data = {
"email": long_email,
"password": "TestPassword123",
"full_name": "Test User"
}
response = client.post("/auth/register", json=user_data)
# Should handle gracefully (either accept if within limits or reject)
assert response.status_code in [status.HTTP_200_OK, status.HTTP_400_BAD_REQUEST, status.HTTP_422_UNPROCESSABLE_ENTITY]
def test_very_long_full_name(self, client):
"""Test registration with very long full name"""
long_name = "A" * 300
user_data = {
"email": "test@bakery.es",
"password": "TestPassword123",
"full_name": long_name
}
response = client.post("/auth/register", json=user_data)
# Should handle gracefully
assert response.status_code in [status.HTTP_200_OK, status.HTTP_400_BAD_REQUEST, status.HTTP_422_UNPROCESSABLE_ENTITY]
def test_very_long_password(self, client):
"""Test registration with very long password"""
long_password = "A1" + "a" * 1000 # Starts with valid pattern
user_data = {
"email": "test@bakery.es",
"password": long_password,
"full_name": "Test User"
}
response = client.post("/auth/register", json=user_data)
# Should handle gracefully
assert response.status_code in [status.HTTP_200_OK, status.HTTP_400_BAD_REQUEST]
def test_unicode_characters_in_name(self, client):
"""Test registration with unicode characters in name"""
user_data = {
"email": "test@bakery.es",
"password": "TestPassword123",
"full_name": "José María García-Hernández 测试用户 🥖"
}
response = client.post("/auth/register", json=user_data)
assert response.status_code == status.HTTP_200_OK
data = response.json()
assert data["full_name"] == user_data["full_name"]
def test_special_characters_in_email(self, client):
"""Test registration with special characters in email"""
# These are valid email characters
special_emails = [
"test+tag@bakery.es",
"test.dot@bakery.es",
"test_underscore@bakery.es",
"test-dash@bakery.es"
]
for email in special_emails:
user_data = {
"email": email,
"password": "TestPassword123",
"full_name": "Test User"
}
response = client.post("/auth/register", json=user_data)
assert response.status_code == status.HTTP_200_OK, f"Failed for email: {email}"
def test_empty_strings(self, client):
"""Test registration with empty strings"""
user_data = {
"email": "",
"password": "",
"full_name": ""
}
response = client.post("/auth/register", json=user_data)
assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY
def test_null_values(self, client):
"""Test registration with null values"""
user_data = {
"email": None,
"password": None,
"full_name": None
}
response = client.post("/auth/register", json=user_data)
assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY
def test_whitespace_only_fields(self, client):
"""Test registration with whitespace-only fields"""
user_data = {
"email": " ",
"password": " ",
"full_name": " "
}
response = client.post("/auth/register", json=user_data)
assert response.status_code in [status.HTTP_400_BAD_REQUEST, status.HTTP_422_UNPROCESSABLE_ENTITY]
def test_case_sensitivity_email(self, client):
"""Test email case sensitivity"""
user_data1 = {
"email": "Test@Bakery.ES",
"password": "TestPassword123",
"full_name": "Test User 1"
}
user_data2 = {
"email": "test@bakery.es",
"password": "TestPassword123",
"full_name": "Test User 2"
}
# Register first user
response1 = client.post("/auth/register", json=user_data1)
assert response1.status_code == status.HTTP_200_OK
# Try to register second user with different case
response2 = client.post("/auth/register", json=user_data2)
# Depending on implementation, might be treated as same email
# Most systems normalize email case
if response2.status_code == status.HTTP_400_BAD_REQUEST:
assert "already registered" in response2.json()["detail"].lower()
# ================================================================
# CONCURRENT ACCESS TESTS
# ================================================================
class TestConcurrency:
"""Test concurrent access scenarios"""
@pytest.mark.asyncio
async def test_concurrent_registration_same_email(self, test_db):
"""Test concurrent registration with same email"""
import asyncio
email = "concurrent@bakery.es"
password = "TestPassword123"
full_name = "Concurrent User"
# Create multiple concurrent registration tasks
tasks = [
AuthService.create_user(email, password, f"{full_name} {i}", test_db)
for i in range(3)
]
results = await asyncio.gather(*tasks, return_exceptions=True)
# Only one should succeed, others should fail
successes = [r for r in results if not isinstance(r, Exception)]
failures = [r for r in results if isinstance(r, Exception)]
assert len(successes) == 1, "Only one registration should succeed"
assert len(failures) >= 2, "Other registrations should fail"
@pytest.mark.asyncio
async def test_concurrent_login_attempts(self, test_db, test_user):
"""Test concurrent login attempts for same user"""
import asyncio
email = test_user.email
password = "TestPassword123"
# Create multiple concurrent login tasks
tasks = [
AuthService.authenticate_user(email, password, test_db)
for _ in range(5)
]
results = await asyncio.gather(*tasks, return_exceptions=True)
# All should succeed (assuming no rate limiting in this test)
successes = [r for r in results if r is not None and not isinstance(r, Exception)]
assert len(successes) >= 3, "Most login attempts should succeed"
# ================================================================
# DATA INTEGRITY TESTS
# ================================================================
class TestDataIntegrity:
"""Test data integrity and consistency"""
@pytest.mark.asyncio
async def test_user_data_integrity_after_creation(self, test_db):
"""Test that user data remains intact after creation"""
email = "integrity@bakery.es"
password = "TestPassword123"
full_name = "Integrity Test User"
user = await AuthService.create_user(email, password, full_name, test_db)
# Verify all fields are correctly set
assert user.email == email
assert user.full_name == full_name
assert user.is_active is True
assert user.is_verified is False
assert user.created_at is not None
assert isinstance(user.created_at, datetime)
assert user.created_at.tzinfo is not None # Should be timezone-aware
# Verify password is hashed and verifiable
assert user.hashed_password != password
assert SecurityManager.verify_password(password, user.hashed_password)
@pytest.mark.asyncio
async def test_last_login_update(self, test_db, test_user):
"""Test that last_login is updated on authentication"""
original_last_login = test_user.last_login
# Authenticate user
authenticated_user = await AuthService.authenticate_user(
test_user.email,
"TestPassword123",
test_db
)
assert authenticated_user is not None
assert authenticated_user.last_login is not None
if original_last_login:
assert authenticated_user.last_login > original_last_login
else:
assert authenticated_user.last_login is not None
@pytest.mark.asyncio
async def test_database_rollback_on_error(self, test_db):
"""Test that database is rolled back on errors"""
# Get initial user count
from sqlalchemy import select, func
result = await test_db.execute(select(func.count(User.id)))
initial_count = result.scalar()
# Try to create user with invalid data that should cause rollback
with patch.object(test_db, 'commit', side_effect=Exception("Simulated error")):
try:
await AuthService.create_user(
"rollback@bakery.es",
"TestPassword123",
"Rollback Test User",
test_db
)
except Exception:
pass # Expected to fail
# Verify user count hasn't changed
result = await test_db.execute(select(func.count(User.id)))
final_count = result.scalar()
assert final_count == initial_count, "Database should be rolled back on error"
# ================================================================
# TOKEN MANAGEMENT TESTS
# ================================================================
class TestTokenManagement:
"""Test JWT token management"""
def test_token_expiration_validation(self):
"""Test that tokens expire correctly"""
user_data = {
"user_id": str(uuid.uuid4()),
"email": "test@bakery.es"
}
# Create token with very short expiration
with patch('app.core.config.settings.JWT_ACCESS_TOKEN_EXPIRE_MINUTES', 0):
token = SecurityManager.create_access_token(user_data)
# Wait a moment and verify token is expired
import time
time.sleep(1)
decoded = SecurityManager.verify_token(token)
assert decoded is None, "Expired token should not be valid"
def test_token_contains_correct_claims(self):
"""Test that tokens contain correct claims"""
user_data = {
"user_id": str(uuid.uuid4()),
"email": "test@bakery.es",
"full_name": "Test User"
}
token = SecurityManager.create_access_token(user_data)
decoded = SecurityManager.verify_token(token)
assert decoded is not None
assert decoded["user_id"] == user_data["user_id"]
assert decoded["email"] == user_data["email"]
assert decoded["full_name"] == user_data["full_name"]
assert "exp" in decoded # Expiration claim
assert "iat" in decoded # Issued at claim
def test_refresh_token_different_from_access_token(self):
"""Test that refresh tokens are different from access tokens"""
user_data = {
"user_id": str(uuid.uuid4()),
"email": "test@bakery.es"
}
access_token = SecurityManager.create_access_token(user_data)
refresh_token = SecurityManager.create_refresh_token(user_data)
assert access_token != refresh_token
# Both should be valid but have different expiration times
access_decoded = SecurityManager.verify_token(access_token)
refresh_decoded = SecurityManager.verify_token(refresh_token)
assert access_decoded is not None
assert refresh_decoded is not None
assert access_decoded["exp"] < refresh_decoded["exp"] # Refresh token expires later
# ================================================================
# COMPREHENSIVE INTEGRATION TESTS
# ================================================================
class TestCompleteAuthenticationFlows:
"""Comprehensive end-to-end authentication flow tests"""
def test_full_user_lifecycle(self, client):
"""Test complete user lifecycle: register -> login -> use -> logout"""
user_data = {
"email": "lifecycle@bakery.es",
"password": "TestPassword123",
"full_name": "Lifecycle Test User"
}
# 1. Registration
register_response = client.post("/auth/register", json=user_data)
assert register_response.status_code == status.HTTP_200_OK
user_info = register_response.json()
assert user_info["email"] == user_data["email"]
assert user_info["is_active"] is True
# 2. Login
login_data = {
"email": user_data["email"],
"password": user_data["password"]
}
login_response = client.post("/auth/login", json=login_data)
assert login_response.status_code == status.HTTP_200_OK
token_info = login_response.json()
access_token = token_info["access_token"]
refresh_token = token_info["refresh_token"]
# 3. Use access token to access protected endpoint
headers = {"Authorization": f"Bearer {access_token}"}
profile_response = client.get("/users/me", headers=headers)
assert profile_response.status_code == status.HTTP_200_OK
profile_data = profile_response.json()
assert profile_data["email"] == user_data["email"]
# 4. Refresh token
refresh_data = {"refresh_token": refresh_token}
refresh_response = client.post("/auth/refresh", json=refresh_data)
assert refresh_response.status_code == status.HTTP_200_OK
new_token_info = refresh_response.json()
new_access_token = new_token_info["access_token"]
# 5. Use new access token
new_headers = {"Authorization": f"Bearer {new_access_token}"}
new_profile_response = client.get("/users/me", headers=new_headers)
assert new_profile_response.status_code == status.HTTP_200_OK
# 6. Logout
logout_response = client.post("/auth/logout", headers=new_headers)
assert logout_response.status_code == status.HTTP_200_OK
def test_multiple_sessions_same_user(self, client, valid_user_data):
"""Test multiple simultaneous sessions for same user"""
# Register user
client.post("/auth/register", json=valid_user_data)
login_data = {
"email": valid_user_data["email"],
"password": valid_user_data["password"]
}
# Create multiple sessions
session1_response = client.post("/auth/login", json=login_data)
session2_response = client.post("/auth/login", json=login_data)
assert session1_response.status_code == status.HTTP_200_OK
assert session2_response.status_code == status.HTTP_200_OK
session1_token = session1_response.json()["access_token"]
session2_token = session2_response.json()["access_token"]
# Both tokens should work
headers1 = {"Authorization": f"Bearer {session1_token}"}
headers2 = {"Authorization": f"Bearer {session2_token}"}
response1 = client.get("/users/me", headers=headers1)
response2 = client.get("/users/me", headers=headers2)
assert response1.status_code == status.HTTP_200_OK
assert response2.status_code == status.HTTP_200_OK
# ================================================================
# CONFIGURATION TESTS
# ================================================================
class TestConfiguration:
"""Test configuration and environment-dependent behavior"""
def test_password_requirements_configurable(self):
"""Test that password requirements respect configuration"""
# Test with different configurations
with patch('app.core.config.settings.PASSWORD_MIN_LENGTH', 12):
assert not SecurityManager.validate_password("Short1") # 6 chars
assert SecurityManager.validate_password("LongerPassword123") # 17 chars
with patch('app.core.config.settings.PASSWORD_REQUIRE_SYMBOLS', True):
assert not SecurityManager.validate_password("NoSymbols123")
assert SecurityManager.validate_password("WithSymbol123!")
def test_jwt_expiration_configurable(self):
"""Test that JWT expiration respects configuration"""
user_data = {"user_id": str(uuid.uuid4()), "email": "test@bakery.es"}
with patch('app.core.config.settings.JWT_ACCESS_TOKEN_EXPIRE_MINUTES', 60):
token = SecurityManager.create_access_token(user_data)
decoded = SecurityManager.verify_token(token)
assert decoded is not None
# Check that expiration is approximately 60 minutes from now
exp_time = datetime.fromtimestamp(decoded['exp'])
now = datetime.utcnow()
diff = exp_time - now
# Should be close to 60 minutes (allow 1 minute tolerance)
assert 59 * 60 <= diff.total_seconds() <= 61 * 60
# ================================================================
# TEST RUNNER AND HELPERS
# ================================================================
def run_auth_tests():
"""Convenience function to run all auth tests"""
import subprocess
import sys
# Run pytest with coverage
cmd = [
sys.executable, "-m", "pytest",
__file__,
"-v",
"--tb=short",
"--cov=app",
"--cov-report=term-missing",
"--cov-report=html:htmlcov",
"-x" # Stop on first failure
]
result = subprocess.run(cmd, capture_output=True, text=True)
print(result.stdout)
if result.stderr:
print("STDERR:", result.stderr)
return result.returncode == 0
if __name__ == "__main__":
# Run tests if script is executed directly
import sys
success = run_auth_tests()
sys.exit(0 if success else 1)
# ================================================================
# ADDITIONAL TEST UTILITIES
# ================================================================
class AuthTestUtils:
"""Utility functions for authentication testing"""
@staticmethod
def create_test_user_data(email_suffix="", **overrides):
"""Create valid test user data with optional customization"""
default_data = {
"email": f"test{email_suffix}@bakery.es",
"password": "TestPassword123",
"full_name": f"Test User{email_suffix}"
}
default_data.update(overrides)
return default_data
@staticmethod
def extract_token_claims(token):
"""Extract claims from JWT token without verification"""
import base64
import json
# Split token and decode payload
parts = token.split('.')
if len(parts) != 3:
return None
# Add padding if needed
payload = parts[1]
padding = 4 - len(payload) % 4
if padding != 4:
payload += '=' * padding
try:
decoded_bytes = base64.urlsafe_b64decode(payload)
return json.loads(decoded_bytes)
except Exception:
return None
@staticmethod
async def create_authenticated_user(client, user_data=None):
"""Create and authenticate a user, return user info and tokens"""
if user_data is None:
user_data = AuthTestUtils.create_test_user_data()
# Register
register_response = client.post("/auth/register", json=user_data)
assert register_response.status_code == status.HTTP_200_OK
# Login
login_data = {
"email": user_data["email"],
"password": user_data["password"]
}
login_response = client.post("/auth/login", json=login_data)
assert login_response.status_code == status.HTTP_200_OK
return {
"user": register_response.json(),
"tokens": login_response.json(),
"headers": {"Authorization": f"Bearer {login_response.json()['access_token']}"}
}
# ================================================================
# PYTEST CONFIGURATION
# ================================================================
# pytest configuration for the test file
pytest_plugins = ["pytest_asyncio"]
# Markers for different test categories
pytestmark = [
pytest.mark.asyncio,
pytest.mark.auth,
]
# Test configuration
def pytest_configure(config):
"""Configure pytest markers"""
config.addinivalue_line(
"markers", "auth: marks tests as authentication tests"
)
config.addinivalue_line(
"markers", "integration: marks tests as integration tests"
)
config.addinivalue_line(
"markers", "performance: marks tests as performance tests"
)
config.addinivalue_line(
"markers", "security: marks tests as security tests"
)