# services/suppliers/app/repositories/supplier_repository.py """ Supplier repository for database operations """ from typing import List, Optional, Dict, Any from sqlalchemy.orm import Session from sqlalchemy import and_, or_, func from uuid import UUID from datetime import datetime from app.models.suppliers import Supplier, SupplierStatus, SupplierType from app.repositories.base import BaseRepository class SupplierRepository(BaseRepository[Supplier]): """Repository for supplier management operations""" def __init__(self, db: Session): super().__init__(Supplier, db) def get_by_name(self, tenant_id: UUID, name: str) -> Optional[Supplier]: """Get supplier by name within tenant""" return ( self.db.query(self.model) .filter( and_( self.model.tenant_id == tenant_id, self.model.name == name ) ) .first() ) def get_by_supplier_code(self, tenant_id: UUID, supplier_code: str) -> Optional[Supplier]: """Get supplier by supplier code within tenant""" return ( self.db.query(self.model) .filter( and_( self.model.tenant_id == tenant_id, self.model.supplier_code == supplier_code ) ) .first() ) def search_suppliers( self, tenant_id: UUID, search_term: Optional[str] = None, supplier_type: Optional[SupplierType] = None, status: Optional[SupplierStatus] = None, limit: int = 50, offset: int = 0 ) -> List[Supplier]: """Search suppliers with filters""" query = self.db.query(self.model).filter(self.model.tenant_id == tenant_id) # Search term filter (name, contact person, email) if search_term: search_filter = or_( self.model.name.ilike(f"%{search_term}%"), self.model.contact_person.ilike(f"%{search_term}%"), self.model.email.ilike(f"%{search_term}%") ) query = query.filter(search_filter) # Type filter if supplier_type: query = query.filter(self.model.supplier_type == supplier_type) # Status filter if status: query = query.filter(self.model.status == status) return query.order_by(self.model.name).limit(limit).offset(offset).all() def get_active_suppliers(self, tenant_id: UUID) -> List[Supplier]: """Get all active suppliers for a tenant""" return ( self.db.query(self.model) .filter( and_( self.model.tenant_id == tenant_id, self.model.status == SupplierStatus.ACTIVE ) ) .order_by(self.model.name) .all() ) def get_suppliers_by_type( self, tenant_id: UUID, supplier_type: SupplierType ) -> List[Supplier]: """Get suppliers by type""" return ( self.db.query(self.model) .filter( and_( self.model.tenant_id == tenant_id, self.model.supplier_type == supplier_type, self.model.status == SupplierStatus.ACTIVE ) ) .order_by(self.model.quality_rating.desc(), self.model.name) .all() ) def get_top_suppliers( self, tenant_id: UUID, limit: int = 10 ) -> List[Supplier]: """Get top suppliers by quality rating and order value""" return ( self.db.query(self.model) .filter( and_( self.model.tenant_id == tenant_id, self.model.status == SupplierStatus.ACTIVE ) ) .order_by( self.model.quality_rating.desc(), self.model.total_amount.desc() ) .limit(limit) .all() ) def update_supplier_stats( self, supplier_id: UUID, total_orders_increment: int = 0, total_amount_increment: float = 0.0, new_quality_rating: Optional[float] = None, new_delivery_rating: Optional[float] = None ) -> Optional[Supplier]: """Update supplier performance statistics""" supplier = self.get_by_id(supplier_id) if not supplier: return None # Update counters if total_orders_increment: supplier.total_orders += total_orders_increment if total_amount_increment: supplier.total_amount += total_amount_increment # Update ratings (these should be calculated averages) if new_quality_rating is not None: supplier.quality_rating = new_quality_rating if new_delivery_rating is not None: supplier.delivery_rating = new_delivery_rating supplier.updated_at = datetime.utcnow() self.db.commit() self.db.refresh(supplier) return supplier def get_suppliers_needing_review( self, tenant_id: UUID, days_since_last_order: int = 30 ) -> List[Supplier]: """Get suppliers that may need performance review""" from datetime import datetime, timedelta cutoff_date = datetime.utcnow() - timedelta(days=days_since_last_order) return ( self.db.query(self.model) .filter( and_( self.model.tenant_id == tenant_id, self.model.status == SupplierStatus.ACTIVE, or_( self.model.quality_rating < 3.0, # Poor rating self.model.delivery_rating < 3.0, # Poor delivery self.model.updated_at < cutoff_date # Long time since interaction ) ) ) .order_by(self.model.quality_rating.asc()) .all() ) def get_supplier_statistics(self, tenant_id: UUID) -> Dict[str, Any]: """Get supplier statistics for dashboard""" total_suppliers = self.count_by_tenant(tenant_id) active_suppliers = ( self.db.query(self.model) .filter( and_( self.model.tenant_id == tenant_id, self.model.status == SupplierStatus.ACTIVE ) ) .count() ) pending_suppliers = ( self.db.query(self.model) .filter( and_( self.model.tenant_id == tenant_id, self.model.status == SupplierStatus.PENDING_APPROVAL ) ) .count() ) avg_quality_rating = ( self.db.query(func.avg(self.model.quality_rating)) .filter( and_( self.model.tenant_id == tenant_id, self.model.status == SupplierStatus.ACTIVE, self.model.quality_rating > 0 ) ) .scalar() ) or 0.0 avg_delivery_rating = ( self.db.query(func.avg(self.model.delivery_rating)) .filter( and_( self.model.tenant_id == tenant_id, self.model.status == SupplierStatus.ACTIVE, self.model.delivery_rating > 0 ) ) .scalar() ) or 0.0 total_spend = ( self.db.query(func.sum(self.model.total_amount)) .filter(self.model.tenant_id == tenant_id) .scalar() ) or 0.0 return { "total_suppliers": total_suppliers, "active_suppliers": active_suppliers, "pending_suppliers": pending_suppliers, "avg_quality_rating": round(float(avg_quality_rating), 2), "avg_delivery_rating": round(float(avg_delivery_rating), 2), "total_spend": float(total_spend) } def approve_supplier( self, supplier_id: UUID, approved_by: UUID, approval_date: Optional[datetime] = None ) -> Optional[Supplier]: """Approve a pending supplier""" supplier = self.get_by_id(supplier_id) if not supplier or supplier.status != SupplierStatus.PENDING_APPROVAL: return None supplier.status = SupplierStatus.ACTIVE supplier.approved_by = approved_by supplier.approved_at = approval_date or datetime.utcnow() supplier.rejection_reason = None supplier.updated_at = datetime.utcnow() self.db.commit() self.db.refresh(supplier) return supplier def reject_supplier( self, supplier_id: UUID, rejection_reason: str, approved_by: UUID ) -> Optional[Supplier]: """Reject a pending supplier""" supplier = self.get_by_id(supplier_id) if not supplier or supplier.status != SupplierStatus.PENDING_APPROVAL: return None supplier.status = SupplierStatus.INACTIVE supplier.rejection_reason = rejection_reason supplier.approved_by = approved_by supplier.approved_at = datetime.utcnow() supplier.updated_at = datetime.utcnow() self.db.commit() self.db.refresh(supplier) return supplier