""" Shared JWT Authentication Handler Used across all microservices for consistent authentication """ from jose import jwt from datetime import datetime, timedelta, timezone from typing import Optional, Dict, Any import logging logger = logging.getLogger(__name__) class JWTHandler: """JWT token handling for microservices""" def __init__(self, secret_key: str, algorithm: str = "HS256"): self.secret_key = secret_key self.algorithm = algorithm def create_access_token(self, data: Dict[str, Any], expires_delta: Optional[timedelta] = None) -> str: """Create JWT access token""" to_encode = data.copy() if expires_delta: expire = datetime.now(timezone.utc) + expires_delta else: expire = datetime.now(timezone.utc) + timedelta(minutes=30) to_encode.update({"exp": expire, "type": "access"}) encoded_jwt = jwt.encode(to_encode, self.secret_key, algorithm=self.algorithm) return encoded_jwt def create_refresh_token(self, data: Dict[str, Any], expires_delta: Optional[timedelta] = None) -> str: """Create JWT refresh token""" to_encode = data.copy() if expires_delta: expire = datetime.now(timezone.utc) + expires_delta else: expire = datetime.now(timezone.utc) + timedelta(days=7) to_encode.update({"exp": expire, "type": "refresh"}) encoded_jwt = jwt.encode(to_encode, self.secret_key, algorithm=self.algorithm) return encoded_jwt def verify_token(self, token: str) -> Optional[Dict[str, Any]]: """Verify and decode JWT token""" try: payload = jwt.decode(token, self.secret_key, algorithms=[self.algorithm]) return payload except jwt.ExpiredSignatureError: logger.warning("Token has expired") return None except jwt.JWTError: logger.warning("Invalid token") return None