Files
bakery-ia/services/auth/app/core/security.py

153 lines
5.2 KiB
Python
Raw Normal View History

"""
Security utilities for authentication service
"""
import bcrypt
import re
from datetime import datetime, timedelta
from typing import Optional, Dict, Any
import redis.asyncio as redis
from fastapi import HTTPException, status
import logging
from app.core.config import settings
from shared.auth.jwt_handler import JWTHandler
logger = logging.getLogger(__name__)
# Initialize JWT handler
jwt_handler = JWTHandler(settings.JWT_SECRET_KEY, settings.JWT_ALGORITHM)
# Redis client for session management
redis_client = redis.from_url(settings.REDIS_URL)
class SecurityManager:
"""Security utilities for authentication"""
@staticmethod
def hash_password(password: str) -> str:
"""Hash password using bcrypt"""
salt = bcrypt.gensalt(rounds=settings.BCRYPT_ROUNDS)
return bcrypt.hashpw(password.encode('utf-8'), salt).decode('utf-8')
@staticmethod
def verify_password(password: str, hashed_password: str) -> bool:
"""Verify password against hash"""
return bcrypt.checkpw(password.encode('utf-8'), hashed_password.encode('utf-8'))
@staticmethod
def validate_password(password: str) -> bool:
"""Validate password strength"""
if len(password) < settings.PASSWORD_MIN_LENGTH:
return False
if settings.PASSWORD_REQUIRE_UPPERCASE and not re.search(r'[A-Z]', password):
return False
if settings.PASSWORD_REQUIRE_LOWERCASE and not re.search(r'[a-z]', password):
return False
if settings.PASSWORD_REQUIRE_NUMBERS and not re.search(r'\d', password):
return False
if settings.PASSWORD_REQUIRE_SYMBOLS and not re.search(r'[!@#$%^&*(),.?":{}|<>]', password):
return False
return True
@staticmethod
def create_access_token(user_data: Dict[str, Any]) -> str:
"""Create JWT access token"""
expires_delta = timedelta(minutes=settings.JWT_ACCESS_TOKEN_EXPIRE_MINUTES)
return jwt_handler.create_access_token(user_data, expires_delta)
@staticmethod
def create_refresh_token(user_data: Dict[str, Any]) -> str:
"""Create JWT refresh token"""
expires_delta = timedelta(days=settings.JWT_REFRESH_TOKEN_EXPIRE_DAYS)
return jwt_handler.create_refresh_token(user_data, expires_delta)
@staticmethod
def verify_token(token: str) -> Optional[Dict[str, Any]]:
"""Verify JWT token"""
return jwt_handler.verify_token(token)
@staticmethod
async def check_login_attempts(email: str) -> bool:
"""Check if user has exceeded login attempts"""
try:
key = f"login_attempts:{email}"
attempts = await redis_client.get(key)
if attempts is None:
return True
return int(attempts) < settings.MAX_LOGIN_ATTEMPTS
except Exception as e:
logger.error(f"Error checking login attempts: {e}")
return True
@staticmethod
async def increment_login_attempts(email: str):
"""Increment login attempts counter"""
try:
key = f"login_attempts:{email}"
current_attempts = await redis_client.incr(key)
# Set TTL on first attempt
if current_attempts == 1:
await redis_client.expire(key, settings.LOCKOUT_DURATION_MINUTES * 60)
except Exception as e:
logger.error(f"Error incrementing login attempts: {e}")
@staticmethod
async def clear_login_attempts(email: str):
"""Clear login attempts counter"""
try:
key = f"login_attempts:{email}"
await redis_client.delete(key)
except Exception as e:
logger.error(f"Error clearing login attempts: {e}")
@staticmethod
async def store_refresh_token(user_id: str, refresh_token: str):
"""Store refresh token in Redis"""
try:
key = f"refresh_token:{user_id}"
expires_seconds = settings.JWT_REFRESH_TOKEN_EXPIRE_DAYS * 24 * 3600
await redis_client.setex(key, expires_seconds, refresh_token)
except Exception as e:
logger.error(f"Error storing refresh token: {e}")
@staticmethod
async def verify_refresh_token(user_id: str, refresh_token: str) -> bool:
"""Verify refresh token"""
try:
key = f"refresh_token:{user_id}"
stored_token = await redis_client.get(key)
if stored_token is None:
return False
return stored_token.decode() == refresh_token
except Exception as e:
logger.error(f"Error verifying refresh token: {e}")
return False
@staticmethod
async def revoke_refresh_token(user_id: str):
"""Revoke refresh token"""
try:
key = f"refresh_token:{user_id}"
await redis_client.delete(key)
except Exception as e:
logger.error(f"Error revoking refresh token: {e}")
# Global security manager instance
security_manager = SecurityManager()