REFACTOR API gateway fix 6

This commit is contained in:
Urtzi Alfaro
2025-07-26 21:48:53 +02:00
parent 7d5c8bc9a4
commit de3bd5e541
5 changed files with 66 additions and 74 deletions

View File

@@ -1,10 +1,9 @@
# services/auth/app/core/security.py
# services/auth/app/core/security.py - FIXED VERSION
"""
Security utilities for authentication service
FIXED VERSION - Consistent JWT token structure
FIXED VERSION - Consistent password hashing using passlib
"""
import bcrypt
import re
import hashlib
from datetime import datetime, timedelta, timezone
@@ -12,12 +11,16 @@ from typing import Optional, Dict, Any
import redis.asyncio as redis
from fastapi import HTTPException, status
import structlog
from passlib.context import CryptContext
from app.core.config import settings
from shared.auth.jwt_handler import JWTHandler
logger = structlog.get_logger()
# ✅ FIX: Use passlib for consistent password hashing
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# Initialize JWT handler with SAME configuration as gateway
jwt_handler = JWTHandler(settings.JWT_SECRET_KEY, settings.JWT_ALGORITHM)
@@ -29,14 +32,17 @@ class SecurityManager:
@staticmethod
def hash_password(password: str) -> str:
"""Hash password using bcrypt"""
salt = bcrypt.gensalt(rounds=settings.BCRYPT_ROUNDS)
return bcrypt.hashpw(password.encode('utf-8'), salt).decode('utf-8')
"""Hash password using passlib bcrypt - FIXED"""
return pwd_context.hash(password)
@staticmethod
def verify_password(password: str, hashed_password: str) -> bool:
"""Verify password against hash"""
return bcrypt.checkpw(password.encode('utf-8'), hashed_password.encode('utf-8'))
"""Verify password against hash using passlib - FIXED"""
try:
return pwd_context.verify(password, hashed_password)
except Exception as e:
logger.error(f"Password verification error: {e}")
return False
@staticmethod
def validate_password(password: str) -> bool:
@@ -59,49 +65,40 @@ class SecurityManager:
return True
@staticmethod
def create_access_token(user_data: Dict[str, Any]) -> str:
"""
Create JWT access token with CONSISTENT structure
FIXED: Uses shared JWT handler for consistency
"""
return jwt_handler.create_access_token(
user_data=user_data,
expires_delta=timedelta(minutes=settings.JWT_ACCESS_TOKEN_EXPIRE_MINUTES)
)
def create_access_token(user_data: Dict[str, Any], expires_delta: Optional[timedelta] = None) -> str:
"""Create JWT access token"""
if expires_delta:
expire = datetime.now(timezone.utc) + expires_delta
else:
expire = datetime.now(timezone.utc) + timedelta(minutes=settings.JWT_ACCESS_TOKEN_EXPIRE_MINUTES)
payload = {
"sub": user_data["user_id"],
"user_id": user_data["user_id"],
"email": user_data["email"],
"type": "access",
"full_name": user_data.get("full_name", ""),
"is_verified": user_data.get("is_verified", False),
"exp": expire,
"iat": datetime.now(timezone.utc)
}
return jwt_handler.create_access_token(payload)
@staticmethod
def create_refresh_token(user_data: Dict[str, Any]) -> str:
"""
Create JWT refresh token with CONSISTENT structure
FIXED: Uses shared JWT handler for consistency
"""
return jwt_handler.create_refresh_token(
user_data=user_data,
expires_delta=timedelta(days=settings.JWT_REFRESH_TOKEN_EXPIRE_DAYS)
)
@staticmethod
def verify_token(token: str) -> Optional[Dict[str, Any]]:
"""
Verify JWT token with CONSISTENT validation
FIXED: Uses shared JWT handler for consistency
"""
try:
return jwt_handler.verify_token(token)
except Exception as e:
logger.warning(f"Token verification failed: {e}")
return None
@staticmethod
def decode_token(token: str) -> Optional[Dict[str, Any]]:
"""
Decode JWT token without verification (for debugging)
"""
try:
return jwt_handler.decode_token_unsafe(token)
except Exception as e:
logger.warning(f"Token decode failed: {e}")
return None
"""Create JWT refresh token"""
expire = datetime.now(timezone.utc) + timedelta(days=settings.JWT_REFRESH_TOKEN_EXPIRE_DAYS)
payload = {
"sub": user_data["user_id"],
"user_id": user_data["user_id"],
"type": "refresh",
"exp": expire,
"iat": datetime.now(timezone.utc)
}
return jwt_handler.create_access_token(payload)
@staticmethod
async def track_login_attempt(email: str, ip_address: str, success: bool) -> None: