Files
bakery-ia/shared/auth/decorators.py

705 lines
25 KiB
Python
Raw Permalink Normal View History

2025-08-02 09:41:50 +02:00
# ================================================================
# shared/auth/decorators.py - ENHANCED WITH ADMIN ROLE DECORATOR
# ================================================================
"""
2025-08-02 09:41:50 +02:00
Enhanced authentication decorators for microservices including admin role validation.
Designed to work with gateway authentication middleware and provide centralized
role-based access control across all services.
"""
from functools import wraps
2025-07-20 07:24:04 +02:00
from fastapi import HTTPException, status, Request, Depends
from fastapi.security import HTTPBearer
2025-08-02 09:41:50 +02:00
from typing import Callable, Optional, Dict, Any, List
2025-07-20 07:24:04 +02:00
import structlog
# Module-level structured logger used by the decorators and dependencies in this file.
logger = structlog.get_logger()
# Bearer token scheme for services that need it
# NOTE(review): per the FastAPI docs, auto_error=False makes a missing/invalid
# Authorization header resolve to None instead of an automatic error response.
# `security` is not referenced anywhere else in the visible part of this module.
security = HTTPBearer(auto_error=False)
2025-07-19 17:49:03 +02:00
def require_authentication(func: Callable) -> Callable:
    """
    Decorator that rejects calls which carry no authenticated user.

    The wrapped coroutine must receive the ``Request`` either positionally
    or as the ``request`` keyword argument. The user is read from
    ``request.state.user``; when that is unset or empty it is rebuilt from
    the forwarded headers with ``extract_user_from_headers`` and stored back
    on ``request.state.user``.

    Raises:
        HTTPException: 500 when no request object can be located; 401 when
            neither the request state nor the headers provide a user.
    """
    @wraps(func)
    async def wrapper(*args, **kwargs):
        # The first positional Request wins; otherwise use the keyword argument.
        positional_request = next(
            (candidate for candidate in args if isinstance(candidate, Request)),
            None,
        )
        request = positional_request or kwargs.get('request')
        if not request:
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail="Request object not found"
            )

        # User context is normally already on request.state; headers are the
        # fallback (for direct service calls in dev).
        if not getattr(request.state, 'user', None):
            header_user = extract_user_from_headers(request)
            if not header_user:
                raise HTTPException(
                    status_code=status.HTTP_401_UNAUTHORIZED,
                    detail="Authentication required"
                )
            request.state.user = header_user

        return await func(*args, **kwargs)
    return wrapper
2025-07-19 17:49:03 +02:00
def require_tenant_access(func: Callable) -> Callable:
    """
    Decorator that requires a tenant context on the request.

    The wrapped coroutine must receive the ``Request`` either positionally
    or as the ``request`` keyword argument. When ``request.state`` has no
    ``tenant_id`` attribute, the tenant is read from the headers with
    ``extract_tenant_from_headers`` and stored on ``request.state.tenant_id``.

    Raises:
        HTTPException: 500 when no request object can be located; 403 when
            no tenant id is available from the state or the headers.
    """
    @wraps(func)
    async def wrapper(*args, **kwargs):
        request = next((arg for arg in args if isinstance(arg, Request)), None)
        if not request:
            request = kwargs.get('request')

        # Fail explicitly instead of dereferencing None further down
        # (same contract as require_authentication).
        if not request:
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail="Request object not found"
            )

        if not hasattr(request.state, 'tenant_id'):
            # Try to extract from headers
            tenant_id = extract_tenant_from_headers(request)
            if not tenant_id:
                raise HTTPException(
                    status_code=status.HTTP_403_FORBIDDEN,
                    detail="Tenant access required"
                )
            request.state.tenant_id = tenant_id

        return await func(*args, **kwargs)
    return wrapper
2025-07-20 07:24:04 +02:00
def require_role(role: str):
    """
    Decorator factory that requires a specific role.

    Users whose role equals ``role`` (case-insensitive) are allowed, and so
    are users whose role is 'admin'. The user is resolved from the request
    with ``get_current_user``; the wrapped coroutine must therefore receive
    the ``Request`` positionally or as the ``request`` keyword argument.

    Args:
        role: Role name required to call the wrapped endpoint.

    Raises:
        HTTPException: 500 when no request object can be located; 403 when
            the user's role is neither ``role`` nor 'admin'. A 401 from
            ``get_current_user`` propagates unchanged.
    """
    required_role = role.lower()

    def decorator(func: Callable) -> Callable:
        @wraps(func)
        async def wrapper(*args, **kwargs):
            request = next((arg for arg in args if isinstance(arg, Request)), None)
            if not request:
                request = kwargs.get('request')

            # Without a request there is nothing to resolve the user from;
            # report it explicitly rather than failing on None.
            if not request:
                raise HTTPException(
                    status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                    detail="Request object not found"
                )

            user = get_current_user(request)
            # `or ''` keeps an explicit None role from crashing on .lower().
            user_role = str(user.get('role') or '').lower()
            if user_role not in (required_role, 'admin'):
                raise HTTPException(
                    status_code=status.HTTP_403_FORBIDDEN,
                    detail=f"{role} role required"
                )
            return await func(*args, **kwargs)
        return wrapper
    return decorator
2025-08-02 09:41:50 +02:00
def require_admin_role(func: Callable) -> Callable:
    """
    Decorator that restricts an endpoint to users whose role is 'admin'.

    The wrapper looks for the request (a positional ``Request`` or the
    ``request`` keyword argument) and for the user (a positional dict that
    contains 'user_id', or the ``current_user`` keyword argument). When no
    user was passed in but a request is available, the user is resolved
    with ``get_current_user(request)``.

    Usage - the route decorator must be the OUTERMOST one so that the router
    registers the wrapped function; with the order reversed the router would
    register the unprotected endpoint and this check would never run:

        @router.delete("/admin/users/{user_id}")
        @require_admin_role
        async def delete_user(user_id: str, request: Request):
            # Admin-only logic here

    This function takes the endpoint as its argument, so it cannot be passed
    to ``Depends(...)``; use ``require_admin_role_dep`` for dependency
    injection instead.

    Raises:
        HTTPException: 401 when no user can be determined; 403 when the
            user's role is not 'admin'.
    """
    @wraps(func)
    async def wrapper(*args, **kwargs):
        request = None
        current_user = None

        # Extract request and current_user from positional arguments
        for arg in args:
            if isinstance(arg, Request):
                request = arg
            elif isinstance(arg, dict) and 'user_id' in arg:
                current_user = arg

        # Fall back to keyword arguments
        if not request:
            request = kwargs.get('request')
        if not current_user:
            current_user = kwargs.get('current_user')

        # If we still don't have current_user, try to get it from request
        if not current_user and request:
            current_user = get_current_user(request)

        if not current_user:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Authentication required"
            )

        # `or ''` keeps an explicit None role from crashing on .lower().
        user_role = str(current_user.get('role') or '').lower()
        if user_role != 'admin':
            logger.warning("Non-admin user attempted admin operation",
                           user_id=current_user.get('user_id'),
                           role=user_role)
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="Admin role required"
            )

        logger.info("Admin operation authorized",
                    user_id=current_user.get('user_id'),
                    endpoint=func.__name__)
        return await func(*args, **kwargs)
    return wrapper
def require_roles(allowed_roles: List[str]):
    """
    Decorator factory that requires one of several roles.

    The wrapper looks for the request (a positional ``Request`` or the
    ``request`` keyword argument) and for the user (a positional dict that
    contains 'user_id', or the ``current_user`` keyword argument). When no
    user was passed in but a request is available, the user is resolved
    with ``get_current_user(request)``. Role comparison is case-insensitive.

    Args:
        allowed_roles: List of roles that are allowed to access the endpoint

    Usage - the route decorator must be the OUTERMOST one so that the router
    registers the wrapped function; with the order reversed the router would
    register the unprotected endpoint and this check would never run:

        @router.post("/sensitive-operation")
        @require_roles(['admin', 'manager'])
        async def sensitive_operation(request: Request):
            # Only admins and managers can access

    Raises:
        HTTPException: 401 when no user can be determined; 403 when the
            user's role is not one of ``allowed_roles``.
    """
    # Loop-invariant: normalize once at decoration time instead of per request.
    allowed_roles_lower = {role.lower() for role in allowed_roles}

    def decorator(func: Callable) -> Callable:
        @wraps(func)
        async def wrapper(*args, **kwargs):
            request = None
            current_user = None

            # Extract request and current_user from positional arguments
            for arg in args:
                if isinstance(arg, Request):
                    request = arg
                elif isinstance(arg, dict) and 'user_id' in arg:
                    current_user = arg

            # Fall back to keyword arguments
            if not request:
                request = kwargs.get('request')
            if not current_user:
                current_user = kwargs.get('current_user')

            # Get user from request if not provided
            if not current_user and request:
                current_user = get_current_user(request)

            if not current_user:
                raise HTTPException(
                    status_code=status.HTTP_401_UNAUTHORIZED,
                    detail="Authentication required"
                )

            # `or ''` keeps an explicit None role from crashing on .lower().
            user_role = str(current_user.get('role') or '').lower()
            if user_role not in allowed_roles_lower:
                logger.warning("Unauthorized role attempted restricted operation",
                               user_id=current_user.get('user_id'),
                               user_role=user_role,
                               allowed_roles=allowed_roles)
                raise HTTPException(
                    status_code=status.HTTP_403_FORBIDDEN,
                    detail=f"One of these roles required: {', '.join(allowed_roles)}"
                )

            logger.info("Role-based operation authorized",
                        user_id=current_user.get('user_id'),
                        user_role=user_role,
                        endpoint=func.__name__)
            return await func(*args, **kwargs)
        return wrapper
    return decorator
def require_tenant_admin(func: Callable) -> Callable:
    """
    Decorator for tenant-scoped admin operations.

    Two conditions must hold before the wrapped coroutine runs: the user's
    role is 'admin', and a tenant id can be obtained from the request via
    ``get_current_tenant_id``. No per-tenant membership check is performed
    here; any admin is accepted for any tenant.

    The request is taken from a positional ``Request`` or the ``request``
    keyword argument; the user from a positional dict containing 'user_id',
    the ``current_user`` keyword argument, or ``get_current_user(request)``.

    Raises:
        HTTPException: 401 when no user can be determined; 403 when the user
            is not an admin or no tenant id is available.
    """
    @wraps(func)
    async def wrapper(*args, **kwargs):
        req = None
        user = None

        # Scan positional arguments; a later match overrides an earlier one.
        for candidate in args:
            if isinstance(candidate, Request):
                req = candidate
                continue
            if isinstance(candidate, dict) and 'user_id' in candidate:
                user = candidate

        # Keyword arguments fill in whatever the positional scan missed.
        req = req or kwargs.get('request')
        user = user or kwargs.get('current_user')

        # Last resort: resolve the user from the request itself.
        if not user and req:
            user = get_current_user(req)

        if not user:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Authentication required"
            )

        # The role gate comes before the tenant gate.
        if user.get('role', '').lower() != 'admin':
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="Admin role required"
            )

        tenant_id = None
        if req:
            tenant_id = get_current_tenant_id(req)
        if not tenant_id:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="Tenant context required"
            )

        # Additional tenant admin validation could go here; for now admins
        # are trusted to operate on any tenant.
        logger.info("Tenant admin operation authorized",
                    user_id=user.get('user_id'),
                    tenant_id=tenant_id,
                    endpoint=func.__name__)
        return await func(*args, **kwargs)
    return wrapper
2025-07-20 07:24:04 +02:00
def get_current_user(request: Request) -> Dict[str, Any]:
    """
    Return the current user context for ``request``.

    ``request.state.user`` is preferred when it is set and non-empty;
    otherwise the user is built from the forwarded headers with
    ``extract_user_from_headers`` (fallback for dev/testing).

    Raises:
        HTTPException: 401 when neither source yields a user.
    """
    state_user = getattr(request.state, 'user', None)
    if state_user:
        return state_user

    header_user = extract_user_from_headers(request)
    if header_user:
        return header_user

    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="User not authenticated"
    )
2025-07-19 17:49:03 +02:00
def get_current_tenant_id(request: Request) -> Optional[str]:
    """
    Return the current tenant id from the request state or the headers.

    ``request.state.tenant_id`` is used when it holds a non-empty value. An
    absent, None or empty state value falls through to the headers (via
    ``extract_tenant_from_headers``), mirroring how ``get_current_user``
    treats ``request.state.user``.

    Returns:
        The tenant id, or None when neither source provides one.
    """
    tenant_id = getattr(request.state, 'tenant_id', None)
    if tenant_id:
        return tenant_id
    # Fallback to headers
    return extract_tenant_from_headers(request)
def extract_user_from_headers(request: Request) -> Optional[Dict[str, Any]]:
    """
    Build a user context dict from the forwarded ``x-*`` request headers.

    Returns None when the ``x-user-id`` header is absent or empty. Otherwise
    the returned dict contains: user_id, email, role (default "user"),
    tenant_id, permissions (list), full_name, subscription_tier, is_demo
    (bool), demo_session_id and demo_account_type.

    When ``x-user-type`` equals "service" or ``x-service-name`` is non-empty,
    the context is additionally marked as a service (``type``, ``service``,
    ``is_service``) and its role is forced to "admin".

    NOTE(review): every value here is taken from the headers as-is, so this
    relies on whatever sits in front of the service to strip client-supplied
    x-user-* / x-service-name headers - confirm against the gateway.
    """
    headers = request.headers
    path = request.url.path

    user_id = headers.get("x-user-id")
    logger.info("🔍 Extracting user from headers",
                user_id=user_id,
                has_user_id=bool(user_id),
                path=path)
    if not user_id:
        logger.warning("❌ No x-user-id header found", path=path)
        return None

    # Comma-separated list; trim whitespace and drop empty entries so that
    # "read, write" yields ["read", "write"] for exact-match permission checks.
    raw_permissions = headers.get("X-User-Permissions", "")
    permissions = [
        entry.strip() for entry in raw_permissions.split(",") if entry.strip()
    ]

    user_context = {
        "user_id": user_id,
        "email": headers.get("x-user-email", ""),
        "role": headers.get("x-user-role", "user"),
        "tenant_id": headers.get("x-tenant-id"),
        "permissions": permissions,
        "full_name": headers.get("x-user-full-name", ""),
        "subscription_tier": headers.get("x-subscription-tier", ""),
        "is_demo": headers.get("x-is-demo", "").lower() == "true",
        "demo_session_id": headers.get("x-demo-session-id", ""),
        "demo_account_type": headers.get("x-demo-account-type", ""),
    }

    logger.info("✅ User context extracted from headers",
                user_context=user_context,
                path=path)

    # Handle service tokens: service callers are identified by header.
    user_type = headers.get("x-user-type", "")
    service_name = headers.get("x-service-name", "")
    if user_type == "service" or service_name:
        user_context.update({
            "type": "service",
            "service": service_name,
            "role": "admin",  # Service tokens always have admin role
            "is_service": True
        })

    return user_context
2025-07-20 07:24:04 +02:00
def extract_tenant_from_headers(request: Request) -> Optional[str]:
    """Return the value of the ``x-tenant-id`` request header, or None when it is absent."""
    tenant_header = request.headers.get("x-tenant-id")
    return tenant_header
2025-07-20 07:24:04 +02:00
2025-10-24 13:05:04 +02:00
def extract_user_from_jwt(
    auth_header: str,
    secret_key: Optional[str] = None,
    algorithms: Optional[List[str]] = None,
) -> Optional[Dict[str, Any]]:
    """
    Extract user information from a JWT carried in an Authorization header.

    This is a fallback for when the gateway doesn't inject x-user-* headers.

    Args:
        auth_header: Header value, with or without the "Bearer " prefix.
        secret_key: Optional verification key. When given, the token
            signature is verified; when omitted (the default, matching the
            historical behavior) the signature is NOT verified.
        algorithms: Accepted signing algorithms when ``secret_key`` is given;
            defaults to ["HS256"].

    Returns:
        A user context dict (service-shaped or user-shaped), or None when
        the token has no usable subject or anything goes wrong while
        importing the JWT library, decoding, or building the context.

    SECURITY: without ``secret_key`` the payload is trusted as-is, including
    claims that lead to the "admin" service role. Only use the unverified
    path where tokens are guaranteed to have been validated upstream.
    """
    try:
        from jose import jwt
        from shared.config.base import is_internal_service

        # Strip only the leading scheme, not every occurrence of "Bearer ".
        token = auth_header.strip()
        if token.startswith("Bearer "):
            token = token[len("Bearer "):].strip()

        if secret_key:
            payload = jwt.decode(token, secret_key, algorithms=algorithms or ["HS256"])
        else:
            # SECURITY: signature verification is disabled on this path
            # (tokens are assumed to come from the gateway).
            payload = jwt.decode(token, key="dummy", options={"verify_signature": False})

        logger.debug("JWT payload decoded", payload_keys=list(payload.keys()))

        # The subject may live under any of these claims.
        user_id = payload.get("sub") or payload.get("user_id") or payload.get("service")
        if not user_id:
            logger.warning("No user_id found in JWT payload", payload=payload)
            return None

        # Check if this is a service token
        token_type = payload.get("type", "")
        service_name = payload.get("service", "")
        is_service_token = (
            token_type == "service"
            or is_internal_service(user_id)
            or is_internal_service(service_name)
        )

        if is_service_token:
            service_identifier = service_name or user_id
            user_context = {
                "user_id": service_identifier,
                "type": "service",
                "service": service_identifier,
                "role": "admin",  # Services get admin privileges
                "is_service": True,
                "permissions": ["read", "write", "admin"],
                "email": f"{service_identifier}@internal.service",
                "full_name": f"{service_identifier.replace('-', ' ').title()}"
            }
            logger.info("Service authenticated via JWT", service=service_identifier)
        else:
            user_context = {
                "user_id": user_id,
                "type": "user",
                "email": payload.get("email", ""),
                "role": payload.get("role", "user"),
                "tenant_id": payload.get("tenant_id"),
                "permissions": payload.get("permissions", []),
                "full_name": payload.get("full_name", ""),
                "subscription_tier": payload.get("subscription_tier", ""),
                "is_service": False
            }
            logger.info("User authenticated via JWT", user_id=user_id)

        return user_context
    except Exception as e:
        # Deliberate best-effort: any failure means "no user from JWT".
        logger.error("Failed to extract user from JWT", error=str(e), error_type=type(e).__name__)
        return None
2025-08-02 09:41:50 +02:00
# ================================================================
# FASTAPI DEPENDENCY FUNCTIONS
# ================================================================
2025-07-20 07:24:04 +02:00
async def get_current_user_dep(request: Request) -> Dict[str, Any]:
    """
    FastAPI dependency returning the current user context.

    Resolution order: ``get_current_user`` (request state, then forwarded
    headers); if that raises, a "Bearer" Authorization header is decoded
    with ``extract_user_from_jwt`` as a fallback. Every attempt, success and
    failure is logged.

    Raises:
        HTTPException: the exception raised by ``get_current_user`` when the
            JWT fallback does not produce a user either.
    """
    headers = request.headers
    try:
        # Enhanced logging for debugging
        logger.info(
            "🔐 Authentication attempt",
            path=request.url.path,
            method=request.method,
            has_auth_header=bool(headers.get("authorization")),
            has_x_user_id=bool(headers.get("x-user-id")),
            has_x_is_demo=bool(headers.get("x-is-demo")),
            has_x_demo_session_id=bool(headers.get("x-demo-session-id")),
            x_user_id=headers.get("x-user-id", "MISSING"),
            x_is_demo=headers.get("x-is-demo", "MISSING"),
            x_demo_session_id=headers.get("x-demo-session-id", "MISSING"),
            client_ip=request.client.host if request.client else "unknown"
        )

        try:
            # Preferred path: state/headers populated by the gateway.
            user = get_current_user(request)
        except HTTPException:
            # Headers are missing: try the bearer token instead.
            bearer = headers.get("authorization", "")
            user = extract_user_from_jwt(bearer) if bearer.startswith("Bearer ") else None

            # Still nothing: re-raise the original exception.
            if not user:
                raise

            logger.info(
                "User authenticated via JWT fallback",
                user_id=user.get("user_id"),
                user_type=user.get("type", "user"),
                is_service=user.get("type") == "service",
                path=request.url.path
            )

        logger.info(
            "User authenticated successfully",
            user_id=user.get("user_id"),
            user_type=user.get("type", "user"),
            is_service=user.get("type") == "service",
            role=user.get("role"),
            path=request.url.path
        )
        return user

    except HTTPException as auth_error:
        logger.warning(
            "Authentication failed - 401",
            path=request.url.path,
            status_code=auth_error.status_code,
            detail=auth_error.detail,
            has_x_user_id=bool(headers.get("x-user-id")),
            has_auth_header=bool(headers.get("authorization")),
            x_user_type=headers.get("x-user-type", "none"),
            x_service_name=headers.get("x-service-name", "none"),
            client_ip=request.client.host if request.client else "unknown"
        )
        raise
2025-07-20 07:24:04 +02:00
async def get_current_tenant_id_dep(request: Request) -> Optional[str]:
    """FastAPI dependency wrapper around ``get_current_tenant_id``; returns its result unchanged."""
    tenant_id = get_current_tenant_id(request)
    return tenant_id
2025-08-02 09:41:50 +02:00
async def require_admin_role_dep(
    current_user: Dict[str, Any] = Depends(get_current_user_dep)
) -> Dict[str, Any]:
    """
    FastAPI dependency that only lets users with the 'admin' role through.

    The user is supplied by ``get_current_user_dep``; the role comparison is
    case-insensitive.

    Usage:
        @router.delete("/admin/users/{user_id}")
        async def delete_user(
            user_id: str,
            admin_user: Dict[str, Any] = Depends(require_admin_role_dep)
        ):
            # admin_user is guaranteed to have admin role

    Returns:
        The user context, unchanged.

    Raises:
        HTTPException: 403 when the user's role is not 'admin'.
    """
    role = current_user.get('role', '').lower()

    if role == 'admin':
        logger.info("Admin operation authorized via dependency",
                    user_id=current_user.get('user_id'))
        return current_user

    logger.warning("Non-admin user attempted admin operation",
                   user_id=current_user.get('user_id'),
                   role=role)
    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="Admin role required"
    )
def require_roles_dep(allowed_roles: List[str]):
    """
    FastAPI dependency factory that requires one of multiple roles.

    This is a plain (non-async) function on purpose: it must return the
    dependency callable immediately so the result can be handed to
    ``Depends(...)`` at import time. As a coroutine function it would return
    a coroutine object instead of a callable.

    Args:
        allowed_roles: Roles allowed to access the endpoint (compared
            case-insensitively).

    Usage:
        require_manager_or_admin = require_roles_dep(['admin', 'manager'])

        @router.post("/sensitive-operation")
        async def sensitive_operation(
            user: Dict[str, Any] = Depends(require_manager_or_admin)
        ):
            # Only admins and managers can access

    Returns:
        An async dependency that returns the user context, or raises
        HTTPException 403 when the user's role is not allowed.
    """
    # Loop-invariant: normalize once when the dependency is created.
    allowed_roles_lower = {role.lower() for role in allowed_roles}

    async def check_roles(
        current_user: Dict[str, Any] = Depends(get_current_user_dep)
    ) -> Dict[str, Any]:
        # `or ''` keeps an explicit None role from crashing on .lower().
        user_role = str(current_user.get('role') or '').lower()

        if user_role not in allowed_roles_lower:
            logger.warning("Unauthorized role attempted restricted operation",
                           user_id=current_user.get('user_id'),
                           user_role=user_role,
                           allowed_roles=allowed_roles)
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail=f"One of these roles required: {', '.join(allowed_roles)}"
            )

        logger.info("Role-based operation authorized via dependency",
                    user_id=current_user.get('user_id'),
                    user_role=user_role)
        return current_user

    return check_roles
# ================================================================
# UTILITY FUNCTIONS FOR ROLE CHECKING
# ================================================================
def is_admin_user(user: Dict[str, Any]) -> bool:
    """
    Check if user has admin role (case-insensitive).

    A missing or None ``role`` counts as "not admin" instead of raising.
    """
    return str(user.get('role') or '').lower() == 'admin'
def is_user_in_roles(user: Dict[str, Any], allowed_roles: List[str]) -> bool:
    """
    Check if user has one of the allowed roles (case-insensitive).

    A missing or None ``role`` is treated as an empty role name instead of
    raising.
    """
    user_role = str(user.get('role') or '').lower()
    return user_role in {role.lower() for role in allowed_roles}
def get_user_permissions(user: Dict[str, Any]) -> List[str]:
    """
    Get user permissions list.

    Returns an empty list when ``permissions`` is missing, None or empty, so
    callers can always run membership tests on the result.
    """
    return user.get('permissions') or []
def has_permission(user: Dict[str, Any], permission: str) -> bool:
    """Return True when ``permission`` is among the permissions reported by ``get_user_permissions(user)``."""
    return permission in get_user_permissions(user)
# ================================================================
# ADVANCED ROLE DECORATORS
# ================================================================
def require_permission(permission: str):
    """
    Decorator factory that requires a specific permission.

    The user must be handed to the wrapped coroutine, either as a positional
    dict containing 'user_id' or as the ``current_user`` keyword argument;
    this decorator does not resolve the user from the request. Users with
    the admin role bypass the permission check.

    Args:
        permission: Permission name the user must hold.

    Usage - the route decorator must be the OUTERMOST one so that the router
    registers the wrapped function; with the order reversed the router would
    register the unprotected endpoint and this check would never run:

        @router.delete("/users/{user_id}")
        @require_permission('delete_users')
        async def delete_user(
            user_id: str,
            current_user: Dict[str, Any] = Depends(get_current_user_dep),
        ):
            # Only users with 'delete_users' permission can access

    Raises:
        HTTPException: 401 when no user was passed in; 403 when the user
            neither holds ``permission`` nor is an admin.
    """
    def decorator(func: Callable) -> Callable:
        @wraps(func)
        async def wrapper(*args, **kwargs):
            # First positional user-context dict wins, then the keyword argument.
            current_user = next(
                (arg for arg in args if isinstance(arg, dict) and 'user_id' in arg),
                None,
            )
            if not current_user:
                current_user = kwargs.get('current_user')

            if not current_user:
                raise HTTPException(
                    status_code=status.HTTP_401_UNAUTHORIZED,
                    detail="Authentication required"
                )

            # Admins bypass permission checks
            if not has_permission(current_user, permission) and not is_admin_user(current_user):
                logger.warning("User lacks required permission",
                               user_id=current_user.get('user_id'),
                               required_permission=permission,
                               user_permissions=get_user_permissions(current_user))
                raise HTTPException(
                    status_code=status.HTTP_403_FORBIDDEN,
                    detail=f"Permission '{permission}' required"
                )

            logger.info("Permission-based operation authorized",
                        user_id=current_user.get('user_id'),
                        permission=permission)
            return await func(*args, **kwargs)
        return wrapper
    return decorator
2025-07-20 07:24:04 +02:00
# Public API of this module: decorators, FastAPI dependencies and helpers.
__all__ = [
    # Main decorators
    "require_authentication",
    "require_tenant_access",
    "require_role",
    "require_admin_role",
    "require_roles",
    "require_tenant_admin",
    "require_permission",

    # FastAPI dependencies
    "get_current_user_dep",
    "get_current_tenant_id_dep",
    "require_admin_role_dep",
    "require_roles_dep",

    # Utility functions
    "get_current_user",
    "get_current_tenant_id",
    "extract_user_from_headers",
    "extract_user_from_jwt",
    "extract_tenant_from_headers",
    "is_admin_user",
    "is_user_in_roles",
    "get_user_permissions",
    "has_permission",
]