Files
bakery-ia/services/auth/app/core/security.py
2025-07-26 22:03:55 +02:00

305 lines
12 KiB
Python

# services/auth/app/core/security.py - FIXED VERSION
"""
Security utilities for authentication service
FIXED VERSION - Consistent password hashing using passlib
"""
import re
import hashlib
from datetime import datetime, timedelta, timezone
from typing import Optional, Dict, Any
import redis.asyncio as redis
from fastapi import HTTPException, status
import structlog
from passlib.context import CryptContext
from app.core.config import settings
from shared.auth.jwt_handler import JWTHandler
logger = structlog.get_logger()
# ✅ FIX: Use passlib for consistent password hashing
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# Initialize JWT handler with SAME configuration as gateway
jwt_handler = JWTHandler(settings.JWT_SECRET_KEY, settings.JWT_ALGORITHM)
# Redis client for session management
redis_client = redis.from_url(settings.REDIS_URL)
class SecurityManager:
"""Security utilities for authentication - FIXED VERSION"""
@staticmethod
def hash_password(password: str) -> str:
"""Hash password using passlib bcrypt - FIXED"""
return pwd_context.hash(password)
@staticmethod
def verify_password(password: str, hashed_password: str) -> bool:
"""Verify password against hash using passlib - FIXED"""
try:
return pwd_context.verify(password, hashed_password)
except Exception as e:
logger.error(f"Password verification error: {e}")
return False
@staticmethod
def validate_password(password: str) -> bool:
"""Validate password strength"""
if len(password) < settings.PASSWORD_MIN_LENGTH:
return False
if settings.PASSWORD_REQUIRE_UPPERCASE and not re.search(r'[A-Z]', password):
return False
if settings.PASSWORD_REQUIRE_LOWERCASE and not re.search(r'[a-z]', password):
return False
if settings.PASSWORD_REQUIRE_NUMBERS and not re.search(r'\d', password):
return False
if settings.PASSWORD_REQUIRE_SYMBOLS and not re.search(r'[!@#$%^&*(),.?":{}|<>]', password):
return False
return True
@staticmethod
def create_access_token(user_data: Dict[str, Any], expires_delta: Optional[timedelta] = None) -> str:
"""Create JWT access token with PROPER validation"""
# ✅ FIX 1: Validate required fields BEFORE token creation
required_fields = ["user_id", "email"]
missing_fields = [field for field in required_fields if field not in user_data]
if missing_fields:
error_msg = f"Missing required fields for token creation: {missing_fields}"
logger.error(f"Token creation failed: {error_msg}")
raise ValueError(error_msg)
# ✅ FIX 2: Validate that required fields are not None/empty
if not user_data.get("user_id"):
raise ValueError("user_id cannot be empty")
if not user_data.get("email"):
raise ValueError("email cannot be empty")
try:
if expires_delta:
expire = datetime.now(timezone.utc) + expires_delta
else:
expire = datetime.now(timezone.utc) + timedelta(minutes=settings.JWT_ACCESS_TOKEN_EXPIRE_MINUTES)
# ✅ FIX 3: Build payload with SAFE access to user_data
payload = {
"sub": user_data["user_id"],
"user_id": user_data["user_id"],
"email": user_data["email"], # ✅ Guaranteed to exist now
"type": "access",
"full_name": user_data.get("full_name", ""), # Safe access with default
"is_verified": user_data.get("is_verified", False), # Safe access with default
"is_active": user_data.get("is_active", True), # Safe access with default
"exp": expire,
"iat": datetime.now(timezone.utc),
"iss": "bakery-auth" # Token issuer
}
logger.debug(f"Creating access token with payload keys: {list(payload.keys())}")
# ✅ FIX 4: Use jwt_handler with proper error handling
token = jwt_handler.create_access_token(payload)
logger.debug(f"Access token created successfully for user {user_data['email']}")
return token
except Exception as e:
logger.error(f"Access token creation failed for {user_data.get('email', 'unknown')}: {e}")
raise ValueError(f"Failed to create access token: {str(e)}")
@staticmethod
def create_refresh_token(user_data: Dict[str, Any]) -> str:
"""Create JWT refresh token with FLEXIBLE validation"""
# ✅ FIX 1: Validate only essential fields for refresh token
if "user_id" not in user_data:
error_msg = "user_id required for refresh token creation"
logger.error(f"Refresh token creation failed: {error_msg}")
raise ValueError(error_msg)
if not user_data.get("user_id"):
raise ValueError("user_id cannot be empty")
try:
expire = datetime.now(timezone.utc) + timedelta(days=settings.JWT_REFRESH_TOKEN_EXPIRE_DAYS)
# ✅ FIX 2: Minimal payload for refresh token (email is optional)
payload = {
"sub": user_data["user_id"],
"user_id": user_data["user_id"],
"type": "refresh",
"exp": expire,
"iat": datetime.now(timezone.utc),
"iss": "bakery-auth"
}
# ✅ FIX 3: Include email only if available (no longer required)
if "email" in user_data and user_data["email"]:
payload["email"] = user_data["email"]
logger.debug(f"Creating refresh token with payload keys: {list(payload.keys())}")
# Use the same JWT handler method (it handles both access and refresh)
token = jwt_handler.create_access_token(payload)
logger.debug(f"Refresh token created successfully for user {user_data['user_id']}")
return token
except Exception as e:
logger.error(f"Refresh token creation failed for {user_data.get('user_id', 'unknown')}: {e}")
raise ValueError(f"Failed to create refresh token: {str(e)}")
@staticmethod
def verify_token(token: str) -> Optional[Dict[str, Any]]:
"""Verify JWT token with enhanced error handling"""
try:
payload = jwt_handler.verify_token(token)
if payload:
logger.debug(f"Token verified successfully for user: {payload.get('email', 'unknown')}")
return payload
except Exception as e:
logger.warning(f"Token verification failed: {e}")
return None
@staticmethod
async def track_login_attempt(email: str, ip_address: str, success: bool) -> None:
"""Track login attempts for security monitoring"""
try:
key = f"login_attempts:{email}:{ip_address}"
if success:
# Clear failed attempts on successful login
await redis_client.delete(key)
else:
# Increment failed attempts
attempts = await redis_client.incr(key)
if attempts == 1:
# Set expiration on first failed attempt
await redis_client.expire(key, settings.LOCKOUT_DURATION_MINUTES * 60)
if attempts >= settings.MAX_LOGIN_ATTEMPTS:
logger.warning(f"Account locked for {email} from {ip_address}")
raise HTTPException(
status_code=status.HTTP_429_TOO_MANY_REQUESTS,
detail=f"Too many failed login attempts. Try again in {settings.LOCKOUT_DURATION_MINUTES} minutes."
)
except Exception as e:
logger.error(f"Failed to track login attempt: {e}")
@staticmethod
async def is_account_locked(email: str, ip_address: str) -> bool:
"""Check if account is locked due to failed login attempts"""
try:
key = f"login_attempts:{email}:{ip_address}"
attempts = await redis_client.get(key)
if attempts:
attempts = int(attempts)
return attempts >= settings.MAX_LOGIN_ATTEMPTS
except Exception as e:
logger.error(f"Failed to check account lock status: {e}")
return False
@staticmethod
def hash_api_key(api_key: str) -> str:
"""Hash API key for storage"""
return hashlib.sha256(api_key.encode()).hexdigest()
@staticmethod
def generate_secure_token(length: int = 32) -> str:
"""Generate secure random token"""
import secrets
return secrets.token_urlsafe(length)
@staticmethod
def mask_sensitive_data(data: str, visible_chars: int = 4) -> str:
"""Mask sensitive data for logging"""
if not data or len(data) <= visible_chars:
return "*" * len(data) if data else ""
return data[:visible_chars] + "*" * (len(data) - visible_chars)
@staticmethod
async def check_login_attempts(email: str) -> bool:
"""Check if user has exceeded login attempts"""
try:
key = f"login_attempts:{email}"
attempts = await redis_client.get(key)
if attempts is None:
return True
return int(attempts) < settings.MAX_LOGIN_ATTEMPTS
except Exception as e:
logger.error(f"Error checking login attempts: {e}")
return True # Allow on error
@staticmethod
async def increment_login_attempts(email: str) -> None:
"""Increment login attempts for email"""
try:
key = f"login_attempts:{email}"
await redis_client.incr(key)
await redis_client.expire(key, settings.LOCKOUT_DURATION_MINUTES * 60)
except Exception as e:
logger.error(f"Error incrementing login attempts: {e}")
@staticmethod
async def clear_login_attempts(email: str) -> None:
"""Clear login attempts for email after successful login"""
try:
key = f"login_attempts:{email}"
await redis_client.delete(key)
logger.debug(f"Cleared login attempts for {email}")
except Exception as e:
logger.error(f"Error clearing login attempts: {e}")
@staticmethod
async def store_refresh_token(user_id: str, token: str) -> None:
"""Store refresh token in Redis"""
try:
token_hash = SecurityManager.hash_api_key(token) # Reuse hash method
key = f"refresh_token:{user_id}:{token_hash}"
# Store with expiration matching JWT refresh token expiry
expire_seconds = settings.JWT_REFRESH_TOKEN_EXPIRE_DAYS * 24 * 60 * 60
await redis_client.setex(key, expire_seconds, "valid")
except Exception as e:
logger.error(f"Error storing refresh token: {e}")
@staticmethod
async def is_refresh_token_valid(user_id: str, token: str) -> bool:
"""Check if refresh token is still valid in Redis"""
try:
token_hash = SecurityManager.hash_api_key(token)
key = f"refresh_token:{user_id}:{token_hash}"
exists = await redis_client.exists(key)
return bool(exists)
except Exception as e:
logger.error(f"Error checking refresh token validity: {e}")
return False
@staticmethod
async def revoke_refresh_token(user_id: str, token: str) -> None:
"""Revoke refresh token by removing from Redis"""
try:
token_hash = SecurityManager.hash_api_key(token)
key = f"refresh_token:{user_id}:{token_hash}"
await redis_client.delete(key)
logger.debug(f"Revoked refresh token for user {user_id}")
except Exception as e:
logger.error(f"Error revoking refresh token: {e}")