# ================================================================
# services/auth/app/core/security.py (COMPLETE VERSION)
# ================================================================
"""
Security utilities for authentication service

Provides password hashing/validation, JWT creation/verification, and
Redis-backed bookkeeping for failed login attempts and refresh tokens.
"""

import bcrypt
import re
import hashlib
from datetime import datetime, timedelta
from typing import Optional, Dict, Any
import redis.asyncio as redis
from fastapi import HTTPException, status
import structlog

from app.core.config import settings
from shared.auth.jwt_handler import JWTHandler

logger = structlog.get_logger()

# Initialize JWT handler
jwt_handler = JWTHandler(settings.JWT_SECRET_KEY, settings.JWT_ALGORITHM)

# Redis client for session management
redis_client = redis.from_url(settings.REDIS_URL)


class SecurityManager:
    """Security utilities for authentication.

    Every method is a ``@staticmethod``; the class only serves as a
    namespace. All Redis-backed helpers are best-effort: they log and
    swallow errors instead of raising.
    """

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _login_attempts_key(email: str) -> str:
        """Return the Redis key holding the failed-login counter for *email*."""
        return f"login_attempts:{email}"

    @staticmethod
    def _refresh_token_key(user_id: str, token: str) -> str:
        """Return the Redis key for a refresh token.

        Only the SHA-256 digest of the token is embedded in the key, so the
        raw token is never written to Redis.
        """
        token_hash = SecurityManager.hash_token(token)
        return f"refresh_token:{user_id}:{token_hash}"

    # ------------------------------------------------------------------
    # Passwords
    # ------------------------------------------------------------------

    @staticmethod
    def hash_password(password: str) -> str:
        """Hash *password* with bcrypt.

        The work factor comes from ``settings.BCRYPT_ROUNDS``.

        Returns:
            The bcrypt hash decoded to a UTF-8 string.
        """
        salt = bcrypt.gensalt(rounds=settings.BCRYPT_ROUNDS)
        return bcrypt.hashpw(password.encode('utf-8'), salt).decode('utf-8')

    @staticmethod
    def verify_password(password: str, hashed_password: str) -> bool:
        """Verify *password* against a stored bcrypt hash.

        Returns:
            ``True`` when the password matches. ``False`` when it does not,
            when no hash is stored, or when bcrypt rejects the inputs
            (e.g. a malformed hash) -- a corrupt credential record is
            treated as a failed login rather than an unhandled error.
        """
        if not hashed_password:
            # No stored hash: nothing can match.
            return False
        try:
            return bcrypt.checkpw(
                password.encode('utf-8'),
                hashed_password.encode('utf-8'),
            )
        except ValueError:
            # bcrypt raises ValueError (e.g. "Invalid salt") for input it
            # cannot process. Never log the password or the hash.
            logger.warning("Password verification failed: bcrypt rejected input")
            return False

    @staticmethod
    def validate_password(password: str) -> bool:
        """Validate password strength against the configured policy.

        Checks minimum length and, when the corresponding
        ``settings.PASSWORD_REQUIRE_*`` flag is enabled, the presence of an
        uppercase letter, a lowercase letter, a digit, and a symbol.

        Returns:
            ``True`` if every enabled rule is satisfied, else ``False``.
        """
        if len(password) < settings.PASSWORD_MIN_LENGTH:
            return False

        if settings.PASSWORD_REQUIRE_UPPERCASE and not re.search(r'[A-Z]', password):
            return False

        if settings.PASSWORD_REQUIRE_LOWERCASE and not re.search(r'[a-z]', password):
            return False

        if settings.PASSWORD_REQUIRE_NUMBERS and not re.search(r'\d', password):
            return False

        # Only the characters listed in this class count as symbols.
        if settings.PASSWORD_REQUIRE_SYMBOLS and not re.search(r'[!@#$%^&*(),.?":{}|<>]', password):
            return False

        return True

    # ------------------------------------------------------------------
    # JWT
    # ------------------------------------------------------------------

    @staticmethod
    def create_access_token(user_data: Dict[str, Any]) -> str:
        """Create a JWT access token.

        The lifetime is ``settings.JWT_ACCESS_TOKEN_EXPIRE_MINUTES`` minutes.
        """
        expires_delta = timedelta(minutes=settings.JWT_ACCESS_TOKEN_EXPIRE_MINUTES)
        return jwt_handler.create_access_token(user_data, expires_delta)

    @staticmethod
    def create_refresh_token(user_data: Dict[str, Any]) -> str:
        """Create a JWT refresh token.

        The lifetime is ``settings.JWT_REFRESH_TOKEN_EXPIRE_DAYS`` days.
        """
        expires_delta = timedelta(days=settings.JWT_REFRESH_TOKEN_EXPIRE_DAYS)
        return jwt_handler.create_refresh_token(user_data, expires_delta)

    @staticmethod
    def verify_token(token: str) -> Optional[Dict[str, Any]]:
        """Verify a JWT by delegating to the shared ``JWTHandler``.

        Returns whatever ``jwt_handler.verify_token`` returns.
        """
        return jwt_handler.verify_token(token)

    @staticmethod
    def hash_token(token: str) -> str:
        """Return the SHA-256 hex digest of *token* for storage."""
        return hashlib.sha256(token.encode()).hexdigest()

    # ------------------------------------------------------------------
    # Login attempt tracking
    # ------------------------------------------------------------------

    @staticmethod
    async def check_login_attempts(email: str) -> bool:
        """Check whether *email* is still allowed to attempt a login.

        Returns:
            ``True`` if no counter exists or it is below
            ``settings.MAX_LOGIN_ATTEMPTS``; ``False`` once the limit is
            reached. On any error the check fails open and returns ``True``.
        """
        try:
            key = SecurityManager._login_attempts_key(email)
            attempts = await redis_client.get(key)

            if attempts is None:
                return True

            return int(attempts) < settings.MAX_LOGIN_ATTEMPTS

        except Exception as e:
            logger.error(f"Error checking login attempts: {e}")
            return True  # Allow on error

    @staticmethod
    async def increment_login_attempts(email: str) -> None:
        """Increment the failed-login counter for *email*.

        The counter's TTL is (re)set to
        ``settings.LOCKOUT_DURATION_MINUTES`` on every call. Errors are
        logged and swallowed.
        """
        try:
            key = SecurityManager._login_attempts_key(email)
            ttl_seconds = settings.LOCKOUT_DURATION_MINUTES * 60

            # Queue INCR and EXPIRE in one MULTI/EXEC transaction. Sent as
            # two separate commands, a failure between them would leave a
            # counter with no TTL, which could lock the account indefinitely.
            async with redis_client.pipeline(transaction=True) as pipe:
                pipe.incr(key)
                pipe.expire(key, ttl_seconds)
                await pipe.execute()

        except Exception as e:
            logger.error(f"Error incrementing login attempts: {e}")

    @staticmethod
    async def clear_login_attempts(email: str) -> None:
        """Delete the failed-login counter for *email*.

        Errors are logged and swallowed.
        """
        try:
            key = SecurityManager._login_attempts_key(email)
            await redis_client.delete(key)

        except Exception as e:
            logger.error(f"Error clearing login attempts: {e}")

    # ------------------------------------------------------------------
    # Refresh token storage
    # ------------------------------------------------------------------

    @staticmethod
    async def store_refresh_token(user_id: str, token: str) -> None:
        """Record a refresh token as valid in Redis.

        The entry expires after ``settings.JWT_REFRESH_TOKEN_EXPIRE_DAYS``
        days. Errors are logged and swallowed; in that case
        ``verify_refresh_token`` will later return ``False`` for this token.
        """
        try:
            key = SecurityManager._refresh_token_key(user_id, token)

            # Store for the duration of the refresh token
            expire_seconds = settings.JWT_REFRESH_TOKEN_EXPIRE_DAYS * 24 * 60 * 60
            await redis_client.setex(key, expire_seconds, "valid")

        except Exception as e:
            logger.error(f"Error storing refresh token: {e}")

    @staticmethod
    async def verify_refresh_token(user_id: str, token: str) -> bool:
        """Check that a refresh token is present in Redis.

        Returns:
            ``True`` if an entry exists for this user/token pair; ``False``
            if it is missing or the lookup fails (fails closed).
        """
        try:
            key = SecurityManager._refresh_token_key(user_id, token)
            result = await redis_client.get(key)
            return result is not None

        except Exception as e:
            logger.error(f"Error verifying refresh token: {e}")
            return False

    @staticmethod
    async def revoke_refresh_token(user_id: str, token: str) -> None:
        """Remove a refresh token from Redis.

        Errors are logged and swallowed.
        """
        try:
            key = SecurityManager._refresh_token_key(user_id, token)
            await redis_client.delete(key)

        except Exception as e:
            logger.error(f"Error revoking refresh token: {e}")


# Create singleton instance
security_manager = SecurityManager()