2025-07-17 13:09:24 +02:00
|
|
|
"""
|
|
|
|
|
Shared JWT Authentication Handler
|
|
|
|
|
Used across all microservices for consistent authentication
|
|
|
|
|
"""
|
|
|
|
|
|
2025-07-17 14:34:24 +02:00
|
|
|
from jose import jwt
|
2025-07-18 07:46:56 +02:00
|
|
|
from datetime import datetime, timedelta, timezone
|
2025-07-17 13:09:24 +02:00
|
|
|
from typing import Optional, Dict, Any
|
|
|
|
|
import logging
|
|
|
|
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
|
|
class JWTHandler:
|
|
|
|
|
"""JWT token handling for microservices"""
|
|
|
|
|
|
|
|
|
|
def __init__(self, secret_key: str, algorithm: str = "HS256"):
|
|
|
|
|
self.secret_key = secret_key
|
|
|
|
|
self.algorithm = algorithm
|
|
|
|
|
|
|
|
|
|
def create_access_token(self, data: Dict[str, Any], expires_delta: Optional[timedelta] = None) -> str:
|
|
|
|
|
"""Create JWT access token"""
|
|
|
|
|
to_encode = data.copy()
|
|
|
|
|
|
|
|
|
|
if expires_delta:
|
2025-07-18 07:46:56 +02:00
|
|
|
expire = datetime.now(timezone.utc) + expires_delta
|
2025-07-17 13:09:24 +02:00
|
|
|
else:
|
2025-07-18 07:46:56 +02:00
|
|
|
expire = datetime.now(timezone.utc) + timedelta(minutes=30)
|
2025-07-17 13:09:24 +02:00
|
|
|
|
|
|
|
|
to_encode.update({"exp": expire, "type": "access"})
|
|
|
|
|
|
|
|
|
|
encoded_jwt = jwt.encode(to_encode, self.secret_key, algorithm=self.algorithm)
|
|
|
|
|
return encoded_jwt
|
|
|
|
|
|
|
|
|
|
def create_refresh_token(self, data: Dict[str, Any], expires_delta: Optional[timedelta] = None) -> str:
|
|
|
|
|
"""Create JWT refresh token"""
|
|
|
|
|
to_encode = data.copy()
|
|
|
|
|
|
|
|
|
|
if expires_delta:
|
2025-07-18 07:46:56 +02:00
|
|
|
expire = datetime.now(timezone.utc) + expires_delta
|
2025-07-17 13:09:24 +02:00
|
|
|
else:
|
2025-07-18 07:46:56 +02:00
|
|
|
expire = datetime.now(timezone.utc) + timedelta(days=7)
|
2025-07-17 13:09:24 +02:00
|
|
|
|
|
|
|
|
to_encode.update({"exp": expire, "type": "refresh"})
|
|
|
|
|
|
|
|
|
|
encoded_jwt = jwt.encode(to_encode, self.secret_key, algorithm=self.algorithm)
|
|
|
|
|
return encoded_jwt
|
|
|
|
|
|
|
|
|
|
def verify_token(self, token: str) -> Optional[Dict[str, Any]]:
|
|
|
|
|
"""Verify and decode JWT token"""
|
|
|
|
|
try:
|
|
|
|
|
payload = jwt.decode(token, self.secret_key, algorithms=[self.algorithm])
|
|
|
|
|
return payload
|
|
|
|
|
except jwt.ExpiredSignatureError:
|
|
|
|
|
logger.warning("Token has expired")
|
|
|
|
|
return None
|
2025-07-17 14:34:24 +02:00
|
|
|
except jwt.JWTError:
|
2025-07-17 13:09:24 +02:00
|
|
|
logger.warning("Invalid token")
|
|
|
|
|
return None
|