Files
bakery-ia/shared/auth/decorators.py

175 lines
5.6 KiB
Python
Raw Normal View History

"""
2025-07-20 07:24:04 +02:00
Unified authentication decorators for microservices
Designed to work with gateway authentication middleware
"""
from functools import wraps
2025-07-20 07:24:04 +02:00
from fastapi import HTTPException, status, Request, Depends
from fastapi.security import HTTPBearer
from typing import Callable, Optional, Dict, Any
import structlog
# Module-level structlog logger. NOTE(review): nothing in the visible part of
# this module logs through it yet.
logger = structlog.get_logger()
# Bearer token scheme for services that need it.
# NOTE(review): auto_error=False presumably makes missing credentials resolve
# to None instead of an automatic error response - confirm against the FastAPI
# HTTPBearer documentation. The visible code in this module never uses it.
security = HTTPBearer(auto_error=False)
2025-07-19 17:49:03 +02:00
def require_authentication(func: Callable) -> Callable:
    """
    Decorator that lets the wrapped async endpoint run only for a known user.

    The wrapper looks for the request among the call arguments: the first
    positional ``Request`` instance wins, otherwise the ``request`` keyword
    argument is used. A truthy ``request.state.user`` is accepted as-is.
    When it is absent or empty, the user is rebuilt from the forwarded
    headers with ``extract_user_from_headers`` and stored on
    ``request.state.user`` before the endpoint is awaited.

    Raises (from the wrapper):
        HTTPException: 500 if no request object can be located, 401 if
            neither the request state nor the headers identify a user.
    """
    @wraps(func)
    async def wrapper(*args, **kwargs):
        # Positional Request first, then the 'request' keyword argument.
        request = next((arg for arg in args if isinstance(arg, Request)), None)
        request = request or kwargs.get('request')

        if not request:
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail="Request object not found"
            )

        # User context is normally placed on request.state by the gateway.
        state_user = getattr(request.state, 'user', None)
        if not state_user:
            # Header fallback (for direct service calls in dev).
            header_user = extract_user_from_headers(request)
            if not header_user:
                raise HTTPException(
                    status_code=status.HTTP_401_UNAUTHORIZED,
                    detail="Authentication required"
                )
            request.state.user = header_user

        return await func(*args, **kwargs)

    return wrapper
2025-07-19 17:49:03 +02:00
def require_tenant_access(func: Callable) -> Callable:
    """
    Decorator that lets the wrapped async endpoint run only with a tenant.

    The wrapper looks for the request among the call arguments (first
    positional ``Request`` instance, otherwise the ``request`` keyword
    argument). A non-empty ``request.state.tenant_id`` is accepted as-is.
    When it is missing or empty, the tenant is read from the headers with
    ``extract_tenant_from_headers`` and stored on ``request.state.tenant_id``
    before the endpoint is awaited.

    Raises (from the wrapper):
        HTTPException: 500 if no request object can be located, 403 if no
            tenant id is available from the request state or the headers.
    """
    @wraps(func)
    async def wrapper(*args, **kwargs):
        request = next((arg for arg in args if isinstance(arg, Request)), None)
        if not request:
            request = kwargs.get('request')

        if not request:
            # Fail with the same explicit 500 as require_authentication
            # instead of crashing with AttributeError in the header lookup.
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail="Request object not found"
            )

        # An attribute that exists but is None/"" does not grant access:
        # treat it like a missing one and try the headers.
        if not getattr(request.state, 'tenant_id', None):
            tenant_id = extract_tenant_from_headers(request)
            if not tenant_id:
                raise HTTPException(
                    status_code=status.HTTP_403_FORBIDDEN,
                    detail="Tenant access required"
                )
            request.state.tenant_id = tenant_id

        return await func(*args, **kwargs)

    return wrapper
2025-07-20 07:24:04 +02:00
def require_role(role: str):
    """
    Build a decorator that restricts an async endpoint to a given role.

    The comparison is case-insensitive and users whose role is ``admin``
    always pass. The current user is resolved with ``get_current_user``.

    Args:
        role: Name of the role the caller must hold.

    Returns:
        A decorator for async endpoint functions.

    Raises (from the wrapper):
        HTTPException: 500 if no request object can be located, 403 if the
            user's role is neither ``role`` nor ``admin``. Any exception
            raised by ``get_current_user`` propagates unchanged.
    """
    # Normalised once here rather than on every request.
    required_role = role.lower()

    def decorator(func: Callable) -> Callable:
        @wraps(func)
        async def wrapper(*args, **kwargs):
            request = next(
                (arg for arg in args if isinstance(arg, Request)), None
            )
            if not request:
                request = kwargs.get('request')

            if not request:
                # Explicit 500 instead of an AttributeError raised from
                # inside get_current_user(None).
                raise HTTPException(
                    status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                    detail="Request object not found"
                )

            user = get_current_user(request)
            # 'role' may be present but None; treat that as "no role".
            user_role = (user.get('role') or '').lower()

            if user_role not in (required_role, 'admin'):
                raise HTTPException(
                    status_code=status.HTTP_403_FORBIDDEN,
                    detail=f"{role} role required"
                )

            return await func(*args, **kwargs)

        return wrapper

    return decorator
def get_current_user(request: Request) -> Dict[str, Any]:
    """
    Return the user attached to the request.

    A truthy ``request.state.user`` is returned directly. Otherwise the user
    is built from the request headers with ``extract_user_from_headers``
    (fallback for dev/testing).

    Raises:
        HTTPException: 401 if neither source yields a user.
    """
    state_user = getattr(request.state, 'user', None)
    if state_user:
        return state_user

    # Fallback to headers (for dev/testing)
    header_user = extract_user_from_headers(request)
    if header_user:
        return header_user

    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="User not authenticated"
    )
2025-07-19 17:49:03 +02:00
def get_current_tenant_id(request: Request) -> Optional[str]:
    """
    Return the tenant id for the request.

    If ``request.state`` has a ``tenant_id`` attribute its value is returned
    as-is (even when it is empty). Otherwise the value comes from
    ``extract_tenant_from_headers``.
    """
    if not hasattr(request.state, 'tenant_id'):
        # Nothing on the request state: fall back to the headers.
        return extract_tenant_from_headers(request)
    return request.state.tenant_id
def extract_user_from_headers(request: Request) -> Optional[Dict[str, Any]]:
    """
    Build a user dict from the forwarded ``x-user-*`` / ``x-tenant-id`` headers.

    Args:
        request: Object whose ``headers`` mapping is read.

    Returns:
        ``None`` when ``x-user-id`` is missing or empty. Otherwise a dict with:
        ``user_id``; ``email`` (default ``""``); ``role`` (default ``"user"``);
        ``tenant_id`` (``None`` when absent); and ``permissions``, the
        comma-separated ``x-user-permissions`` header as a list with
        surrounding whitespace trimmed and empty entries dropped.
    """
    headers = request.headers

    user_id = headers.get("x-user-id")
    if not user_id:
        return None

    # Lower-case name like the other lookups in this function.
    # NOTE(review): assumes header lookup is case-insensitive, as with
    # Starlette's Headers - confirm.
    # Trimming keeps "read, write" from producing " write", and the filter
    # drops empty items left by stray commas.
    raw_permissions = headers.get("x-user-permissions") or ""
    permissions = [
        item.strip() for item in raw_permissions.split(",") if item.strip()
    ]

    return {
        "user_id": user_id,
        "email": headers.get("x-user-email", ""),
        "role": headers.get("x-user-role", "user"),
        "tenant_id": headers.get("x-tenant-id"),
        "permissions": permissions,
    }
def extract_tenant_from_headers(request: Request) -> Optional[str]:
    """Return the value of the ``x-tenant-id`` header, or ``None`` if absent."""
    tenant_header = request.headers.get("x-tenant-id")
    return tenant_header
2025-07-20 07:24:04 +02:00
# FastAPI Dependencies for injection
async def get_current_user_dep(request: Request) -> Dict[str, Any]:
    """
    Async wrapper around ``get_current_user`` for FastAPI dependency injection.

    Returns whatever ``get_current_user`` returns and lets its exceptions
    propagate.
    """
    current_user = get_current_user(request)
    return current_user
async def get_current_tenant_id_dep(request: Request) -> Optional[str]:
    """
    Async wrapper around ``get_current_tenant_id`` for FastAPI dependency
    injection.

    Returns whatever ``get_current_tenant_id`` returns.
    """
    tenant_id = get_current_tenant_id(request)
    return tenant_id
# Public API of this module: the three decorators, the request accessors,
# their FastAPI dependency wrappers, and the header-extraction helpers.
# `logger` and `security` are not listed here.
__all__ = [
'require_authentication',
'require_tenant_access',
'require_role',
'get_current_user',
'get_current_tenant_id',
'get_current_user_dep',
'get_current_tenant_id_dep',
'extract_user_from_headers',
'extract_tenant_from_headers'
]