""" Unified authentication decorators for microservices Designed to work with gateway authentication middleware """ from functools import wraps from fastapi import HTTPException, status, Request, Depends from fastapi.security import HTTPBearer from typing import Callable, Optional, Dict, Any import structlog logger = structlog.get_logger() # Bearer token scheme for services that need it security = HTTPBearer(auto_error=False) def require_authentication(func: Callable) -> Callable: """ Decorator to require authentication - trusts gateway validation Services behind the gateway should use this decorator """ @wraps(func) async def wrapper(*args, **kwargs): # Find request object in arguments request = None for arg in args: if isinstance(arg, Request): request = arg break if not request: request = kwargs.get('request') if not request: raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Request object not found" ) # Check if user context exists (set by gateway) if not hasattr(request.state, 'user') or not request.state.user: # Check headers as fallback (for direct service calls in dev) user_info = extract_user_from_headers(request) if not user_info: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Authentication required" ) request.state.user = user_info return await func(*args, **kwargs) return wrapper def require_tenant_access(func: Callable) -> Callable: """Decorator to require tenant access""" @wraps(func) async def wrapper(*args, **kwargs): request = None for arg in args: if isinstance(arg, Request): request = arg break if not request: request = kwargs.get('request') if not request or not hasattr(request.state, 'tenant_id'): # Try to extract from headers tenant_id = extract_tenant_from_headers(request) if not tenant_id: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Tenant access required" ) request.state.tenant_id = tenant_id return await func(*args, **kwargs) return wrapper def require_role(role: str): """Decorator to require specific role""" def decorator(func: Callable) -> Callable: @wraps(func) async def wrapper(*args, **kwargs): request = None for arg in args: if isinstance(arg, Request): request = arg break if not request: request = kwargs.get('request') user = get_current_user(request) user_role = user.get('role', '').lower() if user_role != role.lower() and user_role != 'admin': raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail=f"{role} role required" ) return await func(*args, **kwargs) return wrapper return decorator def get_current_user(request: Request) -> Dict[str, Any]: """Get current user from request state or headers""" if hasattr(request.state, 'user') and request.state.user: return request.state.user # Fallback to headers (for dev/testing) user_info = extract_user_from_headers(request) if not user_info: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="User not authenticated" ) return user_info def get_current_tenant_id(request: Request) -> Optional[str]: """Get current tenant ID from request state or headers""" if hasattr(request.state, 'tenant_id'): return request.state.tenant_id # Fallback to headers return extract_tenant_from_headers(request) def extract_user_from_headers(request: Request) -> Optional[Dict[str, Any]]: """Extract user information from forwarded headers (gateway sets these)""" user_id = request.headers.get("X-User-ID") if not user_id: return None return { "user_id": user_id, "email": request.headers.get("X-User-Email", ""), "role": request.headers.get("X-User-Role", "user"), "tenant_id": request.headers.get("X-Tenant-ID"), "permissions": request.headers.get("X-User-Permissions", "").split(",") if request.headers.get("X-User-Permissions") else [] } def extract_tenant_from_headers(request: Request) -> Optional[str]: """Extract tenant ID from headers""" return request.headers.get("X-Tenant-ID") # FastAPI Dependencies for injection async def get_current_user_dep(request: Request) -> Dict[str, Any]: """FastAPI dependency to get current user""" return get_current_user(request) async def get_current_tenant_id_dep(request: Request) -> Optional[str]: """FastAPI dependency to get current tenant ID""" return get_current_tenant_id(request) # Export all decorators and functions __all__ = [ 'require_authentication', 'require_tenant_access', 'require_role', 'get_current_user', 'get_current_tenant_id', 'get_current_user_dep', 'get_current_tenant_id_dep', 'extract_user_from_headers', 'extract_tenant_from_headers' ]