Files
bakery-ia/shared/auth/jwt_handler.py

275 lines
9.9 KiB
Python
Raw Normal View History

2025-07-26 20:04:24 +02:00
# shared/auth/jwt_handler.py
"""
2025-07-26 20:04:24 +02:00
Enhanced JWT Handler with proper token structure and validation
FIXED VERSION - Consistent token format between all services
"""
2025-07-19 17:49:03 +02:00
from jose import jwt, JWTError
2025-07-18 07:46:56 +02:00
from datetime import datetime, timedelta, timezone
from typing import Optional, Dict, Any
2025-07-19 17:49:03 +02:00
import structlog
2025-07-19 17:49:03 +02:00
logger = structlog.get_logger()
class JWTHandler:
2025-07-26 20:04:24 +02:00
"""Enhanced JWT token handling with consistent format"""
def __init__(self, secret_key: str, algorithm: str = "HS256"):
self.secret_key = secret_key
self.algorithm = algorithm
2025-07-26 23:29:57 +02:00
def create_access_token_from_payload(self, payload: Dict[str, Any]) -> str:
"""
Create JWT ACCESS token from complete payload
FIXED: Only creates access tokens with access token structure
"""
try:
# Ensure this is marked as an access token
payload["type"] = "access"
encoded_jwt = jwt.encode(payload, self.secret_key, algorithm=self.algorithm)
logger.debug(f"Created access token with payload keys: {list(payload.keys())}")
return encoded_jwt
except Exception as e:
logger.error(f"Access token creation failed: {e}")
raise ValueError(f"Failed to encode access token: {str(e)}")
def create_refresh_token_from_payload(self, payload: Dict[str, Any]) -> str:
"""
Create JWT REFRESH token from complete payload
FIXED: Only creates refresh tokens with refresh token structure
"""
try:
# Ensure this is marked as a refresh token
payload["type"] = "refresh"
encoded_jwt = jwt.encode(payload, self.secret_key, algorithm=self.algorithm)
logger.debug(f"Created refresh token with payload keys: {list(payload.keys())}")
return encoded_jwt
except Exception as e:
logger.error(f"Refresh token creation failed: {e}")
raise ValueError(f"Failed to encode refresh token: {str(e)}")
2025-07-19 17:49:03 +02:00
def create_access_token(self, user_data: Dict[str, Any], expires_delta: Optional[timedelta] = None) -> str:
"""
2025-07-26 23:29:57 +02:00
Create JWT access token with STANDARD structure (legacy method)
FIXED: Consistent payload format for access tokens
2025-07-19 17:49:03 +02:00
"""
to_encode = {
2025-07-26 23:29:57 +02:00
"sub": user_data["user_id"],
"user_id": user_data["user_id"],
"email": user_data["email"],
"type": "access"
2025-07-19 17:49:03 +02:00
}
2025-07-26 20:04:24 +02:00
# Add optional fields if present
if "full_name" in user_data:
to_encode["full_name"] = user_data["full_name"]
if "is_verified" in user_data:
to_encode["is_verified"] = user_data["is_verified"]
2025-07-26 23:29:57 +02:00
if "is_active" in user_data:
to_encode["is_active"] = user_data["is_active"]
2025-07-26 20:04:24 +02:00
# Set expiration
if expires_delta:
2025-07-19 17:49:03 +02:00
expire = datetime.now(timezone.utc) + expires_delta
else:
2025-07-19 17:49:03 +02:00
expire = datetime.now(timezone.utc) + timedelta(minutes=30)
2025-07-19 17:49:03 +02:00
to_encode.update({
"exp": expire,
2025-07-26 23:29:57 +02:00
"iat": datetime.now(timezone.utc),
"iss": "bakery-auth"
2025-07-19 17:49:03 +02:00
})
encoded_jwt = jwt.encode(to_encode, self.secret_key, algorithm=self.algorithm)
2025-07-19 17:49:03 +02:00
logger.debug(f"Created access token for user {user_data['email']}")
return encoded_jwt
2025-07-19 17:49:03 +02:00
def create_refresh_token(self, user_data: Dict[str, Any], expires_delta: Optional[timedelta] = None) -> str:
2025-07-26 20:04:24 +02:00
"""
2025-07-26 23:29:57 +02:00
Create JWT refresh token with MINIMAL payload (legacy method)
FIXED: Consistent refresh token structure, different from access
2025-07-26 20:04:24 +02:00
"""
2025-07-19 17:49:03 +02:00
to_encode = {
"sub": user_data["user_id"],
"user_id": user_data["user_id"],
"type": "refresh"
}
2025-07-26 23:29:57 +02:00
# Add unique identifier to prevent duplicates
if "jti" in user_data:
to_encode["jti"] = user_data["jti"]
else:
import uuid
to_encode["jti"] = str(uuid.uuid4())
# Include email only if available (optional for refresh tokens)
if "email" in user_data and user_data["email"]:
to_encode["email"] = user_data["email"]
# Set expiration
if expires_delta:
2025-07-19 17:49:03 +02:00
expire = datetime.now(timezone.utc) + expires_delta
else:
2025-07-26 23:29:57 +02:00
expire = datetime.now(timezone.utc) + timedelta(days=30)
2025-07-19 17:49:03 +02:00
to_encode.update({
"exp": expire,
2025-07-26 23:29:57 +02:00
"iat": datetime.now(timezone.utc),
"iss": "bakery-auth"
2025-07-19 17:49:03 +02:00
})
encoded_jwt = jwt.encode(to_encode, self.secret_key, algorithm=self.algorithm)
2025-07-26 20:04:24 +02:00
logger.debug(f"Created refresh token for user {user_data['user_id']}")
return encoded_jwt
def verify_token(self, token: str) -> Optional[Dict[str, Any]]:
2025-07-26 20:04:24 +02:00
"""
2025-07-26 23:29:57 +02:00
Verify and decode JWT token
2025-07-26 20:04:24 +02:00
"""
try:
2025-07-26 23:29:57 +02:00
payload = jwt.decode(token, self.secret_key, algorithms=[self.algorithm])
2025-07-19 17:49:03 +02:00
2025-07-26 23:29:57 +02:00
# Check if token is expired
exp_timestamp = payload.get("exp")
if exp_timestamp:
exp_datetime = datetime.fromtimestamp(exp_timestamp, tz=timezone.utc)
if datetime.now(timezone.utc) > exp_datetime:
logger.debug("Token is expired")
return None
2025-07-19 17:49:03 +02:00
2025-07-26 23:29:57 +02:00
logger.debug(f"Token verified successfully, type: {payload.get('type', 'unknown')}")
return payload
2025-07-19 17:49:03 +02:00
2025-07-26 23:29:57 +02:00
except JWTError as e:
logger.warning(f"JWT verification failed: {e}")
2025-07-26 20:04:24 +02:00
return None
except Exception as e:
2025-07-26 23:29:57 +02:00
logger.error(f"Token verification error: {e}")
return None
2025-07-26 20:04:24 +02:00
2025-07-26 23:29:57 +02:00
def decode_token_no_verify(self, token: str) -> Dict[str, Any]:
2025-07-26 20:04:24 +02:00
"""
2025-07-26 23:29:57 +02:00
Decode JWT token without verification (for inspection purposes)
2025-07-26 20:04:24 +02:00
"""
try:
2025-09-17 16:06:30 +02:00
# Decode without verification - need to provide key but disable verification
payload = jwt.decode(token, self.secret_key, algorithms=[self.algorithm], options={"verify_signature": False})
2025-07-26 23:29:57 +02:00
return payload
2025-07-19 17:49:03 +02:00
except Exception as e:
2025-07-26 23:29:57 +02:00
logger.error(f"Token decoding failed: {e}")
raise ValueError("Invalid token format")
def get_token_type(self, token: str) -> Optional[str]:
"""
Get the type of token (access or refresh) without full verification
"""
try:
payload = self.decode_token_no_verify(token)
return payload.get("type")
except Exception:
2025-07-26 20:04:24 +02:00
return None
2025-07-26 23:29:57 +02:00
def is_token_expired(self, token: str) -> bool:
2025-07-26 20:04:24 +02:00
"""
2025-07-26 23:29:57 +02:00
Check if token is expired without full verification
2025-07-26 20:04:24 +02:00
"""
2025-07-26 23:29:57 +02:00
try:
payload = self.decode_token_no_verify(token)
exp_timestamp = payload.get("exp")
if exp_timestamp:
exp_datetime = datetime.fromtimestamp(exp_timestamp, tz=timezone.utc)
return datetime.now(timezone.utc) > exp_datetime
return True
except Exception:
return True
2025-07-26 20:04:24 +02:00
def extract_user_id(self, token: str) -> Optional[str]:
"""
Extract user ID from token without full verification
Useful for quick user identification
"""
try:
2025-09-17 16:06:30 +02:00
payload = self.decode_token_no_verify(token)
2025-07-26 20:04:24 +02:00
if payload:
return payload.get("user_id")
except Exception as e:
logger.warning(f"Failed to extract user ID from token: {e}")
2025-09-17 16:06:30 +02:00
2025-07-26 20:04:24 +02:00
return None
2025-10-31 11:54:19 +01:00
def create_service_token(self, service_name: str, expires_delta: Optional[timedelta] = None) -> str:
"""
Create JWT token for service-to-service communication
Args:
service_name: Name of the service (e.g., 'auth-service', 'tenant-service')
expires_delta: Optional expiration time (defaults to 365 days for services)
Returns:
Encoded JWT service token
"""
to_encode = {
"sub": service_name,
"user_id": service_name,
"service": service_name,
"type": "service",
"is_service": True,
"role": "admin", # Services have admin privileges
"email": f"{service_name}@internal.service"
}
# Set expiration (default to 1 year for service tokens)
if expires_delta:
expire = datetime.now(timezone.utc) + expires_delta
else:
expire = datetime.now(timezone.utc) + timedelta(days=365)
to_encode.update({
"exp": expire,
"iat": datetime.now(timezone.utc),
"iss": "bakery-auth"
})
encoded_jwt = jwt.encode(to_encode, self.secret_key, algorithm=self.algorithm)
logger.info(f"Created service token for {service_name}")
return encoded_jwt
2025-07-26 20:04:24 +02:00
def get_token_info(self, token: str) -> Dict[str, Any]:
"""
Get comprehensive token information for debugging
"""
info = {
"valid": False,
"expired": True,
"user_id": None,
"email": None,
"type": None,
"exp": None,
"iat": None
}
2025-10-31 11:54:19 +01:00
2025-07-26 20:04:24 +02:00
try:
# Try unsafe decode first
2025-09-17 16:06:30 +02:00
payload = self.decode_token_no_verify(token)
2025-07-26 20:04:24 +02:00
if payload:
info.update({
"user_id": payload.get("user_id"),
"email": payload.get("email"),
"type": payload.get("type"),
"exp": payload.get("exp"),
"iat": payload.get("iat"),
"expired": self.is_token_expired(token)
})
2025-10-31 11:54:19 +01:00
2025-07-26 20:04:24 +02:00
# Try full verification
verified_payload = self.verify_token(token)
info["valid"] = verified_payload is not None
2025-10-31 11:54:19 +01:00
2025-07-26 20:04:24 +02:00
except Exception as e:
logger.warning(f"Failed to get token info: {e}")
2025-10-31 11:54:19 +01:00
2025-07-26 20:04:24 +02:00
return info