""" Shared JWT Authentication Handler Used across all microservices for consistent authentication """ import jwt from datetime import datetime, timedelta from typing import Optional, Dict, Any import logging logger = logging.getLogger(__name__) class JWTHandler: """JWT token handling for microservices""" def __init__(self, secret_key: str, algorithm: str = "HS256"): self.secret_key = secret_key self.algorithm = algorithm def create_access_token(self, data: Dict[str, Any], expires_delta: Optional[timedelta] = None) -> str: """Create JWT access token""" to_encode = data.copy() if expires_delta: expire = datetime.utcnow() + expires_delta else: expire = datetime.utcnow() + timedelta(minutes=30) to_encode.update({"exp": expire, "type": "access"}) encoded_jwt = jwt.encode(to_encode, self.secret_key, algorithm=self.algorithm) return encoded_jwt def create_refresh_token(self, data: Dict[str, Any], expires_delta: Optional[timedelta] = None) -> str: """Create JWT refresh token""" to_encode = data.copy() if expires_delta: expire = datetime.utcnow() + expires_delta else: expire = datetime.utcnow() + timedelta(days=7) to_encode.update({"exp": expire, "type": "refresh"}) encoded_jwt = jwt.encode(to_encode, self.secret_key, algorithm=self.algorithm) return encoded_jwt def verify_token(self, token: str) -> Optional[Dict[str, Any]]: """Verify and decode JWT token""" try: payload = jwt.decode(token, self.secret_key, algorithms=[self.algorithm]) return payload except jwt.ExpiredSignatureError: logger.warning("Token has expired") return None except jwt.InvalidTokenError: logger.warning("Invalid token") return None