bakery-ia/services/auth/app/services/deletion_orchestrator.py
"""
Deletion Orchestrator Service
Coordinates tenant deletion across all microservices with saga pattern support
"""
from typing import Dict, List, Any, Optional
from dataclasses import dataclass, field
from datetime import datetime, timezone
from enum import Enum
import structlog
import httpx
import asyncio
from uuid import uuid4, UUID
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.deletion_job import DeletionJob as DeletionJobModel
from app.repositories.deletion_job_repository import DeletionJobRepository
logger = structlog.get_logger()


class DeletionStatus(Enum):
    """Status of deletion job"""
    PENDING = "pending"
    IN_PROGRESS = "in_progress"
    COMPLETED = "completed"
    FAILED = "failed"
    ROLLED_BACK = "rolled_back"


class ServiceDeletionStatus(Enum):
    """Status of individual service deletion"""
    PENDING = "pending"
    IN_PROGRESS = "in_progress"
    SUCCESS = "success"
    FAILED = "failed"
    ROLLED_BACK = "rolled_back"


@dataclass
class ServiceDeletionResult:
    """Result from a single service deletion"""
    service_name: str
    status: ServiceDeletionStatus
    deleted_counts: Dict[str, int] = field(default_factory=dict)
    errors: List[str] = field(default_factory=list)
    duration_seconds: float = 0.0
    timestamp: str = field(default_factory=lambda: datetime.now(timezone.utc).isoformat())

    @property
    def total_deleted(self) -> int:
        return sum(self.deleted_counts.values())

    @property
    def success(self) -> bool:
        return self.status == ServiceDeletionStatus.SUCCESS and len(self.errors) == 0
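
# Note (illustrative): `success` is stricter than `status` -- a result can
# carry ServiceDeletionStatus.SUCCESS while still recording errors, and it
# then does NOT count as a success for the orchestration outcome:
#
#     r = ServiceDeletionResult("orders", ServiceDeletionStatus.SUCCESS,
#                               errors=["partial cleanup"])
#     assert r.success is False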


@dataclass
class DeletionJob:
    """Tracks a complete tenant deletion job"""
    job_id: str
    tenant_id: str
    tenant_name: Optional[str] = None
    initiated_by: Optional[str] = None
    status: DeletionStatus = DeletionStatus.PENDING
    service_results: Dict[str, ServiceDeletionResult] = field(default_factory=dict)
    started_at: Optional[str] = None
    completed_at: Optional[str] = None
    error_log: List[str] = field(default_factory=list)

    @property
    def total_items_deleted(self) -> int:
        return sum(result.total_deleted for result in self.service_results.values())

    @property
    def services_completed(self) -> int:
        return sum(1 for r in self.service_results.values()
                   if r.status == ServiceDeletionStatus.SUCCESS)

    @property
    def services_failed(self) -> int:
        return sum(1 for r in self.service_results.values()
                   if r.status == ServiceDeletionStatus.FAILED)

    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary for API responses"""
        return {
            "job_id": self.job_id,
            "tenant_id": self.tenant_id,
            "tenant_name": self.tenant_name,
            "initiated_by": self.initiated_by,
            "status": self.status.value,
            "total_items_deleted": self.total_items_deleted,
            "services_completed": self.services_completed,
            "services_failed": self.services_failed,
            "service_results": {
                name: {
                    "status": result.status.value,
                    "deleted_counts": result.deleted_counts,
                    "total_deleted": result.total_deleted,
                    "errors": result.errors,
                    "duration_seconds": result.duration_seconds
                }
                for name, result in self.service_results.items()
            },
            "started_at": self.started_at,
            "completed_at": self.completed_at,
            "error_log": self.error_log
        }


class DeletionOrchestrator:
    """
    Orchestrates tenant deletion across all microservices
    Implements saga pattern for distributed transactions
    """

    # Service registry with deletion endpoints
    # All services implement DELETE /tenant/{tenant_id} and GET /tenant/{tenant_id}/deletion-preview
    # STATUS: 12/12 services implemented (100% COMPLETE)
    SERVICE_DELETION_ENDPOINTS = {
        # Core business services (6/6 complete)
        "orders": "http://orders-service:8000/api/v1/orders/tenant/{tenant_id}",
        "inventory": "http://inventory-service:8000/api/v1/inventory/tenant/{tenant_id}",
        "recipes": "http://recipes-service:8000/api/v1/recipes/tenant/{tenant_id}",
        "production": "http://production-service:8000/api/v1/production/tenant/{tenant_id}",
        "sales": "http://sales-service:8000/api/v1/sales/tenant/{tenant_id}",
        "suppliers": "http://suppliers-service:8000/api/v1/suppliers/tenant/{tenant_id}",
        # Integration services (2/2 complete)
        "pos": "http://pos-service:8000/api/v1/pos/tenant/{tenant_id}",
        "external": "http://external-service:8000/api/v1/external/tenant/{tenant_id}",
        # AI/ML services (2/2 complete)
        "forecasting": "http://forecasting-service:8000/api/v1/forecasting/tenant/{tenant_id}",
        "training": "http://training-service:8000/api/v1/training/tenant/{tenant_id}",
        # Alert and notification services (2/2 complete)
        "alert_processor": "http://alert-processor-service:8000/api/v1/alerts/tenant/{tenant_id}",
        "notification": "http://notification-service:8000/api/v1/notifications/tenant/{tenant_id}",
    }
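
    # Illustrative: each template is expanded per tenant before dispatch in
    # _delete_from_all_services, e.g.
    #
    #     SERVICE_DELETION_ENDPOINTS["orders"].format(tenant_id="42")
    #     # -> "http://orders-service:8000/api/v1/orders/tenant/42"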

    def __init__(self, auth_token: Optional[str] = None, db: Optional[AsyncSession] = None):
        """
        Initialize orchestrator

        Args:
            auth_token: JWT token for service-to-service authentication
            db: Database session for persistence (optional for backward compatibility)
        """
        self.auth_token = auth_token
        self.db = db
        self.jobs: Dict[str, DeletionJob] = {}  # In-memory cache for active jobs

    async def _save_job_to_db(self, job: DeletionJob) -> None:
        """Save or update job to database"""
        if not self.db:
            return
        try:
            repository = DeletionJobRepository(self.db)
            # Check if job exists
            existing = await repository.get_by_job_id(job.job_id)
            if existing:
                # Update existing job
                existing.status = job.status.value
                existing.service_results = {
                    name: {
                        "status": result.status.value,
                        "deleted_counts": result.deleted_counts,
                        "total_deleted": result.total_deleted,
                        "errors": result.errors,
                        "duration_seconds": result.duration_seconds
                    }
                    for name, result in job.service_results.items()
                }
                existing.total_items_deleted = job.total_items_deleted
                existing.services_completed = job.services_completed
                existing.services_failed = job.services_failed
                existing.error_log = job.error_log
                existing.completed_at = datetime.fromisoformat(job.completed_at) if job.completed_at else None
                await repository.update(existing)
            else:
                # Create new job
                db_job = DeletionJobModel(
                    job_id=job.job_id,
                    tenant_id=UUID(job.tenant_id),
                    tenant_name=job.tenant_name,
                    initiated_by=UUID(job.initiated_by) if job.initiated_by else None,
                    status=job.status.value,
                    service_results={},
                    total_items_deleted=0,
                    services_completed=0,
                    services_failed=0,
                    error_log=job.error_log,
                    started_at=datetime.fromisoformat(job.started_at) if job.started_at else None,
                    completed_at=None
                )
                await repository.create(db_job)
            await self.db.commit()
        except Exception as e:
            # Don't fail the job if database save fails
            logger.error("Failed to save job to database", error=str(e), job_id=job.job_id)

    async def _load_job_from_db(self, job_id: str) -> Optional[DeletionJob]:
        """Load job from database"""
        if not self.db:
            return None
        try:
            repository = DeletionJobRepository(self.db)
            db_job = await repository.get_by_job_id(job_id)
            if not db_job:
                return None
            # Convert database model to dataclass
            job = DeletionJob(
                job_id=db_job.job_id,
                tenant_id=str(db_job.tenant_id),
                tenant_name=db_job.tenant_name,
                initiated_by=str(db_job.initiated_by) if db_job.initiated_by else None,
                status=DeletionStatus(db_job.status),
                started_at=db_job.started_at.isoformat() if db_job.started_at else None,
                completed_at=db_job.completed_at.isoformat() if db_job.completed_at else None,
                error_log=db_job.error_log or []
            )
            # Reconstruct service results
            if db_job.service_results:
                for service_name, result_data in db_job.service_results.items():
                    job.service_results[service_name] = ServiceDeletionResult(
                        service_name=service_name,
                        status=ServiceDeletionStatus(result_data["status"]),
                        deleted_counts=result_data.get("deleted_counts", {}),
                        errors=result_data.get("errors", []),
                        duration_seconds=result_data.get("duration_seconds", 0.0)
                    )
            return job
        except Exception as e:
            logger.error("Failed to load job from database", error=str(e), job_id=job_id)
            return None

    async def orchestrate_tenant_deletion(
        self,
        tenant_id: str,
        tenant_name: Optional[str] = None,
        initiated_by: Optional[str] = None
    ) -> DeletionJob:
        """
        Orchestrate complete tenant deletion across all services

        Args:
            tenant_id: Tenant to delete
            tenant_name: Name of tenant (for logging)
            initiated_by: User ID who initiated deletion

        Returns:
            DeletionJob with complete results
        """
        # Create deletion job
        job = DeletionJob(
            job_id=str(uuid4()),
            tenant_id=tenant_id,
            tenant_name=tenant_name,
            initiated_by=initiated_by,
            status=DeletionStatus.IN_PROGRESS,
            started_at=datetime.now(timezone.utc).isoformat()
        )
        self.jobs[job.job_id] = job
        # Save initial job to database
        await self._save_job_to_db(job)
        logger.info("Starting tenant deletion orchestration",
                    job_id=job.job_id,
                    tenant_id=tenant_id,
                    tenant_name=tenant_name,
                    service_count=len(self.SERVICE_DELETION_ENDPOINTS))
        try:
            # Delete data from all services in parallel
            service_results = await self._delete_from_all_services(tenant_id)
            # Store results in job
            for service_name, result in service_results.items():
                job.service_results[service_name] = result
            # Check if all services succeeded
            all_succeeded = all(r.success for r in service_results.values())
            if all_succeeded:
                job.status = DeletionStatus.COMPLETED
                logger.info("Tenant deletion orchestration completed successfully",
                            job_id=job.job_id,
                            tenant_id=tenant_id,
                            total_items_deleted=job.total_items_deleted,
                            services_completed=job.services_completed)
            else:
                job.status = DeletionStatus.FAILED
                failed_services = [name for name, r in service_results.items() if not r.success]
                job.error_log.append(f"Failed services: {', '.join(failed_services)}")
                logger.error("Tenant deletion orchestration failed",
                             job_id=job.job_id,
                             tenant_id=tenant_id,
                             failed_services=failed_services,
                             services_completed=job.services_completed,
                             services_failed=job.services_failed)
            job.completed_at = datetime.now(timezone.utc).isoformat()
            # Save final state to database
            await self._save_job_to_db(job)
        except Exception as e:
            job.status = DeletionStatus.FAILED
            job.error_log.append(f"Fatal orchestration error: {str(e)}")
            job.completed_at = datetime.now(timezone.utc).isoformat()
            logger.error("Fatal error during tenant deletion orchestration",
                         job_id=job.job_id,
                         tenant_id=tenant_id,
                         error=str(e))
            # Save error state to database
            await self._save_job_to_db(job)
        return job
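
    # Note: the ROLLED_BACK statuses defined above are not yet wired to a
    # compensation step; on partial failure the job is recorded as FAILED
    # with per-service errors rather than rolled back.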

    async def _delete_from_all_services(
        self,
        tenant_id: str
    ) -> Dict[str, ServiceDeletionResult]:
        """
        Delete tenant data from all services in parallel

        Args:
            tenant_id: Tenant to delete

        Returns:
            Dict mapping service name to deletion result
        """
        # Create tasks for parallel execution
        tasks = []
        service_names = []
        for service_name, endpoint_template in self.SERVICE_DELETION_ENDPOINTS.items():
            endpoint = endpoint_template.format(tenant_id=tenant_id)
            task = self._delete_from_service(service_name, endpoint, tenant_id)
            tasks.append(task)
            service_names.append(service_name)
        # Execute all deletions in parallel
        results = await asyncio.gather(*tasks, return_exceptions=True)
        # Build result dictionary
        service_results = {}
        for service_name, result in zip(service_names, results):
            if isinstance(result, Exception):
                # Task raised an exception
                service_results[service_name] = ServiceDeletionResult(
                    service_name=service_name,
                    status=ServiceDeletionStatus.FAILED,
                    errors=[f"Exception: {str(result)}"]
                )
            else:
                service_results[service_name] = result
        return service_results
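
    # Note: with return_exceptions=True, asyncio.gather never raises here --
    # per-task exceptions come back as values and are converted to FAILED
    # results above, so one misbehaving service cannot abort the whole fan-out.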

    async def _delete_from_service(
        self,
        service_name: str,
        endpoint: str,
        tenant_id: str
    ) -> ServiceDeletionResult:
        """
        Delete tenant data from a single service

        Args:
            service_name: Name of the service
            endpoint: Full URL endpoint for deletion
            tenant_id: Tenant to delete

        Returns:
            ServiceDeletionResult with deletion details
        """
        start_time = datetime.now(timezone.utc)
        logger.info("Calling service deletion endpoint",
                    service=service_name,
                    endpoint=endpoint,
                    tenant_id=tenant_id)
        try:
            headers = {
                "X-Internal-Service": "auth-service",
                "Content-Type": "application/json"
            }
            if self.auth_token:
                headers["Authorization"] = f"Bearer {self.auth_token}"
            async with httpx.AsyncClient(timeout=60.0) as client:
                response = await client.delete(endpoint, headers=headers)
            duration = (datetime.now(timezone.utc) - start_time).total_seconds()
            if response.status_code == 200:
                data = response.json()
                summary = data.get("summary", {})
                result = ServiceDeletionResult(
                    service_name=service_name,
                    status=ServiceDeletionStatus.SUCCESS,
                    deleted_counts=summary.get("deleted_counts", {}),
                    errors=summary.get("errors", []),
                    duration_seconds=duration
                )
                logger.info("Service deletion succeeded",
                            service=service_name,
                            deleted_counts=result.deleted_counts,
                            total_deleted=result.total_deleted,
                            duration=duration)
                return result
            elif response.status_code == 404:
                # Service/endpoint doesn't exist yet - not an error.
                # Keep `errors` empty: the `success` property requires zero
                # errors, so recording a message here would mark the whole
                # job FAILED despite the intent to treat this as success.
                logger.warning("Service deletion endpoint not found (not yet implemented)",
                               service=service_name,
                               endpoint=endpoint)
                return ServiceDeletionResult(
                    service_name=service_name,
                    status=ServiceDeletionStatus.SUCCESS,  # Treat as success
                    duration_seconds=duration
                )
            else:
                # Deletion failed
                error_msg = f"HTTP {response.status_code}: {response.text}"
                logger.error("Service deletion failed",
                             service=service_name,
                             status_code=response.status_code,
                             error=error_msg)
                return ServiceDeletionResult(
                    service_name=service_name,
                    status=ServiceDeletionStatus.FAILED,
                    errors=[error_msg],
                    duration_seconds=duration
                )
        except httpx.TimeoutException:
            duration = (datetime.now(timezone.utc) - start_time).total_seconds()
            error_msg = f"Request timeout after {duration}s"
            logger.error("Service deletion timeout",
                         service=service_name,
                         endpoint=endpoint,
                         duration=duration)
            return ServiceDeletionResult(
                service_name=service_name,
                status=ServiceDeletionStatus.FAILED,
                errors=[error_msg],
                duration_seconds=duration
            )
        except Exception as e:
            duration = (datetime.now(timezone.utc) - start_time).total_seconds()
            error_msg = f"Exception: {str(e)}"
            logger.error("Service deletion exception",
                         service=service_name,
                         endpoint=endpoint,
                         error=str(e))
            return ServiceDeletionResult(
                service_name=service_name,
                status=ServiceDeletionStatus.FAILED,
                errors=[error_msg],
                duration_seconds=duration
            )
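
    # Expected 200 response body, as inferred from the parsing above; the
    # exact schema is owned by each downstream service and the counts here
    # are illustrative:
    #
    #     {
    #         "summary": {
    #             "deleted_counts": {"orders": 42, "order_items": 180},
    #             "errors": []
    #         }
    #     }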

    async def get_job_status(self, job_id: str) -> Optional[Dict[str, Any]]:
        """
        Get status of a deletion job

        Args:
            job_id: Job ID to query

        Returns:
            Job status dict or None if not found
        """
        # Try in-memory cache first
        job = self.jobs.get(job_id)
        if job:
            return job.to_dict()
        # Try loading from database
        job = await self._load_job_from_db(job_id)
        if job:
            self.jobs[job_id] = job  # Cache it
            return job.to_dict()
        return None

    async def list_jobs(
        self,
        tenant_id: Optional[str] = None,
        status: Optional[DeletionStatus] = None,
        limit: int = 100
    ) -> List[Dict[str, Any]]:
        """
        List deletion jobs with optional filters

        Args:
            tenant_id: Filter by tenant ID
            status: Filter by status
            limit: Maximum number of jobs to return

        Returns:
            List of job dicts
        """
        # If database is available, load from database
        if self.db:
            try:
                repository = DeletionJobRepository(self.db)
                if tenant_id:
                    db_jobs = await repository.list_by_tenant(
                        UUID(tenant_id),
                        status=status.value if status else None,
                        limit=limit
                    )
                else:
                    db_jobs = await repository.list_all(
                        status=status.value if status else None,
                        limit=limit
                    )
                # Convert to job dicts
                jobs = []
                for db_job in db_jobs:
                    job_dict = {
                        "job_id": db_job.job_id,
                        "tenant_id": str(db_job.tenant_id),
                        "tenant_name": db_job.tenant_name,
                        "initiated_by": str(db_job.initiated_by) if db_job.initiated_by else None,
                        "status": db_job.status,
                        "total_items_deleted": db_job.total_items_deleted,
                        "services_completed": db_job.services_completed,
                        "services_failed": db_job.services_failed,
                        "service_results": db_job.service_results or {},
                        "started_at": db_job.started_at.isoformat() if db_job.started_at else None,
                        "completed_at": db_job.completed_at.isoformat() if db_job.completed_at else None,
                        "error_log": db_job.error_log or []
                    }
                    jobs.append(job_dict)
                return jobs
            except Exception as e:
                # Fall through to the in-memory cache below
                logger.error("Failed to list jobs from database", error=str(e))
        # Fall back to in-memory cache
        jobs = list(self.jobs.values())
        # Apply filters
        if tenant_id:
            jobs = [j for j in jobs if j.tenant_id == tenant_id]
        if status:
            jobs = [j for j in jobs if j.status == status]
        # Sort by started_at descending
        jobs.sort(key=lambda j: j.started_at or "", reverse=True)
        # Apply limit
        jobs = jobs[:limit]
        return [job.to_dict() for job in jobs]
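
# Polling sketch (illustrative; `orchestrator` and `job_id` are assumed to
# come from the caller's context, e.g. a status endpoint):
#
#     status = await orchestrator.get_job_status(job_id)
#     if status is None:
#         ...  # job unknown: report "not found" to the caller
#     elif status["status"] in ("completed", "failed"):
#         ...  # terminal state: stop polling, surface status["error_log"]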