Files
bakery-ia/services/auth/app/repositories/deletion_job_repository.py

111 lines
3.7 KiB
Python

"""
Deletion Job Repository
Database operations for deletion job persistence
"""
from typing import List, Optional
from uuid import UUID
from sqlalchemy import select, and_, desc
from sqlalchemy.ext.asyncio import AsyncSession
import structlog
from app.models.deletion_job import DeletionJob
logger = structlog.get_logger()
class DeletionJobRepository:
"""Repository for deletion job database operations"""
def __init__(self, session: AsyncSession):
self.session = session
async def create(self, deletion_job: DeletionJob) -> DeletionJob:
"""Create a new deletion job record"""
try:
self.session.add(deletion_job)
await self.session.flush()
await self.session.refresh(deletion_job)
return deletion_job
except Exception as e:
logger.error("Failed to create deletion job", error=str(e))
raise
async def get_by_job_id(self, job_id: str) -> Optional[DeletionJob]:
"""Get deletion job by job_id"""
try:
query = select(DeletionJob).where(DeletionJob.job_id == job_id)
result = await self.session.execute(query)
return result.scalar_one_or_none()
except Exception as e:
logger.error("Failed to get deletion job", error=str(e), job_id=job_id)
raise
async def get_by_id(self, id: UUID) -> Optional[DeletionJob]:
"""Get deletion job by database ID"""
try:
return await self.session.get(DeletionJob, id)
except Exception as e:
logger.error("Failed to get deletion job by ID", error=str(e), id=str(id))
raise
async def list_by_tenant(
self,
tenant_id: UUID,
status: Optional[str] = None,
limit: int = 100
) -> List[DeletionJob]:
"""List deletion jobs for a tenant"""
try:
query = select(DeletionJob).where(DeletionJob.tenant_id == tenant_id)
if status:
query = query.where(DeletionJob.status == status)
query = query.order_by(desc(DeletionJob.started_at)).limit(limit)
result = await self.session.execute(query)
return list(result.scalars().all())
except Exception as e:
logger.error("Failed to list deletion jobs", error=str(e), tenant_id=str(tenant_id))
raise
async def list_all(
self,
status: Optional[str] = None,
limit: int = 100
) -> List[DeletionJob]:
"""List all deletion jobs with optional status filter"""
try:
query = select(DeletionJob)
if status:
query = query.where(DeletionJob.status == status)
query = query.order_by(desc(DeletionJob.started_at)).limit(limit)
result = await self.session.execute(query)
return list(result.scalars().all())
except Exception as e:
logger.error("Failed to list all deletion jobs", error=str(e))
raise
async def update(self, deletion_job: DeletionJob) -> DeletionJob:
"""Update a deletion job record"""
try:
await self.session.flush()
await self.session.refresh(deletion_job)
return deletion_job
except Exception as e:
logger.error("Failed to update deletion job", error=str(e))
raise
async def delete(self, deletion_job: DeletionJob) -> None:
"""Delete a deletion job record"""
try:
await self.session.delete(deletion_job)
await self.session.flush()
except Exception as e:
logger.error("Failed to delete deletion job", error=str(e))
raise