Files
bakery-ia/shared/auth/decorators.py
2025-07-26 18:46:52 +02:00

175 lines
5.6 KiB
Python

"""
Unified authentication decorators for microservices
Designed to work with gateway authentication middleware
"""
from functools import wraps
from fastapi import HTTPException, status, Request, Depends
from fastapi.security import HTTPBearer
from typing import Callable, Optional, Dict, Any
import structlog
logger = structlog.get_logger()
# Bearer token scheme for services that need it
security = HTTPBearer(auto_error=False)
def require_authentication(func: Callable) -> Callable:
"""
Decorator to require authentication - trusts gateway validation
Services behind the gateway should use this decorator
"""
@wraps(func)
async def wrapper(*args, **kwargs):
# Find request object in arguments
request = None
for arg in args:
if isinstance(arg, Request):
request = arg
break
if not request:
request = kwargs.get('request')
if not request:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Request object not found"
)
# Check if user context exists (set by gateway)
if not hasattr(request.state, 'user') or not request.state.user:
# Check headers as fallback (for direct service calls in dev)
user_info = extract_user_from_headers(request)
if not user_info:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Authentication required"
)
request.state.user = user_info
return await func(*args, **kwargs)
return wrapper
def require_tenant_access(func: Callable) -> Callable:
"""Decorator to require tenant access"""
@wraps(func)
async def wrapper(*args, **kwargs):
request = None
for arg in args:
if isinstance(arg, Request):
request = arg
break
if not request:
request = kwargs.get('request')
if not request or not hasattr(request.state, 'tenant_id'):
# Try to extract from headers
tenant_id = extract_tenant_from_headers(request)
if not tenant_id:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Tenant access required"
)
request.state.tenant_id = tenant_id
return await func(*args, **kwargs)
return wrapper
def require_role(role: str):
"""Decorator to require specific role"""
def decorator(func: Callable) -> Callable:
@wraps(func)
async def wrapper(*args, **kwargs):
request = None
for arg in args:
if isinstance(arg, Request):
request = arg
break
if not request:
request = kwargs.get('request')
user = get_current_user(request)
user_role = user.get('role', '').lower()
if user_role != role.lower() and user_role != 'admin':
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"{role} role required"
)
return await func(*args, **kwargs)
return wrapper
return decorator
def get_current_user(request: Request) -> Dict[str, Any]:
"""Get current user from request state or headers"""
if hasattr(request.state, 'user') and request.state.user:
return request.state.user
# Fallback to headers (for dev/testing)
user_info = extract_user_from_headers(request)
if not user_info:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="User not authenticated"
)
return user_info
def get_current_tenant_id(request: Request) -> Optional[str]:
"""Get current tenant ID from request state or headers"""
if hasattr(request.state, 'tenant_id'):
return request.state.tenant_id
# Fallback to headers
return extract_tenant_from_headers(request)
def extract_user_from_headers(request: Request) -> Optional[Dict[str, Any]]:
"""Extract user information from forwarded headers (gateway sets these)"""
user_id = request.headers.get("x-user-id")
if not user_id:
return None
return {
"user_id": user_id,
"email": request.headers.get("x-user-email", ""),
"role": request.headers.get("x-user-role", "user"),
"tenant_id": request.headers.get("x-tenant-id"),
"permissions": request.headers.get("X-User-Permissions", "").split(",") if request.headers.get("X-User-Permissions") else []
}
def extract_tenant_from_headers(request: Request) -> Optional[str]:
"""Extract tenant ID from headers"""
return request.headers.get("x-tenant-id")
# FastAPI Dependencies for injection
async def get_current_user_dep(request: Request) -> Dict[str, Any]:
"""FastAPI dependency to get current user"""
return get_current_user(request)
async def get_current_tenant_id_dep(request: Request) -> Optional[str]:
"""FastAPI dependency to get current tenant ID"""
return get_current_tenant_id(request)
# Export all decorators and functions
__all__ = [
'require_authentication',
'require_tenant_access',
'require_role',
'get_current_user',
'get_current_tenant_id',
'get_current_user_dep',
'get_current_tenant_id_dep',
'extract_user_from_headers',
'extract_tenant_from_headers'
]