Files
bakery-ia/services/auth/app/core/security.py

156 lines
5.6 KiB
Python
Raw Normal View History

2025-07-17 21:25:27 +02:00
# ================================================================
# services/auth/app/core/security.py (COMPLETE VERSION)
# ================================================================
"""
Security utilities for authentication service
"""
import bcrypt
import re
2025-07-17 21:25:27 +02:00
import hashlib
from datetime import datetime, timedelta
from typing import Optional, Dict, Any
import redis.asyncio as redis
from fastapi import HTTPException, status
import logging
from app.core.config import settings
from shared.auth.jwt_handler import JWTHandler
logger = logging.getLogger(__name__)
# Initialize JWT handler
jwt_handler = JWTHandler(settings.JWT_SECRET_KEY, settings.JWT_ALGORITHM)
# Redis client for session management
redis_client = redis.from_url(settings.REDIS_URL)
class SecurityManager:
"""Security utilities for authentication"""
@staticmethod
def hash_password(password: str) -> str:
"""Hash password using bcrypt"""
salt = bcrypt.gensalt(rounds=settings.BCRYPT_ROUNDS)
return bcrypt.hashpw(password.encode('utf-8'), salt).decode('utf-8')
@staticmethod
def verify_password(password: str, hashed_password: str) -> bool:
"""Verify password against hash"""
return bcrypt.checkpw(password.encode('utf-8'), hashed_password.encode('utf-8'))
@staticmethod
def validate_password(password: str) -> bool:
"""Validate password strength"""
if len(password) < settings.PASSWORD_MIN_LENGTH:
return False
if settings.PASSWORD_REQUIRE_UPPERCASE and not re.search(r'[A-Z]', password):
return False
if settings.PASSWORD_REQUIRE_LOWERCASE and not re.search(r'[a-z]', password):
return False
if settings.PASSWORD_REQUIRE_NUMBERS and not re.search(r'\d', password):
return False
if settings.PASSWORD_REQUIRE_SYMBOLS and not re.search(r'[!@#$%^&*(),.?":{}|<>]', password):
return False
return True
@staticmethod
def create_access_token(user_data: Dict[str, Any]) -> str:
"""Create JWT access token"""
expires_delta = timedelta(minutes=settings.JWT_ACCESS_TOKEN_EXPIRE_MINUTES)
return jwt_handler.create_access_token(user_data, expires_delta)
@staticmethod
def create_refresh_token(user_data: Dict[str, Any]) -> str:
"""Create JWT refresh token"""
expires_delta = timedelta(days=settings.JWT_REFRESH_TOKEN_EXPIRE_DAYS)
return jwt_handler.create_refresh_token(user_data, expires_delta)
@staticmethod
def verify_token(token: str) -> Optional[Dict[str, Any]]:
"""Verify JWT token"""
return jwt_handler.verify_token(token)
2025-07-17 21:25:27 +02:00
@staticmethod
def hash_token(token: str) -> str:
"""Hash token for storage"""
return hashlib.sha256(token.encode()).hexdigest()
@staticmethod
async def check_login_attempts(email: str) -> bool:
"""Check if user has exceeded login attempts"""
try:
key = f"login_attempts:{email}"
attempts = await redis_client.get(key)
if attempts is None:
return True
return int(attempts) < settings.MAX_LOGIN_ATTEMPTS
except Exception as e:
logger.error(f"Error checking login attempts: {e}")
2025-07-17 21:25:27 +02:00
return True # Allow on error
@staticmethod
2025-07-17 21:25:27 +02:00
async def increment_login_attempts(email: str) -> None:
"""Increment login attempts for email"""
try:
key = f"login_attempts:{email}"
2025-07-17 21:25:27 +02:00
await redis_client.incr(key)
await redis_client.expire(key, settings.LOCKOUT_DURATION_MINUTES * 60)
except Exception as e:
logger.error(f"Error incrementing login attempts: {e}")
@staticmethod
2025-07-17 21:25:27 +02:00
async def clear_login_attempts(email: str) -> None:
"""Clear login attempts for email"""
try:
key = f"login_attempts:{email}"
await redis_client.delete(key)
except Exception as e:
logger.error(f"Error clearing login attempts: {e}")
@staticmethod
2025-07-17 21:25:27 +02:00
async def store_refresh_token(user_id: str, token: str) -> None:
"""Store refresh token in Redis"""
try:
2025-07-17 21:25:27 +02:00
token_hash = SecurityManager.hash_token(token)
key = f"refresh_token:{user_id}:{token_hash}"
2025-07-17 21:25:27 +02:00
# Store for the duration of the refresh token
expire_seconds = settings.JWT_REFRESH_TOKEN_EXPIRE_DAYS * 24 * 60 * 60
await redis_client.setex(key, expire_seconds, "valid")
except Exception as e:
logger.error(f"Error storing refresh token: {e}")
@staticmethod
2025-07-17 21:25:27 +02:00
async def verify_refresh_token(user_id: str, token: str) -> bool:
"""Verify refresh token exists in Redis"""
try:
2025-07-17 21:25:27 +02:00
token_hash = SecurityManager.hash_token(token)
key = f"refresh_token:{user_id}:{token_hash}"
2025-07-17 21:25:27 +02:00
result = await redis_client.get(key)
return result is not None
except Exception as e:
logger.error(f"Error verifying refresh token: {e}")
return False
@staticmethod
2025-07-17 21:25:27 +02:00
async def revoke_refresh_token(user_id: str, token: str) -> None:
"""Revoke refresh token"""
try:
2025-07-17 21:25:27 +02:00
token_hash = SecurityManager.hash_token(token)
key = f"refresh_token:{user_id}:{token_hash}"
await redis_client.delete(key)
except Exception as e:
logger.error(f"Error revoking refresh token: {e}")
2025-07-17 21:25:27 +02:00
# Create singleton instance
security_manager = SecurityManager()