Files
bakery-ia/services/auth/app/repositories/deletion_job_repository.py

111 lines
3.7 KiB
Python
Raw Permalink Normal View History

"""
Deletion Job Repository
Database operations for deletion job persistence
"""
from typing import List, Optional
from uuid import UUID
from sqlalchemy import select, and_, desc
from sqlalchemy.ext.asyncio import AsyncSession
import structlog
from app.models.deletion_job import DeletionJob
# Module-level structlog logger; the repository methods below use it to report failures.
logger = structlog.get_logger()
class DeletionJobRepository:
    """Data-access helper for ``DeletionJob`` rows.

    All operations go through the ``AsyncSession`` handed to the constructor.
    Write operations flush the session; no method in this class commits.
    Every public method logs a failure through the module logger and then
    re-raises the original exception unchanged.
    """

    def __init__(self, session: AsyncSession):
        """Store the session that every repository call will use."""
        self.session = session

    async def create(self, deletion_job: DeletionJob) -> DeletionJob:
        """Add ``deletion_job`` to the session, flush, and refresh it.

        Returns:
            The same ``deletion_job`` instance after the refresh.

        Raises:
            Exception: whatever the session raised, after it has been logged.
        """
        try:
            db = self.session
            db.add(deletion_job)
            await db.flush()
            await db.refresh(deletion_job)
        except Exception as exc:
            logger.error("Failed to create deletion job", error=str(exc))
            raise
        return deletion_job

    async def get_by_job_id(self, job_id: str) -> Optional[DeletionJob]:
        """Look up a deletion job by its ``job_id`` column.

        Returns:
            The matching job, or ``None`` when no row matches.

        Raises:
            Exception: whatever the query raised, after it has been logged.
        """
        try:
            matches_job = DeletionJob.job_id == job_id
            statement = select(DeletionJob).where(matches_job)
            outcome = await self.session.execute(statement)
            job = outcome.scalar_one_or_none()
        except Exception as exc:
            logger.error("Failed to get deletion job", error=str(exc), job_id=job_id)
            raise
        return job

    async def get_by_id(self, id: UUID) -> Optional[DeletionJob]:
        """Fetch a deletion job through ``session.get`` using its database ID.

        Returns:
            Whatever ``session.get`` yields for this identity.

        Raises:
            Exception: whatever the session raised, after it has been logged.
        """
        try:
            job = await self.session.get(DeletionJob, id)
        except Exception as exc:
            logger.error("Failed to get deletion job by ID", error=str(exc), id=str(id))
            raise
        return job

    async def _fetch_recent(self, base_query, status, limit) -> List[DeletionJob]:
        """Apply the shared status filter, ordering and limit, then execute.

        ``base_query`` is a ``select(DeletionJob)`` statement that may already
        carry extra criteria. A falsy ``status`` means "do not filter".
        """
        narrowed = base_query
        if status:
            narrowed = narrowed.where(DeletionJob.status == status)
        # Descending order on started_at, capped at `limit` rows.
        final_query = narrowed.order_by(desc(DeletionJob.started_at)).limit(limit)
        outcome = await self.session.execute(final_query)
        rows = outcome.scalars().all()
        return list(rows)

    async def list_by_tenant(
        self,
        tenant_id: UUID,
        status: Optional[str] = None,
        limit: int = 100
    ) -> List[DeletionJob]:
        """Return a tenant's deletion jobs ordered by ``started_at`` descending.

        Args:
            tenant_id: Value compared against ``DeletionJob.tenant_id``.
            status: Optional status filter; ignored when falsy.
            limit: Maximum number of rows to return.

        Raises:
            Exception: whatever the query raised, after it has been logged.
        """
        try:
            owned_by_tenant = select(DeletionJob).where(
                DeletionJob.tenant_id == tenant_id
            )
            jobs = await self._fetch_recent(owned_by_tenant, status, limit)
        except Exception as exc:
            logger.error("Failed to list deletion jobs", error=str(exc), tenant_id=str(tenant_id))
            raise
        return jobs

    async def list_all(
        self,
        status: Optional[str] = None,
        limit: int = 100
    ) -> List[DeletionJob]:
        """Return deletion jobs ordered by ``started_at`` descending.

        Args:
            status: Optional status filter; ignored when falsy.
            limit: Maximum number of rows to return.

        Raises:
            Exception: whatever the query raised, after it has been logged.
        """
        try:
            every_job = select(DeletionJob)
            jobs = await self._fetch_recent(every_job, status, limit)
        except Exception as exc:
            logger.error("Failed to list all deletion jobs", error=str(exc))
            raise
        return jobs

    async def update(self, deletion_job: DeletionJob) -> DeletionJob:
        """Flush the session and refresh ``deletion_job``.

        This method assigns no attributes itself; presumably the caller has
        already modified the instance before calling it.

        Returns:
            The same ``deletion_job`` instance after the refresh.

        Raises:
            Exception: whatever the session raised, after it has been logged.
        """
        try:
            db = self.session
            await db.flush()
            await db.refresh(deletion_job)
        except Exception as exc:
            logger.error("Failed to update deletion job", error=str(exc))
            raise
        return deletion_job

    async def delete(self, deletion_job: DeletionJob) -> None:
        """Mark ``deletion_job`` for deletion in the session and flush.

        Raises:
            Exception: whatever the session raised, after it has been logged.
        """
        try:
            db = self.session
            await db.delete(deletion_job)
            await db.flush()
        except Exception as exc:
            logger.error("Failed to delete deletion job", error=str(exc))
            raise