Add user role
This commit is contained in:
@@ -1,12 +1,16 @@
|
||||
# ================================================================
|
||||
# shared/auth/decorators.py - ENHANCED WITH ADMIN ROLE DECORATOR
|
||||
# ================================================================
|
||||
"""
|
||||
Unified authentication decorators for microservices
|
||||
Designed to work with gateway authentication middleware
|
||||
Enhanced authentication decorators for microservices including admin role validation.
|
||||
Designed to work with gateway authentication middleware and provide centralized
|
||||
role-based access control across all services.
|
||||
"""
|
||||
|
||||
from functools import wraps
|
||||
from fastapi import HTTPException, status, Request, Depends
|
||||
from fastapi.security import HTTPBearer
|
||||
from typing import Callable, Optional, Dict, Any
|
||||
from typing import Callable, Optional, Dict, Any, List
|
||||
import structlog
|
||||
|
||||
logger = structlog.get_logger()
|
||||
@@ -111,6 +115,208 @@ def require_role(role: str):
|
||||
|
||||
return decorator
|
||||
|
||||
def require_admin_role(func: Callable) -> Callable:
|
||||
"""
|
||||
Decorator to require admin role - simplified version for FastAPI dependencies
|
||||
|
||||
This decorator ensures only users with 'admin' role can access the endpoint.
|
||||
Can be used as a FastAPI dependency or function decorator.
|
||||
|
||||
Usage as dependency:
|
||||
@router.delete("/admin/users/{user_id}")
|
||||
async def delete_user(
|
||||
user_id: str,
|
||||
current_user = Depends(get_current_user_dep),
|
||||
_admin_check = Depends(require_admin_role),
|
||||
):
|
||||
# Admin-only logic here
|
||||
|
||||
Usage as decorator:
|
||||
@require_admin_role
|
||||
@router.delete("/admin/users/{user_id}")
|
||||
async def delete_user(...):
|
||||
# Admin-only logic here
|
||||
"""
|
||||
|
||||
@wraps(func)
|
||||
async def wrapper(*args, **kwargs):
|
||||
# Find request object in arguments
|
||||
request = None
|
||||
current_user = None
|
||||
|
||||
# Extract request and current_user from arguments
|
||||
for arg in args:
|
||||
if isinstance(arg, Request):
|
||||
request = arg
|
||||
elif isinstance(arg, dict) and 'user_id' in arg:
|
||||
current_user = arg
|
||||
|
||||
# Check kwargs for request and current_user
|
||||
if not request:
|
||||
request = kwargs.get('request')
|
||||
if not current_user:
|
||||
current_user = kwargs.get('current_user')
|
||||
|
||||
# If we still don't have current_user, try to get it from request
|
||||
if not current_user and request:
|
||||
current_user = get_current_user(request)
|
||||
|
||||
if not current_user:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_401_UNAUTHORIZED,
|
||||
detail="Authentication required"
|
||||
)
|
||||
|
||||
# Check if user has admin role
|
||||
user_role = current_user.get('role', '').lower()
|
||||
|
||||
if user_role != 'admin':
|
||||
logger.warning("Non-admin user attempted admin operation",
|
||||
user_id=current_user.get('user_id'),
|
||||
role=user_role)
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_403_FORBIDDEN,
|
||||
detail="Admin role required"
|
||||
)
|
||||
|
||||
logger.info("Admin operation authorized",
|
||||
user_id=current_user.get('user_id'),
|
||||
endpoint=func.__name__)
|
||||
|
||||
return await func(*args, **kwargs)
|
||||
|
||||
return wrapper
|
||||
|
||||
def require_roles(allowed_roles: List[str]):
|
||||
"""
|
||||
Decorator to require one of multiple roles
|
||||
|
||||
Args:
|
||||
allowed_roles: List of roles that are allowed to access the endpoint
|
||||
|
||||
Usage:
|
||||
@require_roles(['admin', 'manager'])
|
||||
@router.post("/sensitive-operation")
|
||||
async def sensitive_operation(...):
|
||||
# Only admins and managers can access
|
||||
"""
|
||||
|
||||
def decorator(func: Callable) -> Callable:
|
||||
@wraps(func)
|
||||
async def wrapper(*args, **kwargs):
|
||||
request = None
|
||||
current_user = None
|
||||
|
||||
# Extract request and current_user from arguments
|
||||
for arg in args:
|
||||
if isinstance(arg, Request):
|
||||
request = arg
|
||||
elif isinstance(arg, dict) and 'user_id' in arg:
|
||||
current_user = arg
|
||||
|
||||
# Check kwargs
|
||||
if not request:
|
||||
request = kwargs.get('request')
|
||||
if not current_user:
|
||||
current_user = kwargs.get('current_user')
|
||||
|
||||
# Get user from request if not provided
|
||||
if not current_user and request:
|
||||
current_user = get_current_user(request)
|
||||
|
||||
if not current_user:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_401_UNAUTHORIZED,
|
||||
detail="Authentication required"
|
||||
)
|
||||
|
||||
# Check if user has one of the allowed roles
|
||||
user_role = current_user.get('role', '').lower()
|
||||
allowed_roles_lower = [role.lower() for role in allowed_roles]
|
||||
|
||||
if user_role not in allowed_roles_lower:
|
||||
logger.warning("Unauthorized role attempted restricted operation",
|
||||
user_id=current_user.get('user_id'),
|
||||
user_role=user_role,
|
||||
allowed_roles=allowed_roles)
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_403_FORBIDDEN,
|
||||
detail=f"One of these roles required: {', '.join(allowed_roles)}"
|
||||
)
|
||||
|
||||
logger.info("Role-based operation authorized",
|
||||
user_id=current_user.get('user_id'),
|
||||
user_role=user_role,
|
||||
endpoint=func.__name__)
|
||||
|
||||
return await func(*args, **kwargs)
|
||||
|
||||
return wrapper
|
||||
|
||||
return decorator
|
||||
|
||||
def require_tenant_admin(func: Callable) -> Callable:
|
||||
"""
|
||||
Decorator to require admin role within a specific tenant context
|
||||
|
||||
This checks that the user is an admin AND has access to the tenant
|
||||
being operated on. Useful for tenant-scoped admin operations.
|
||||
"""
|
||||
|
||||
@wraps(func)
|
||||
async def wrapper(*args, **kwargs):
|
||||
request = None
|
||||
current_user = None
|
||||
|
||||
# Extract request and current_user from arguments
|
||||
for arg in args:
|
||||
if isinstance(arg, Request):
|
||||
request = arg
|
||||
elif isinstance(arg, dict) and 'user_id' in arg:
|
||||
current_user = arg
|
||||
|
||||
if not request:
|
||||
request = kwargs.get('request')
|
||||
if not current_user:
|
||||
current_user = kwargs.get('current_user')
|
||||
|
||||
if not current_user and request:
|
||||
current_user = get_current_user(request)
|
||||
|
||||
if not current_user:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_401_UNAUTHORIZED,
|
||||
detail="Authentication required"
|
||||
)
|
||||
|
||||
# Check admin role first
|
||||
user_role = current_user.get('role', '').lower()
|
||||
if user_role != 'admin':
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_403_FORBIDDEN,
|
||||
detail="Admin role required"
|
||||
)
|
||||
|
||||
# Check tenant access
|
||||
tenant_id = get_current_tenant_id(request) if request else None
|
||||
if not tenant_id:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_403_FORBIDDEN,
|
||||
detail="Tenant context required"
|
||||
)
|
||||
|
||||
# Additional tenant admin validation could go here
|
||||
# For now, we trust that admin users have access to operate on any tenant
|
||||
|
||||
logger.info("Tenant admin operation authorized",
|
||||
user_id=current_user.get('user_id'),
|
||||
tenant_id=tenant_id,
|
||||
endpoint=func.__name__)
|
||||
|
||||
return await func(*args, **kwargs)
|
||||
|
||||
return wrapper
|
||||
|
||||
def get_current_user(request: Request) -> Dict[str, Any]:
|
||||
"""Get current user from request state or headers"""
|
||||
if hasattr(request.state, 'user') and request.state.user:
|
||||
@@ -145,14 +351,18 @@ def extract_user_from_headers(request: Request) -> Optional[Dict[str, Any]]:
|
||||
"email": request.headers.get("x-user-email", ""),
|
||||
"role": request.headers.get("x-user-role", "user"),
|
||||
"tenant_id": request.headers.get("x-tenant-id"),
|
||||
"permissions": request.headers.get("X-User-Permissions", "").split(",") if request.headers.get("X-User-Permissions") else []
|
||||
"permissions": request.headers.get("X-User-Permissions", "").split(",") if request.headers.get("X-User-Permissions") else [],
|
||||
"full_name": request.headers.get("x-user-full-name", "")
|
||||
}
|
||||
|
||||
def extract_tenant_from_headers(request: Request) -> Optional[str]:
|
||||
"""Extract tenant ID from headers"""
|
||||
return request.headers.get("x-tenant-id")
|
||||
|
||||
# FastAPI Dependencies for injection
|
||||
# ================================================================
|
||||
# FASTAPI DEPENDENCY FUNCTIONS
|
||||
# ================================================================
|
||||
|
||||
async def get_current_user_dep(request: Request) -> Dict[str, Any]:
|
||||
"""FastAPI dependency to get current user"""
|
||||
return get_current_user(request)
|
||||
@@ -161,15 +371,180 @@ async def get_current_tenant_id_dep(request: Request) -> Optional[str]:
|
||||
"""FastAPI dependency to get current tenant ID"""
|
||||
return get_current_tenant_id(request)
|
||||
|
||||
async def require_admin_role_dep(
|
||||
current_user: Dict[str, Any] = Depends(get_current_user_dep)
|
||||
) -> Dict[str, Any]:
|
||||
"""
|
||||
FastAPI dependency that requires admin role
|
||||
|
||||
Usage:
|
||||
@router.delete("/admin/users/{user_id}")
|
||||
async def delete_user(
|
||||
user_id: str,
|
||||
admin_user: Dict[str, Any] = Depends(require_admin_role_dep)
|
||||
):
|
||||
# admin_user is guaranteed to have admin role
|
||||
"""
|
||||
|
||||
user_role = current_user.get('role', '').lower()
|
||||
|
||||
if user_role != 'admin':
|
||||
logger.warning("Non-admin user attempted admin operation",
|
||||
user_id=current_user.get('user_id'),
|
||||
role=user_role)
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_403_FORBIDDEN,
|
||||
detail="Admin role required"
|
||||
)
|
||||
|
||||
logger.info("Admin operation authorized via dependency",
|
||||
user_id=current_user.get('user_id'))
|
||||
|
||||
return current_user
|
||||
|
||||
async def require_roles_dep(allowed_roles: List[str]):
|
||||
"""
|
||||
FastAPI dependency factory that requires one of multiple roles
|
||||
|
||||
Usage:
|
||||
require_manager_or_admin = require_roles_dep(['admin', 'manager'])
|
||||
|
||||
@router.post("/sensitive-operation")
|
||||
async def sensitive_operation(
|
||||
user: Dict[str, Any] = Depends(require_manager_or_admin)
|
||||
):
|
||||
# Only admins and managers can access
|
||||
"""
|
||||
|
||||
async def check_roles(
|
||||
current_user: Dict[str, Any] = Depends(get_current_user_dep)
|
||||
) -> Dict[str, Any]:
|
||||
user_role = current_user.get('role', '').lower()
|
||||
allowed_roles_lower = [role.lower() for role in allowed_roles]
|
||||
|
||||
if user_role not in allowed_roles_lower:
|
||||
logger.warning("Unauthorized role attempted restricted operation",
|
||||
user_id=current_user.get('user_id'),
|
||||
user_role=user_role,
|
||||
allowed_roles=allowed_roles)
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_403_FORBIDDEN,
|
||||
detail=f"One of these roles required: {', '.join(allowed_roles)}"
|
||||
)
|
||||
|
||||
logger.info("Role-based operation authorized via dependency",
|
||||
user_id=current_user.get('user_id'),
|
||||
user_role=user_role)
|
||||
|
||||
return current_user
|
||||
|
||||
return check_roles
|
||||
|
||||
# ================================================================
|
||||
# UTILITY FUNCTIONS FOR ROLE CHECKING
|
||||
# ================================================================
|
||||
|
||||
def is_admin_user(user: Dict[str, Any]) -> bool:
|
||||
"""Check if user has admin role"""
|
||||
return user.get('role', '').lower() == 'admin'
|
||||
|
||||
def is_user_in_roles(user: Dict[str, Any], allowed_roles: List[str]) -> bool:
|
||||
"""Check if user has one of the allowed roles"""
|
||||
user_role = user.get('role', '').lower()
|
||||
allowed_roles_lower = [role.lower() for role in allowed_roles]
|
||||
return user_role in allowed_roles_lower
|
||||
|
||||
def get_user_permissions(user: Dict[str, Any]) -> List[str]:
|
||||
"""Get user permissions list"""
|
||||
return user.get('permissions', [])
|
||||
|
||||
def has_permission(user: Dict[str, Any], permission: str) -> bool:
|
||||
"""Check if user has specific permission"""
|
||||
permissions = get_user_permissions(user)
|
||||
return permission in permissions
|
||||
|
||||
# ================================================================
|
||||
# ADVANCED ROLE DECORATORS
|
||||
# ================================================================
|
||||
|
||||
def require_permission(permission: str):
|
||||
"""
|
||||
Decorator to require specific permission
|
||||
|
||||
Usage:
|
||||
@require_permission('delete_users')
|
||||
@router.delete("/users/{user_id}")
|
||||
async def delete_user(...):
|
||||
# Only users with 'delete_users' permission can access
|
||||
"""
|
||||
|
||||
def decorator(func: Callable) -> Callable:
|
||||
@wraps(func)
|
||||
async def wrapper(*args, **kwargs):
|
||||
current_user = None
|
||||
|
||||
# Extract current_user from arguments
|
||||
for arg in args:
|
||||
if isinstance(arg, dict) and 'user_id' in arg:
|
||||
current_user = arg
|
||||
break
|
||||
|
||||
if not current_user:
|
||||
current_user = kwargs.get('current_user')
|
||||
|
||||
if not current_user:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_401_UNAUTHORIZED,
|
||||
detail="Authentication required"
|
||||
)
|
||||
|
||||
# Check permission
|
||||
if not has_permission(current_user, permission):
|
||||
# Admins bypass permission checks
|
||||
if not is_admin_user(current_user):
|
||||
logger.warning("User lacks required permission",
|
||||
user_id=current_user.get('user_id'),
|
||||
required_permission=permission,
|
||||
user_permissions=get_user_permissions(current_user))
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_403_FORBIDDEN,
|
||||
detail=f"Permission '{permission}' required"
|
||||
)
|
||||
|
||||
logger.info("Permission-based operation authorized",
|
||||
user_id=current_user.get('user_id'),
|
||||
permission=permission)
|
||||
|
||||
return await func(*args, **kwargs)
|
||||
|
||||
return wrapper
|
||||
|
||||
return decorator
|
||||
|
||||
# Export all decorators and functions
|
||||
__all__ = [
|
||||
# Main decorators
|
||||
'require_authentication',
|
||||
'require_tenant_access',
|
||||
'require_tenant_access',
|
||||
'require_role',
|
||||
'get_current_user',
|
||||
'get_current_tenant_id',
|
||||
'require_admin_role',
|
||||
'require_roles',
|
||||
'require_tenant_admin',
|
||||
'require_permission',
|
||||
|
||||
# FastAPI dependencies
|
||||
'get_current_user_dep',
|
||||
'get_current_tenant_id_dep',
|
||||
'require_admin_role_dep',
|
||||
'require_roles_dep',
|
||||
|
||||
# Utility functions
|
||||
'get_current_user',
|
||||
'get_current_tenant_id',
|
||||
'extract_user_from_headers',
|
||||
'extract_tenant_from_headers'
|
||||
'extract_tenant_from_headers',
|
||||
'is_admin_user',
|
||||
'is_user_in_roles',
|
||||
'get_user_permissions',
|
||||
'has_permission'
|
||||
]
|
||||
Reference in New Issue
Block a user