Files
bakery-ia/services/recipes/app/repositories/base.py

96 lines
3.2 KiB
Python
Raw Normal View History

# services/recipes/app/repositories/base.py
"""
Base repository class for common database operations
"""
from typing import TypeVar, Generic, List, Optional, Dict, Any
from sqlalchemy.orm import Session
from sqlalchemy import desc, asc
from uuid import UUID
T = TypeVar('T')


class BaseRepository(Generic[T]):
    """Base repository with common CRUD operations.

    Wraps a SQLAlchemy ``Session`` and a single mapped ``model`` class.
    The methods below access ``model.id``; the tenant-scoped methods also
    access ``model.tenant_id``.

    Write operations (``create``, ``update``, ``delete``) commit immediately.
    If the commit fails, the session is rolled back and the original
    exception is re-raised, so the session remains usable by the caller.
    """

    def __init__(self, model: type, db: Session):
        """Bind the repository to a model class and a database session.

        Args:
            model: Mapped class this repository queries and instantiates.
            db: Session used for every query and commit.
        """
        self.model = model
        self.db = db

    def _commit(self) -> None:
        """Commit the session, rolling back before re-raising on failure."""
        try:
            self.db.commit()
        except Exception:
            # Without a rollback the session stays in a failed transaction
            # state and every later operation on it would raise as well.
            self.db.rollback()
            raise

    def create(self, obj_data: Dict[str, Any]) -> T:
        """Create a new record.

        Args:
            obj_data: Keyword arguments passed to the model constructor.

        Returns:
            The persisted instance, refreshed from the database.

        Raises:
            Exception: Whatever the commit raised; the session is rolled
                back first.
        """
        db_obj = self.model(**obj_data)
        self.db.add(db_obj)
        self._commit()
        self.db.refresh(db_obj)
        return db_obj

    def get_by_id(self, record_id: UUID) -> Optional[T]:
        """Get record by ID.

        Returns:
            The first record whose ``id`` equals ``record_id``, or ``None``.
        """
        return self.db.query(self.model).filter(self.model.id == record_id).first()

    def get_by_tenant_id(self, tenant_id: UUID, limit: int = 100, offset: int = 0) -> List[T]:
        """Get records by tenant ID with pagination.

        Args:
            tenant_id: Value matched against ``model.tenant_id``.
            limit: Maximum number of records to return.
            offset: Number of records to skip.

        Returns:
            The requested page of records, ordered by ``id``.
        """
        return (
            self.db.query(self.model)
            .filter(self.model.tenant_id == tenant_id)
            # LIMIT/OFFSET without ORDER BY may return overlapping or
            # missing rows across pages; order by id to keep pages stable.
            .order_by(self.model.id)
            .limit(limit)
            .offset(offset)
            .all()
        )

    def update(self, record_id: UUID, update_data: Dict[str, Any]) -> Optional[T]:
        """Update record by ID.

        Args:
            record_id: ID of the record to update.
            update_data: Attribute names mapped to new values. Keys that are
                not attributes of the record are ignored.

        Returns:
            The updated, refreshed record, or ``None`` if no record matched.

        Raises:
            Exception: Whatever the commit raised; the session is rolled
                back first.
        """
        db_obj = self.get_by_id(record_id)
        if db_obj is None:
            return None

        for key, value in update_data.items():
            if hasattr(db_obj, key):
                setattr(db_obj, key, value)
        self._commit()
        self.db.refresh(db_obj)
        return db_obj

    def delete(self, record_id: UUID) -> bool:
        """Delete record by ID.

        Returns:
            ``True`` if a record was found and deleted, ``False`` otherwise.

        Raises:
            Exception: Whatever the commit raised; the session is rolled
                back first.
        """
        db_obj = self.get_by_id(record_id)
        if db_obj is None:
            return False

        self.db.delete(db_obj)
        self._commit()
        return True

    def count_by_tenant(self, tenant_id: UUID) -> int:
        """Count records by tenant."""
        return self.db.query(self.model).filter(self.model.tenant_id == tenant_id).count()

    def list_with_filters(
        self,
        tenant_id: UUID,
        filters: Optional[Dict[str, Any]] = None,
        sort_by: str = "created_at",
        sort_order: str = "desc",
        limit: int = 100,
        offset: int = 0
    ) -> List[T]:
        """List records with filtering and sorting.

        Args:
            tenant_id: Value matched against ``model.tenant_id``.
            filters: Attribute names mapped to required values (equality
                match). Entries whose value is ``None`` or whose key is not
                an attribute of the model are ignored.
            sort_by: Model attribute to sort on; ignored if the model has
                no such attribute.
            sort_order: ``"desc"`` (case-insensitive) for descending; any
                other value sorts ascending.
            limit: Maximum number of records to return.
            offset: Number of records to skip.

        Returns:
            The requested page of matching records.
        """
        query = self.db.query(self.model).filter(self.model.tenant_id == tenant_id)

        # Apply filters
        for key, value in (filters or {}).items():
            if value is not None and hasattr(self.model, key):
                query = query.filter(getattr(self.model, key) == value)

        # Apply sorting
        ordering = []
        if hasattr(self.model, sort_by):
            direction = desc if sort_order.lower() == "desc" else asc
            ordering.append(direction(getattr(self.model, sort_by)))
        if sort_by != "id":
            # Tie-breaker: rows sharing the same sort value (or no sort
            # column at all) would otherwise have no defined order, which
            # makes LIMIT/OFFSET pages unstable.
            ordering.append(self.model.id)
        if ordering:
            query = query.order_by(*ordering)

        return query.limit(limit).offset(offset).all()

    def exists(self, record_id: UUID) -> bool:
        """Check if record exists.

        Only the ``id`` column is selected, so the full record is not loaded.
        """
        return (
            self.db.query(self.model.id)
            .filter(self.model.id == record_id)
            .first()
            is not None
        )