""" Audit logging system for tracking critical operations across all services """ import uuid from datetime import datetime, timezone from typing import Optional, Dict, Any from enum import Enum import structlog from sqlalchemy import Column, String, DateTime, Text, Index from sqlalchemy.dialects.postgresql import UUID, JSON logger = structlog.get_logger() class AuditSeverity(str, Enum): """Severity levels for audit events""" LOW = "low" MEDIUM = "medium" HIGH = "high" CRITICAL = "critical" class AuditAction(str, Enum): """Common audit action types""" CREATE = "create" READ = "read" UPDATE = "update" DELETE = "delete" APPROVE = "approve" REJECT = "reject" CANCEL = "cancel" EXPORT = "export" IMPORT = "import" INVITE = "invite" REMOVE = "remove" UPGRADE = "upgrade" DOWNGRADE = "downgrade" DEACTIVATE = "deactivate" ACTIVATE = "activate" def create_audit_log_model(Base): """ Factory function to create AuditLog model for any service Each service has its own audit_logs table in their database Usage in service models/__init__.py: from shared.database.base import Base from shared.security import create_audit_log_model AuditLog = create_audit_log_model(Base) Args: Base: SQLAlchemy declarative base for the service Returns: AuditLog model class bound to the service's Base """ class AuditLog(Base): """ Audit log model for tracking critical operations Each service has its own audit_logs table for data locality """ __tablename__ = "audit_logs" # Primary identification id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) # Tenant and user context tenant_id = Column(UUID(as_uuid=True), nullable=False, index=True) user_id = Column(UUID(as_uuid=True), nullable=False, index=True) # Action details action = Column(String(100), nullable=False, index=True) # create, update, delete, etc. resource_type = Column(String(100), nullable=False, index=True) # supplier, recipe, order, etc. resource_id = Column(String(255), nullable=True, index=True) # Severity and categorization severity = Column( String(20), nullable=False, default="medium", index=True ) # low, medium, high, critical # Service identification service_name = Column(String(100), nullable=False, index=True) # Details description = Column(Text, nullable=True) # Audit trail data changes = Column(JSON, nullable=True) # Before/after values for updates audit_metadata = Column(JSON, nullable=True) # Additional context # Request context ip_address = Column(String(45), nullable=True) # IPv4 or IPv6 user_agent = Column(Text, nullable=True) endpoint = Column(String(255), nullable=True) method = Column(String(10), nullable=True) # GET, POST, PUT, DELETE # Timestamps created_at = Column( DateTime(timezone=True), nullable=False, default=lambda: datetime.now(timezone.utc), index=True ) # Composite indexes for common query patterns __table_args__ = ( Index('idx_audit_tenant_created', 'tenant_id', 'created_at'), Index('idx_audit_user_created', 'user_id', 'created_at'), Index('idx_audit_resource_type_action', 'resource_type', 'action'), Index('idx_audit_severity_created', 'severity', 'created_at'), Index('idx_audit_service_created', 'service_name', 'created_at'), ) def __repr__(self): return ( f"" ) def to_dict(self): """Convert audit log to dictionary""" return { "id": str(self.id), "tenant_id": str(self.tenant_id), "user_id": str(self.user_id), "action": self.action, "resource_type": self.resource_type, "resource_id": self.resource_id, "severity": self.severity, "service_name": self.service_name, "description": self.description, "changes": self.changes, "metadata": self.audit_metadata, "ip_address": self.ip_address, "user_agent": self.user_agent, "endpoint": self.endpoint, "method": self.method, "created_at": self.created_at.isoformat() if self.created_at else None, } return AuditLog class AuditLogger: """Service for logging audit events""" def __init__(self, service_name: str, audit_log_model): """ Initialize AuditLogger with service-specific AuditLog model Args: service_name: Name of the service (e.g., "production-service") audit_log_model: The service-specific AuditLog model class created via create_audit_log_model() """ if not audit_log_model: raise ValueError(f"audit_log_model is required for AuditLogger in {service_name}") self.service_name = service_name self.audit_log_model = audit_log_model self.logger = logger.bind(service=service_name) async def log_event( self, db_session, tenant_id: str, user_id: str, action: str, resource_type: str, resource_id: Optional[str] = None, severity: str = "medium", description: Optional[str] = None, changes: Optional[Dict[str, Any]] = None, audit_metadata: Optional[Dict[str, Any]] = None, endpoint: Optional[str] = None, method: Optional[str] = None, ip_address: Optional[str] = None, user_agent: Optional[str] = None, ): """ Log an audit event Args: db_session: Database session tenant_id: Tenant ID user_id: User ID who performed the action action: Action performed (create, update, delete, etc.) resource_type: Type of resource (user, sale, recipe, etc.) resource_id: ID of the resource affected severity: Severity level (low, medium, high, critical) description: Human-readable description changes: Dictionary of before/after values for updates audit_metadata: Additional context endpoint: API endpoint method: HTTP method ip_address: Client IP address user_agent: Client user agent """ try: audit_log = self.audit_log_model( tenant_id=uuid.UUID(tenant_id) if isinstance(tenant_id, str) else tenant_id, user_id=uuid.UUID(user_id) if isinstance(user_id, str) else user_id, action=action, resource_type=resource_type, resource_id=resource_id, severity=severity, service_name=self.service_name, description=description, changes=changes, audit_metadata=audit_metadata, endpoint=endpoint, method=method, ip_address=ip_address, user_agent=user_agent, ) db_session.add(audit_log) await db_session.commit() self.logger.info( "audit_event_logged", tenant_id=str(tenant_id), user_id=str(user_id), action=action, resource_type=resource_type, resource_id=resource_id, severity=severity, ) except Exception as e: self.logger.error( "audit_log_failed", error=str(e), tenant_id=str(tenant_id), user_id=str(user_id), action=action, ) # Don't raise - audit logging should not block operations async def log_deletion( self, db_session, tenant_id: str, user_id: str, resource_type: str, resource_id: str, resource_data: Optional[Dict[str, Any]] = None, **kwargs ): """Convenience method for logging deletions""" return await self.log_event( db_session=db_session, tenant_id=tenant_id, user_id=user_id, action=AuditAction.DELETE.value, resource_type=resource_type, resource_id=resource_id, severity=AuditSeverity.HIGH.value, description=f"Deleted {resource_type} {resource_id}", audit_metadata={"deleted_data": resource_data} if resource_data else None, **kwargs ) async def log_role_change( self, db_session, tenant_id: str, user_id: str, target_user_id: str, old_role: str, new_role: str, **kwargs ): """Convenience method for logging role changes""" return await self.log_event( db_session=db_session, tenant_id=tenant_id, user_id=user_id, action=AuditAction.UPDATE.value, resource_type="user_role", resource_id=target_user_id, severity=AuditSeverity.HIGH.value, description=f"Changed user role from {old_role} to {new_role}", changes={ "before": {"role": old_role}, "after": {"role": new_role} }, **kwargs ) async def log_subscription_change( self, db_session, tenant_id: str, user_id: str, action: str, old_plan: Optional[str] = None, new_plan: Optional[str] = None, **kwargs ): """Convenience method for logging subscription changes""" return await self.log_event( db_session=db_session, tenant_id=tenant_id, user_id=user_id, action=action, resource_type="subscription", resource_id=tenant_id, severity=AuditSeverity.CRITICAL.value, description=f"Subscription {action}: {old_plan} -> {new_plan}" if old_plan else f"Subscription {action}: {new_plan}", changes={ "before": {"plan": old_plan} if old_plan else None, "after": {"plan": new_plan} if new_plan else None }, **kwargs ) def create_audit_logger(service_name: str, audit_log_model) -> AuditLogger: """ Factory function to create audit logger for a service Args: service_name: Name of the service (e.g., "production-service") audit_log_model: The service-specific AuditLog model class (REQUIRED - created via create_audit_log_model) Returns: Configured AuditLogger instance Example: from app.models import AuditLog from shared.security import create_audit_logger audit_logger = create_audit_logger("production-service", AuditLog) """ return AuditLogger(service_name, audit_log_model)