Files
bakery-ia/shared/security/audit_logger.py
2025-10-29 06:58:05 +01:00

344 lines
11 KiB
Python

"""
Audit logging system for tracking critical operations across all services
"""
import uuid
from datetime import datetime, timezone
from typing import Optional, Dict, Any
from enum import Enum
import structlog
from sqlalchemy import Column, String, DateTime, Text, Index
from sqlalchemy.dialects.postgresql import UUID, JSON
# Module-level structlog logger; AuditLogger.__init__ binds its service name onto it.
logger = structlog.get_logger()
class AuditSeverity(str, Enum):
    """
    Severity scale for audit events.

    Members also subclass ``str``, so each one compares equal to its
    lowercase value (for example ``AuditSeverity.HIGH == "high"``).
    """

    # Listed from least to most severe.
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"
class AuditAction(str, Enum):
    """
    Names of the action types commonly recorded in audit events.

    Members also subclass ``str``; each value is the lowercase form of
    the member name.
    """

    # Basic CRUD operations
    CREATE = "create"
    READ = "read"
    UPDATE = "update"
    DELETE = "delete"

    # Other named actions
    APPROVE = "approve"
    REJECT = "reject"
    CANCEL = "cancel"
    EXPORT = "export"
    IMPORT = "import"
    INVITE = "invite"
    REMOVE = "remove"
    UPGRADE = "upgrade"
    DOWNGRADE = "downgrade"
    DEACTIVATE = "deactivate"
    ACTIVATE = "activate"
def create_audit_log_model(Base):
    """
    Build an ``AuditLog`` ORM class on top of the given declarative base.

    The model is defined inside this factory so that every caller gets a
    class bound to its own ``Base`` and mapped to an ``audit_logs`` table.

    Usage in service models/__init__.py:
        from shared.database.base import Base
        from shared.security import create_audit_log_model
        AuditLog = create_audit_log_model(Base)

    Args:
        Base: SQLAlchemy declarative base for the service

    Returns:
        AuditLog model class bound to the service's Base
    """

    def _utc_now():
        # Column default for created_at: timezone-aware "now" in UTC.
        return datetime.now(timezone.utc)

    class AuditLog(Base):
        """
        One row per audited operation.

        Stores who acted (tenant/user), what was done (action, resource),
        how severe it was, which service recorded it, and optional
        request context.
        """

        __tablename__ = "audit_logs"

        # --- Primary key ---
        id = Column(
            UUID(as_uuid=True),
            primary_key=True,
            default=uuid.uuid4,
        )

        # --- Who: tenant and acting user ---
        tenant_id = Column(UUID(as_uuid=True), nullable=False, index=True)
        user_id = Column(UUID(as_uuid=True), nullable=False, index=True)

        # --- What: action and target resource ---
        # action: create, update, delete, etc.
        action = Column(String(100), nullable=False, index=True)
        # resource_type: supplier, recipe, order, etc.
        resource_type = Column(String(100), nullable=False, index=True)
        resource_id = Column(String(255), nullable=True, index=True)

        # --- Severity: low, medium, high, critical ---
        severity = Column(String(20), nullable=False, default="medium", index=True)

        # --- Which service wrote the record ---
        service_name = Column(String(100), nullable=False, index=True)

        # --- Free-text description ---
        description = Column(Text, nullable=True)

        # --- Audit trail payloads ---
        # changes: before/after values for updates
        changes = Column(JSON, nullable=True)
        # audit_metadata: additional context
        audit_metadata = Column(JSON, nullable=True)

        # --- Request context ---
        # ip_address: IPv4 or IPv6 textual form
        ip_address = Column(String(45), nullable=True)
        user_agent = Column(Text, nullable=True)
        endpoint = Column(String(255), nullable=True)
        # method: GET, POST, PUT, DELETE
        method = Column(String(10), nullable=True)

        # --- Creation timestamp (timezone-aware, defaults to UTC now) ---
        created_at = Column(
            DateTime(timezone=True),
            nullable=False,
            default=_utc_now,
            index=True,
        )

        # Composite indexes for common query patterns
        __table_args__ = (
            Index("idx_audit_tenant_created", "tenant_id", "created_at"),
            Index("idx_audit_user_created", "user_id", "created_at"),
            Index("idx_audit_resource_type_action", "resource_type", "action"),
            Index("idx_audit_severity_created", "severity", "created_at"),
            Index("idx_audit_service_created", "service_name", "created_at"),
        )

        def __repr__(self):
            # Short one-line summary for debugging output.
            identity = f"<AuditLog(id={self.id}, tenant={self.tenant_id}, "
            target = f"action={self.action}, resource={self.resource_type}, "
            level = f"severity={self.severity})>"
            return identity + target + level

        def to_dict(self):
            """
            Return the row as a plain dictionary.

            UUID columns are stringified, ``audit_metadata`` is exposed under
            the ``"metadata"`` key, and ``created_at`` is rendered in ISO-8601
            form (``None`` when it is unset).
            """
            snapshot = {
                "id": str(self.id),
                "tenant_id": str(self.tenant_id),
                "user_id": str(self.user_id),
                "action": self.action,
                "resource_type": self.resource_type,
                "resource_id": self.resource_id,
                "severity": self.severity,
                "service_name": self.service_name,
                "description": self.description,
                "changes": self.changes,
                "metadata": self.audit_metadata,
                "ip_address": self.ip_address,
                "user_agent": self.user_agent,
                "endpoint": self.endpoint,
                "method": self.method,
            }
            timestamp = self.created_at
            snapshot["created_at"] = timestamp.isoformat() if timestamp else None
            return snapshot

    return AuditLog
class AuditLogger:
    """
    Service for logging audit events.

    Each event is persisted as a row of the service-specific AuditLog model
    and mirrored to the structured logger. Failures while recording an event
    are logged and swallowed so that auditing never breaks the calling
    operation.
    """

    def __init__(self, service_name: str, audit_log_model):
        """
        Initialize AuditLogger with service-specific AuditLog model

        Args:
            service_name: Name of the service (e.g., "production-service")
            audit_log_model: The service-specific AuditLog model class created
                via create_audit_log_model()

        Raises:
            ValueError: If audit_log_model is missing/falsy.
        """
        if not audit_log_model:
            raise ValueError(f"audit_log_model is required for AuditLogger in {service_name}")
        self.service_name = service_name
        self.audit_log_model = audit_log_model
        # Every log line emitted through this instance carries the service name.
        self.logger = logger.bind(service=service_name)

    async def log_event(
        self,
        db_session,
        tenant_id: str,
        user_id: str,
        action: str,
        resource_type: str,
        resource_id: Optional[str] = None,
        severity: str = "medium",
        description: Optional[str] = None,
        changes: Optional[Dict[str, Any]] = None,
        audit_metadata: Optional[Dict[str, Any]] = None,
        endpoint: Optional[str] = None,
        method: Optional[str] = None,
        ip_address: Optional[str] = None,
        user_agent: Optional[str] = None,
    ):
        """
        Log an audit event

        The record is added to ``db_session`` and committed immediately. Any
        exception is logged as ``audit_log_failed`` and swallowed; nothing is
        raised to the caller and the method returns ``None``.

        If the failure happens after the session has been touched (``add`` /
        ``commit``), the session is rolled back so that the caller does not
        receive it in a failed-transaction state.

        Args:
            db_session: Database session (its ``commit``/``rollback`` are awaited)
            tenant_id: Tenant ID (string form is converted to ``uuid.UUID``;
                other values are passed through unchanged)
            user_id: User ID who performed the action (same conversion as
                ``tenant_id``)
            action: Action performed (create, update, delete, etc.)
            resource_type: Type of resource (user, sale, recipe, etc.)
            resource_id: ID of the resource affected; non-string values are
                stringified before being stored
            severity: Severity level (low, medium, high, critical)
            description: Human-readable description
            changes: Dictionary of before/after values for updates
            audit_metadata: Additional context
            endpoint: API endpoint
            method: HTTP method
            ip_address: Client IP address
            user_agent: Client user agent
        """
        # Tracks whether the failure (if any) happened after we started
        # mutating the session; only then is a rollback warranted.
        session_touched = False
        try:
            # resource_id is stored in a string column, but convenience
            # methods may forward UUID objects (e.g. a tenant ID).
            if resource_id is not None and not isinstance(resource_id, str):
                resource_id = str(resource_id)

            audit_log = self.audit_log_model(
                tenant_id=uuid.UUID(tenant_id) if isinstance(tenant_id, str) else tenant_id,
                user_id=uuid.UUID(user_id) if isinstance(user_id, str) else user_id,
                action=action,
                resource_type=resource_type,
                resource_id=resource_id,
                severity=severity,
                service_name=self.service_name,
                description=description,
                changes=changes,
                audit_metadata=audit_metadata,
                endpoint=endpoint,
                method=method,
                ip_address=ip_address,
                user_agent=user_agent,
            )

            session_touched = True
            db_session.add(audit_log)
            await db_session.commit()

            self.logger.info(
                "audit_event_logged",
                tenant_id=str(tenant_id),
                user_id=str(user_id),
                action=action,
                resource_type=resource_type,
                resource_id=resource_id,
                severity=severity,
            )
        except Exception as e:
            if session_touched:
                # A failed flush/commit leaves the session unusable until it
                # is rolled back; do that here so the caller can keep going.
                try:
                    await db_session.rollback()
                except Exception as rollback_error:
                    self.logger.error(
                        "audit_log_rollback_failed",
                        error=str(rollback_error),
                        tenant_id=str(tenant_id),
                        user_id=str(user_id),
                        action=action,
                    )
            self.logger.error(
                "audit_log_failed",
                error=str(e),
                tenant_id=str(tenant_id),
                user_id=str(user_id),
                action=action,
            )
            # Don't raise - audit logging should not block operations

    async def log_deletion(
        self,
        db_session,
        tenant_id: str,
        user_id: str,
        resource_type: str,
        resource_id: str,
        resource_data: Optional[Dict[str, Any]] = None,
        **kwargs
    ):
        """
        Convenience method for logging deletions

        Records a ``delete`` action with ``high`` severity. When
        ``resource_data`` is truthy it is stored under
        ``audit_metadata["deleted_data"]``.

        Args:
            db_session: Database session
            tenant_id: Tenant ID
            user_id: User ID who performed the deletion
            resource_type: Type of the deleted resource
            resource_id: ID of the deleted resource
            resource_data: Optional snapshot of the deleted data
            **kwargs: Forwarded to ``log_event`` (e.g. endpoint, method,
                ip_address, user_agent). Must not repeat a keyword this
                method already sets.
        """
        return await self.log_event(
            db_session=db_session,
            tenant_id=tenant_id,
            user_id=user_id,
            action=AuditAction.DELETE.value,
            resource_type=resource_type,
            resource_id=resource_id,
            severity=AuditSeverity.HIGH.value,
            description=f"Deleted {resource_type} {resource_id}",
            audit_metadata={"deleted_data": resource_data} if resource_data else None,
            **kwargs
        )

    async def log_role_change(
        self,
        db_session,
        tenant_id: str,
        user_id: str,
        target_user_id: str,
        old_role: str,
        new_role: str,
        **kwargs
    ):
        """
        Convenience method for logging role changes

        Records an ``update`` action on resource type ``user_role`` with
        ``high`` severity; the old and new roles are stored in ``changes``.

        Args:
            db_session: Database session
            tenant_id: Tenant ID
            user_id: User ID who performed the change
            target_user_id: ID of the user whose role changed (stored as
                the resource ID)
            old_role: Role before the change
            new_role: Role after the change
            **kwargs: Forwarded to ``log_event``. Must not repeat a keyword
                this method already sets.
        """
        return await self.log_event(
            db_session=db_session,
            tenant_id=tenant_id,
            user_id=user_id,
            action=AuditAction.UPDATE.value,
            resource_type="user_role",
            resource_id=target_user_id,
            severity=AuditSeverity.HIGH.value,
            description=f"Changed user role from {old_role} to {new_role}",
            changes={
                "before": {"role": old_role},
                "after": {"role": new_role}
            },
            **kwargs
        )

    async def log_subscription_change(
        self,
        db_session,
        tenant_id: str,
        user_id: str,
        action: str,
        old_plan: Optional[str] = None,
        new_plan: Optional[str] = None,
        **kwargs
    ):
        """
        Convenience method for logging subscription changes

        Records the given action on resource type ``subscription`` with
        ``critical`` severity, using the tenant ID as the resource ID.

        Args:
            db_session: Database session
            tenant_id: Tenant ID (also used as the resource ID)
            user_id: User ID who performed the change
            action: Action performed (e.g. upgrade, downgrade, cancel)
            old_plan: Plan before the change, if any
            new_plan: Plan after the change, if any
            **kwargs: Forwarded to ``log_event``. Must not repeat a keyword
                this method already sets.
        """
        return await self.log_event(
            db_session=db_session,
            tenant_id=tenant_id,
            user_id=user_id,
            action=action,
            resource_type="subscription",
            resource_id=tenant_id,
            severity=AuditSeverity.CRITICAL.value,
            description=f"Subscription {action}: {old_plan} -> {new_plan}" if old_plan else f"Subscription {action}: {new_plan}",
            changes={
                "before": {"plan": old_plan} if old_plan else None,
                "after": {"plan": new_plan} if new_plan else None
            },
            **kwargs
        )
def create_audit_logger(service_name: str, audit_log_model) -> AuditLogger:
    """
    Build an AuditLogger for one service.

    Thin factory around the ``AuditLogger`` constructor; both arguments are
    passed through unchanged.

    Args:
        service_name: Name of the service (e.g., "production-service")
        audit_log_model: The service-specific AuditLog model class
            (REQUIRED - created via create_audit_log_model)

    Returns:
        Configured AuditLogger instance

    Example:
        from app.models import AuditLog
        from shared.security import create_audit_logger
        audit_logger = create_audit_logger("production-service", AuditLog)
    """
    service_audit_logger = AuditLogger(service_name, audit_log_model)
    return service_audit_logger