Files
bakery-ia/services/auth/app/core/security.py
Urtzi Alfaro 4073222888 Fix imports
2025-07-18 14:41:39 +02:00

156 lines
5.6 KiB
Python

# ================================================================
# services/auth/app/core/security.py (COMPLETE VERSION)
# ================================================================
"""
Security utilities for authentication service
"""
import bcrypt
import re
import hashlib
from datetime import datetime, timedelta
from typing import Optional, Dict, Any
import redis.asyncio as redis
from fastapi import HTTPException, status
import structlog
from app.core.config import settings
from shared.auth.jwt_handler import JWTHandler
logger = structlog.get_logger()
# Initialize JWT handler
jwt_handler = JWTHandler(settings.JWT_SECRET_KEY, settings.JWT_ALGORITHM)
# Redis client for session management
redis_client = redis.from_url(settings.REDIS_URL)
class SecurityManager:
"""Security utilities for authentication"""
@staticmethod
def hash_password(password: str) -> str:
"""Hash password using bcrypt"""
salt = bcrypt.gensalt(rounds=settings.BCRYPT_ROUNDS)
return bcrypt.hashpw(password.encode('utf-8'), salt).decode('utf-8')
@staticmethod
def verify_password(password: str, hashed_password: str) -> bool:
"""Verify password against hash"""
return bcrypt.checkpw(password.encode('utf-8'), hashed_password.encode('utf-8'))
@staticmethod
def validate_password(password: str) -> bool:
"""Validate password strength"""
if len(password) < settings.PASSWORD_MIN_LENGTH:
return False
if settings.PASSWORD_REQUIRE_UPPERCASE and not re.search(r'[A-Z]', password):
return False
if settings.PASSWORD_REQUIRE_LOWERCASE and not re.search(r'[a-z]', password):
return False
if settings.PASSWORD_REQUIRE_NUMBERS and not re.search(r'\d', password):
return False
if settings.PASSWORD_REQUIRE_SYMBOLS and not re.search(r'[!@#$%^&*(),.?":{}|<>]', password):
return False
return True
@staticmethod
def create_access_token(user_data: Dict[str, Any]) -> str:
"""Create JWT access token"""
expires_delta = timedelta(minutes=settings.JWT_ACCESS_TOKEN_EXPIRE_MINUTES)
return jwt_handler.create_access_token(user_data, expires_delta)
@staticmethod
def create_refresh_token(user_data: Dict[str, Any]) -> str:
"""Create JWT refresh token"""
expires_delta = timedelta(days=settings.JWT_REFRESH_TOKEN_EXPIRE_DAYS)
return jwt_handler.create_refresh_token(user_data, expires_delta)
@staticmethod
def verify_token(token: str) -> Optional[Dict[str, Any]]:
"""Verify JWT token"""
return jwt_handler.verify_token(token)
@staticmethod
def hash_token(token: str) -> str:
"""Hash token for storage"""
return hashlib.sha256(token.encode()).hexdigest()
@staticmethod
async def check_login_attempts(email: str) -> bool:
"""Check if user has exceeded login attempts"""
try:
key = f"login_attempts:{email}"
attempts = await redis_client.get(key)
if attempts is None:
return True
return int(attempts) < settings.MAX_LOGIN_ATTEMPTS
except Exception as e:
logger.error(f"Error checking login attempts: {e}")
return True # Allow on error
@staticmethod
async def increment_login_attempts(email: str) -> None:
"""Increment login attempts for email"""
try:
key = f"login_attempts:{email}"
await redis_client.incr(key)
await redis_client.expire(key, settings.LOCKOUT_DURATION_MINUTES * 60)
except Exception as e:
logger.error(f"Error incrementing login attempts: {e}")
@staticmethod
async def clear_login_attempts(email: str) -> None:
"""Clear login attempts for email"""
try:
key = f"login_attempts:{email}"
await redis_client.delete(key)
except Exception as e:
logger.error(f"Error clearing login attempts: {e}")
@staticmethod
async def store_refresh_token(user_id: str, token: str) -> None:
"""Store refresh token in Redis"""
try:
token_hash = SecurityManager.hash_token(token)
key = f"refresh_token:{user_id}:{token_hash}"
# Store for the duration of the refresh token
expire_seconds = settings.JWT_REFRESH_TOKEN_EXPIRE_DAYS * 24 * 60 * 60
await redis_client.setex(key, expire_seconds, "valid")
except Exception as e:
logger.error(f"Error storing refresh token: {e}")
@staticmethod
async def verify_refresh_token(user_id: str, token: str) -> bool:
"""Verify refresh token exists in Redis"""
try:
token_hash = SecurityManager.hash_token(token)
key = f"refresh_token:{user_id}:{token_hash}"
result = await redis_client.get(key)
return result is not None
except Exception as e:
logger.error(f"Error verifying refresh token: {e}")
return False
@staticmethod
async def revoke_refresh_token(user_id: str, token: str) -> None:
"""Revoke refresh token"""
try:
token_hash = SecurityManager.hash_token(token)
key = f"refresh_token:{user_id}:{token_hash}"
await redis_client.delete(key)
except Exception as e:
logger.error(f"Error revoking refresh token: {e}")
# Create singleton instance
security_manager = SecurityManager()