Improve the frontend

This commit is contained in:
Urtzi Alfaro
2025-10-21 19:50:07 +02:00
parent 05da20357d
commit 8d30172483
105 changed files with 14699 additions and 4630 deletions

View File

@@ -1,16 +1,17 @@
# services/suppliers/app/repositories/purchase_order_repository.py
"""
Purchase Order repository for database operations
Purchase Order repository for database operations (Async SQLAlchemy 2.0)
"""
from typing import List, Optional, Dict, Any, Tuple
from sqlalchemy.orm import Session, joinedload
from sqlalchemy import and_, or_, func, desc
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from sqlalchemy import select, and_, or_, func, desc
from uuid import UUID
from datetime import datetime, timedelta
from app.models.suppliers import (
PurchaseOrder, PurchaseOrderItem, PurchaseOrderStatus,
PurchaseOrder, PurchaseOrderItem, PurchaseOrderStatus,
Supplier, SupplierPriceList
)
from app.repositories.base import BaseRepository
@@ -18,33 +19,35 @@ from app.repositories.base import BaseRepository
class PurchaseOrderRepository(BaseRepository[PurchaseOrder]):
"""Repository for purchase order management operations"""
def __init__(self, db: Session):
def __init__(self, db: AsyncSession):
super().__init__(PurchaseOrder, db)
def get_by_po_number(self, tenant_id: UUID, po_number: str) -> Optional[PurchaseOrder]:
async def get_by_po_number(self, tenant_id: UUID, po_number: str) -> Optional[PurchaseOrder]:
"""Get purchase order by PO number within tenant"""
return (
self.db.query(self.model)
.filter(
and_(
self.model.tenant_id == tenant_id,
self.model.po_number == po_number
)
stmt = select(self.model).filter(
and_(
self.model.tenant_id == tenant_id,
self.model.po_number == po_number
)
.first()
)
def get_with_items(self, po_id: UUID) -> Optional[PurchaseOrder]:
"""Get purchase order with all items loaded"""
return (
self.db.query(self.model)
.options(joinedload(self.model.items))
result = await self.db.execute(stmt)
return result.scalar_one_or_none()
async def get_with_items(self, po_id: UUID) -> Optional[PurchaseOrder]:
"""Get purchase order with all items and supplier loaded"""
stmt = (
select(self.model)
.options(
selectinload(self.model.items),
selectinload(self.model.supplier)
)
.filter(self.model.id == po_id)
.first()
)
def search_purchase_orders(
result = await self.db.execute(stmt)
return result.scalar_one_or_none()
async def search_purchase_orders(
self,
tenant_id: UUID,
supplier_id: Optional[UUID] = None,
@@ -57,320 +60,196 @@ class PurchaseOrderRepository(BaseRepository[PurchaseOrder]):
offset: int = 0
) -> List[PurchaseOrder]:
"""Search purchase orders with comprehensive filters"""
query = (
self.db.query(self.model)
.options(joinedload(self.model.supplier))
stmt = (
select(self.model)
.options(selectinload(self.model.supplier))
.filter(self.model.tenant_id == tenant_id)
)
# Supplier filter
if supplier_id:
query = query.filter(self.model.supplier_id == supplier_id)
stmt = stmt.filter(self.model.supplier_id == supplier_id)
# Status filter
if status:
query = query.filter(self.model.status == status)
stmt = stmt.filter(self.model.status == status)
# Priority filter
if priority:
query = query.filter(self.model.priority == priority)
stmt = stmt.filter(self.model.priority == priority)
# Date range filter
if date_from:
query = query.filter(self.model.order_date >= date_from)
stmt = stmt.filter(self.model.order_date >= date_from)
if date_to:
query = query.filter(self.model.order_date <= date_to)
# Search term filter (PO number, reference, supplier name)
stmt = stmt.filter(self.model.order_date <= date_to)
# Search term filter (PO number, reference)
if search_term:
search_filter = or_(
self.model.po_number.ilike(f"%{search_term}%"),
self.model.reference_number.ilike(f"%{search_term}%"),
self.model.supplier.has(Supplier.name.ilike(f"%{search_term}%"))
self.model.reference_number.ilike(f"%{search_term}%")
)
query = query.filter(search_filter)
return (
query.order_by(desc(self.model.order_date))
.limit(limit)
.offset(offset)
.all()
)
def get_orders_by_status(
self,
tenant_id: UUID,
status: PurchaseOrderStatus
stmt = stmt.filter(search_filter)
stmt = stmt.order_by(desc(self.model.order_date)).limit(limit).offset(offset)
result = await self.db.execute(stmt)
return list(result.scalars().all())
async def get_orders_by_status(
self,
tenant_id: UUID,
status: PurchaseOrderStatus,
limit: int = 100
) -> List[PurchaseOrder]:
"""Get all orders with specific status"""
return (
self.db.query(self.model)
.options(joinedload(self.model.supplier))
"""Get purchase orders by status"""
stmt = (
select(self.model)
.filter(
and_(
self.model.tenant_id == tenant_id,
self.model.status == status
)
)
.order_by(self.model.order_date.desc())
.all()
.order_by(desc(self.model.order_date))
.limit(limit)
)
def get_orders_requiring_approval(self, tenant_id: UUID) -> List[PurchaseOrder]:
"""Get orders pending approval"""
return (
self.db.query(self.model)
.options(joinedload(self.model.supplier))
.filter(
and_(
self.model.tenant_id == tenant_id,
self.model.status == PurchaseOrderStatus.PENDING_APPROVAL,
self.model.requires_approval == True
)
)
.order_by(self.model.order_date.asc())
.all()
)
def get_overdue_orders(self, tenant_id: UUID) -> List[PurchaseOrder]:
"""Get orders that are overdue for delivery"""
today = datetime.utcnow().date()
return (
self.db.query(self.model)
.options(joinedload(self.model.supplier))
.filter(
and_(
self.model.tenant_id == tenant_id,
self.model.status.in_([
PurchaseOrderStatus.CONFIRMED,
PurchaseOrderStatus.SENT_TO_SUPPLIER,
PurchaseOrderStatus.PARTIALLY_RECEIVED
]),
self.model.required_delivery_date < today
)
)
.order_by(self.model.required_delivery_date.asc())
.all()
)
def get_orders_by_supplier(
self,
tenant_id: UUID,
supplier_id: UUID,
limit: int = 20
result = await self.db.execute(stmt)
return list(result.scalars().all())
async def get_pending_approval(
self,
tenant_id: UUID,
limit: int = 50
) -> List[PurchaseOrder]:
"""Get recent orders for a specific supplier"""
return (
self.db.query(self.model)
"""Get purchase orders pending approval"""
return await self.get_orders_by_status(
tenant_id,
PurchaseOrderStatus.pending_approval,
limit
)
async def get_by_supplier(
self,
tenant_id: UUID,
supplier_id: UUID,
limit: int = 100,
offset: int = 0
) -> List[PurchaseOrder]:
"""Get purchase orders for a specific supplier"""
stmt = (
select(self.model)
.filter(
and_(
self.model.tenant_id == tenant_id,
self.model.supplier_id == supplier_id
)
)
.order_by(self.model.order_date.desc())
.order_by(desc(self.model.order_date))
.limit(limit)
.all()
.offset(offset)
)
def generate_po_number(self, tenant_id: UUID) -> str:
"""Generate next PO number for tenant"""
# Get current year
current_year = datetime.utcnow().year
# Find highest PO number for current year
year_prefix = f"PO{current_year}"
latest_po = (
self.db.query(self.model.po_number)
.filter(
and_(
self.model.tenant_id == tenant_id,
self.model.po_number.like(f"{year_prefix}%")
)
)
.order_by(self.model.po_number.desc())
.first()
)
if latest_po:
# Extract number and increment
try:
last_number = int(latest_po.po_number.replace(year_prefix, ""))
new_number = last_number + 1
except ValueError:
new_number = 1
else:
new_number = 1
return f"{year_prefix}{new_number:04d}"
def update_order_status(
result = await self.db.execute(stmt)
return list(result.scalars().all())
async def get_orders_requiring_delivery(
self,
po_id: UUID,
status: PurchaseOrderStatus,
updated_by: UUID,
notes: Optional[str] = None
) -> Optional[PurchaseOrder]:
"""Update purchase order status with audit trail"""
po = self.get_by_id(po_id)
if not po:
return None
po.status = status
po.updated_by = updated_by
po.updated_at = datetime.utcnow()
if notes:
existing_notes = po.internal_notes or ""
timestamp = datetime.utcnow().strftime("%Y-%m-%d %H:%M")
po.internal_notes = f"{existing_notes}\n[{timestamp}] Status changed to {status.value}: {notes}".strip()
# Set specific timestamps based on status
if status == PurchaseOrderStatus.APPROVED:
po.approved_at = datetime.utcnow()
elif status == PurchaseOrderStatus.SENT_TO_SUPPLIER:
po.sent_to_supplier_at = datetime.utcnow()
elif status == PurchaseOrderStatus.CONFIRMED:
po.supplier_confirmation_date = datetime.utcnow()
self.db.commit()
self.db.refresh(po)
return po
def approve_order(
self,
po_id: UUID,
approved_by: UUID,
approval_notes: Optional[str] = None
) -> Optional[PurchaseOrder]:
"""Approve a purchase order"""
po = self.get_by_id(po_id)
if not po or po.status != PurchaseOrderStatus.PENDING_APPROVAL:
return None
po.status = PurchaseOrderStatus.APPROVED
po.approved_by = approved_by
po.approved_at = datetime.utcnow()
po.updated_by = approved_by
po.updated_at = datetime.utcnow()
if approval_notes:
po.internal_notes = (po.internal_notes or "") + f"\nApproval notes: {approval_notes}"
self.db.commit()
self.db.refresh(po)
return po
def calculate_order_totals(self, po_id: UUID) -> Optional[PurchaseOrder]:
"""Recalculate order totals based on line items"""
po = self.get_with_items(po_id)
if not po:
return None
# Calculate subtotal from items
subtotal = sum(item.line_total for item in po.items)
# Keep existing tax, shipping, and discount
tax_amount = po.tax_amount or 0
shipping_cost = po.shipping_cost or 0
discount_amount = po.discount_amount or 0
# Calculate total
total_amount = subtotal + tax_amount + shipping_cost - discount_amount
# Update PO
po.subtotal = subtotal
po.total_amount = max(0, total_amount) # Ensure non-negative
po.updated_at = datetime.utcnow()
self.db.commit()
self.db.refresh(po)
return po
def get_purchase_order_statistics(self, tenant_id: UUID) -> Dict[str, Any]:
"""Get purchase order statistics for dashboard"""
# Total orders
total_orders = self.count_by_tenant(tenant_id)
# Orders by status
status_counts = (
self.db.query(
self.model.status,
func.count(self.model.id).label('count')
)
.filter(self.model.tenant_id == tenant_id)
.group_by(self.model.status)
.all()
)
status_dict = {status.value: 0 for status in PurchaseOrderStatus}
for status, count in status_counts:
status_dict[status.value] = count
# This month's orders
first_day_month = datetime.utcnow().replace(day=1, hour=0, minute=0, second=0, microsecond=0)
this_month_orders = (
self.db.query(self.model)
.filter(
and_(
self.model.tenant_id == tenant_id,
self.model.order_date >= first_day_month
)
)
.count()
)
# Total spend this month
this_month_spend = (
self.db.query(func.sum(self.model.total_amount))
.filter(
and_(
self.model.tenant_id == tenant_id,
self.model.order_date >= first_day_month,
self.model.status != PurchaseOrderStatus.CANCELLED
)
)
.scalar()
) or 0.0
# Average order value
avg_order_value = (
self.db.query(func.avg(self.model.total_amount))
.filter(
and_(
self.model.tenant_id == tenant_id,
self.model.status != PurchaseOrderStatus.CANCELLED
)
)
.scalar()
) or 0.0
# Overdue orders count
today = datetime.utcnow().date()
overdue_count = (
self.db.query(self.model)
tenant_id: UUID,
days_ahead: int = 7
) -> List[PurchaseOrder]:
"""Get orders expecting delivery within specified days"""
cutoff_date = datetime.utcnow().date() + timedelta(days=days_ahead)
stmt = (
select(self.model)
.filter(
and_(
self.model.tenant_id == tenant_id,
self.model.status.in_([
PurchaseOrderStatus.CONFIRMED,
PurchaseOrderStatus.SENT_TO_SUPPLIER,
PurchaseOrderStatus.PARTIALLY_RECEIVED
PurchaseOrderStatus.approved,
PurchaseOrderStatus.sent_to_supplier,
PurchaseOrderStatus.confirmed
]),
self.model.required_delivery_date <= cutoff_date
)
)
.order_by(self.model.required_delivery_date)
)
result = await self.db.execute(stmt)
return list(result.scalars().all())
async def get_overdue_deliveries(self, tenant_id: UUID) -> List[PurchaseOrder]:
"""Get purchase orders with overdue deliveries"""
today = datetime.utcnow().date()
stmt = (
select(self.model)
.filter(
and_(
self.model.tenant_id == tenant_id,
self.model.status.in_([
PurchaseOrderStatus.approved,
PurchaseOrderStatus.sent_to_supplier,
PurchaseOrderStatus.confirmed
]),
self.model.required_delivery_date < today
)
)
.count()
.order_by(self.model.required_delivery_date)
)
return {
"total_orders": total_orders,
"status_counts": status_dict,
"this_month_orders": this_month_orders,
"this_month_spend": float(this_month_spend),
"avg_order_value": round(float(avg_order_value), 2),
"overdue_count": overdue_count,
"pending_approval": status_dict.get(PurchaseOrderStatus.PENDING_APPROVAL.value, 0)
}
result = await self.db.execute(stmt)
return list(result.scalars().all())
async def get_total_spent_by_supplier(
self,
tenant_id: UUID,
supplier_id: UUID,
date_from: Optional[datetime] = None,
date_to: Optional[datetime] = None
) -> float:
"""Calculate total amount spent with a supplier"""
stmt = (
select(func.sum(self.model.total_amount))
.filter(
and_(
self.model.tenant_id == tenant_id,
self.model.supplier_id == supplier_id,
self.model.status.in_([
PurchaseOrderStatus.approved,
PurchaseOrderStatus.sent_to_supplier,
PurchaseOrderStatus.confirmed,
PurchaseOrderStatus.received,
PurchaseOrderStatus.completed
])
)
)
)
if date_from:
stmt = stmt.filter(self.model.order_date >= date_from)
if date_to:
stmt = stmt.filter(self.model.order_date <= date_to)
result = await self.db.execute(stmt)
total = result.scalar()
return float(total) if total else 0.0
async def count_by_status(
self,
tenant_id: UUID,
status: PurchaseOrderStatus
) -> int:
"""Count purchase orders by status"""
stmt = (
select(func.count())
.select_from(self.model)
.filter(
and_(
self.model.tenant_id == tenant_id,
self.model.status == status
)
)
)
result = await self.db.execute(stmt)
return result.scalar() or 0

View File

@@ -0,0 +1,376 @@
# services/suppliers/app/repositories/purchase_order_repository.py
"""
Purchase Order repository for database operations
"""
from typing import List, Optional, Dict, Any, Tuple
from sqlalchemy.orm import Session, joinedload
from sqlalchemy import and_, or_, func, desc
from uuid import UUID
from datetime import datetime, timedelta
from app.models.suppliers import (
PurchaseOrder, PurchaseOrderItem, PurchaseOrderStatus,
Supplier, SupplierPriceList
)
from app.repositories.base import BaseRepository
class PurchaseOrderRepository(BaseRepository[PurchaseOrder]):
"""Repository for purchase order management operations"""
def __init__(self, db: Session):
super().__init__(PurchaseOrder, db)
def get_by_po_number(self, tenant_id: UUID, po_number: str) -> Optional[PurchaseOrder]:
"""Get purchase order by PO number within tenant"""
return (
self.db.query(self.model)
.filter(
and_(
self.model.tenant_id == tenant_id,
self.model.po_number == po_number
)
)
.first()
)
def get_with_items(self, po_id: UUID) -> Optional[PurchaseOrder]:
"""Get purchase order with all items loaded"""
return (
self.db.query(self.model)
.options(joinedload(self.model.items))
.filter(self.model.id == po_id)
.first()
)
def search_purchase_orders(
self,
tenant_id: UUID,
supplier_id: Optional[UUID] = None,
status: Optional[PurchaseOrderStatus] = None,
priority: Optional[str] = None,
date_from: Optional[datetime] = None,
date_to: Optional[datetime] = None,
search_term: Optional[str] = None,
limit: int = 50,
offset: int = 0
) -> List[PurchaseOrder]:
"""Search purchase orders with comprehensive filters"""
query = (
self.db.query(self.model)
.options(joinedload(self.model.supplier))
.filter(self.model.tenant_id == tenant_id)
)
# Supplier filter
if supplier_id:
query = query.filter(self.model.supplier_id == supplier_id)
# Status filter
if status:
query = query.filter(self.model.status == status)
# Priority filter
if priority:
query = query.filter(self.model.priority == priority)
# Date range filter
if date_from:
query = query.filter(self.model.order_date >= date_from)
if date_to:
query = query.filter(self.model.order_date <= date_to)
# Search term filter (PO number, reference, supplier name)
if search_term:
search_filter = or_(
self.model.po_number.ilike(f"%{search_term}%"),
self.model.reference_number.ilike(f"%{search_term}%"),
self.model.supplier.has(Supplier.name.ilike(f"%{search_term}%"))
)
query = query.filter(search_filter)
return (
query.order_by(desc(self.model.order_date))
.limit(limit)
.offset(offset)
.all()
)
def get_orders_by_status(
self,
tenant_id: UUID,
status: PurchaseOrderStatus
) -> List[PurchaseOrder]:
"""Get all orders with specific status"""
return (
self.db.query(self.model)
.options(joinedload(self.model.supplier))
.filter(
and_(
self.model.tenant_id == tenant_id,
self.model.status == status
)
)
.order_by(self.model.order_date.desc())
.all()
)
def get_orders_requiring_approval(self, tenant_id: UUID) -> List[PurchaseOrder]:
"""Get orders pending approval"""
return (
self.db.query(self.model)
.options(joinedload(self.model.supplier))
.filter(
and_(
self.model.tenant_id == tenant_id,
self.model.status == PurchaseOrderStatus.PENDING_APPROVAL,
self.model.requires_approval == True
)
)
.order_by(self.model.order_date.asc())
.all()
)
def get_overdue_orders(self, tenant_id: UUID) -> List[PurchaseOrder]:
"""Get orders that are overdue for delivery"""
today = datetime.utcnow().date()
return (
self.db.query(self.model)
.options(joinedload(self.model.supplier))
.filter(
and_(
self.model.tenant_id == tenant_id,
self.model.status.in_([
PurchaseOrderStatus.CONFIRMED,
PurchaseOrderStatus.SENT_TO_SUPPLIER,
PurchaseOrderStatus.PARTIALLY_RECEIVED
]),
self.model.required_delivery_date < today
)
)
.order_by(self.model.required_delivery_date.asc())
.all()
)
def get_orders_by_supplier(
self,
tenant_id: UUID,
supplier_id: UUID,
limit: int = 20
) -> List[PurchaseOrder]:
"""Get recent orders for a specific supplier"""
return (
self.db.query(self.model)
.filter(
and_(
self.model.tenant_id == tenant_id,
self.model.supplier_id == supplier_id
)
)
.order_by(self.model.order_date.desc())
.limit(limit)
.all()
)
def generate_po_number(self, tenant_id: UUID) -> str:
"""Generate next PO number for tenant"""
# Get current year
current_year = datetime.utcnow().year
# Find highest PO number for current year
year_prefix = f"PO{current_year}"
latest_po = (
self.db.query(self.model.po_number)
.filter(
and_(
self.model.tenant_id == tenant_id,
self.model.po_number.like(f"{year_prefix}%")
)
)
.order_by(self.model.po_number.desc())
.first()
)
if latest_po:
# Extract number and increment
try:
last_number = int(latest_po.po_number.replace(year_prefix, ""))
new_number = last_number + 1
except ValueError:
new_number = 1
else:
new_number = 1
return f"{year_prefix}{new_number:04d}"
def update_order_status(
self,
po_id: UUID,
status: PurchaseOrderStatus,
updated_by: UUID,
notes: Optional[str] = None
) -> Optional[PurchaseOrder]:
"""Update purchase order status with audit trail"""
po = self.get_by_id(po_id)
if not po:
return None
po.status = status
po.updated_by = updated_by
po.updated_at = datetime.utcnow()
if notes:
existing_notes = po.internal_notes or ""
timestamp = datetime.utcnow().strftime("%Y-%m-%d %H:%M")
po.internal_notes = f"{existing_notes}\n[{timestamp}] Status changed to {status.value}: {notes}".strip()
# Set specific timestamps based on status
if status == PurchaseOrderStatus.APPROVED:
po.approved_at = datetime.utcnow()
elif status == PurchaseOrderStatus.SENT_TO_SUPPLIER:
po.sent_to_supplier_at = datetime.utcnow()
elif status == PurchaseOrderStatus.CONFIRMED:
po.supplier_confirmation_date = datetime.utcnow()
self.db.commit()
self.db.refresh(po)
return po
def approve_order(
self,
po_id: UUID,
approved_by: UUID,
approval_notes: Optional[str] = None
) -> Optional[PurchaseOrder]:
"""Approve a purchase order"""
po = self.get_by_id(po_id)
if not po or po.status != PurchaseOrderStatus.PENDING_APPROVAL:
return None
po.status = PurchaseOrderStatus.APPROVED
po.approved_by = approved_by
po.approved_at = datetime.utcnow()
po.updated_by = approved_by
po.updated_at = datetime.utcnow()
if approval_notes:
po.internal_notes = (po.internal_notes or "") + f"\nApproval notes: {approval_notes}"
self.db.commit()
self.db.refresh(po)
return po
def calculate_order_totals(self, po_id: UUID) -> Optional[PurchaseOrder]:
"""Recalculate order totals based on line items"""
po = self.get_with_items(po_id)
if not po:
return None
# Calculate subtotal from items
subtotal = sum(item.line_total for item in po.items)
# Keep existing tax, shipping, and discount
tax_amount = po.tax_amount or 0
shipping_cost = po.shipping_cost or 0
discount_amount = po.discount_amount or 0
# Calculate total
total_amount = subtotal + tax_amount + shipping_cost - discount_amount
# Update PO
po.subtotal = subtotal
po.total_amount = max(0, total_amount) # Ensure non-negative
po.updated_at = datetime.utcnow()
self.db.commit()
self.db.refresh(po)
return po
def get_purchase_order_statistics(self, tenant_id: UUID) -> Dict[str, Any]:
"""Get purchase order statistics for dashboard"""
# Total orders
total_orders = self.count_by_tenant(tenant_id)
# Orders by status
status_counts = (
self.db.query(
self.model.status,
func.count(self.model.id).label('count')
)
.filter(self.model.tenant_id == tenant_id)
.group_by(self.model.status)
.all()
)
status_dict = {status.value: 0 for status in PurchaseOrderStatus}
for status, count in status_counts:
status_dict[status.value] = count
# This month's orders
first_day_month = datetime.utcnow().replace(day=1, hour=0, minute=0, second=0, microsecond=0)
this_month_orders = (
self.db.query(self.model)
.filter(
and_(
self.model.tenant_id == tenant_id,
self.model.order_date >= first_day_month
)
)
.count()
)
# Total spend this month
this_month_spend = (
self.db.query(func.sum(self.model.total_amount))
.filter(
and_(
self.model.tenant_id == tenant_id,
self.model.order_date >= first_day_month,
self.model.status != PurchaseOrderStatus.CANCELLED
)
)
.scalar()
) or 0.0
# Average order value
avg_order_value = (
self.db.query(func.avg(self.model.total_amount))
.filter(
and_(
self.model.tenant_id == tenant_id,
self.model.status != PurchaseOrderStatus.CANCELLED
)
)
.scalar()
) or 0.0
# Overdue orders count
today = datetime.utcnow().date()
overdue_count = (
self.db.query(self.model)
.filter(
and_(
self.model.tenant_id == tenant_id,
self.model.status.in_([
PurchaseOrderStatus.CONFIRMED,
PurchaseOrderStatus.SENT_TO_SUPPLIER,
PurchaseOrderStatus.PARTIALLY_RECEIVED
]),
self.model.required_delivery_date < today
)
)
.count()
)
return {
"total_orders": total_orders,
"status_counts": status_dict,
"this_month_orders": this_month_orders,
"this_month_spend": float(this_month_spend),
"avg_order_value": round(float(avg_order_value), 2),
"overdue_count": overdue_count,
"pending_approval": status_dict.get(PurchaseOrderStatus.PENDING_APPROVAL.value, 0)
}

View File

@@ -0,0 +1,289 @@
# services/suppliers/app/repositories/supplier_performance_repository.py
"""
Supplier Performance Repository - Calculate and manage supplier trust scores
Handles supplier performance metrics, trust scores, and auto-approval eligibility
"""
from typing import Optional, Dict, Any
from uuid import UUID
from datetime import datetime, timedelta, timezone
from decimal import Decimal
import structlog
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, func, and_, case
from sqlalchemy.orm import selectinload
from app.models.suppliers import (
Supplier,
PurchaseOrder,
PurchaseOrderStatus
)
logger = structlog.get_logger()
class SupplierPerformanceRepository:
"""Repository for calculating and managing supplier performance metrics"""
def __init__(self, db: AsyncSession):
self.db = db
async def calculate_trust_score(self, supplier_id: UUID) -> float:
"""
Calculate comprehensive trust score for a supplier
Score components (weighted):
- Quality rating: 30%
- Delivery rating: 30%
- On-time delivery rate: 20%
- Fulfillment rate: 15%
- Order history: 5%
Returns:
float: Trust score between 0.0 and 1.0
"""
try:
# Get supplier with current metrics
stmt = select(Supplier).where(Supplier.id == supplier_id)
result = await self.db.execute(stmt)
supplier = result.scalar_one_or_none()
if not supplier:
logger.warning("Supplier not found for trust score calculation", supplier_id=str(supplier_id))
return 0.0
# Calculate on-time delivery rate from recent POs
on_time_rate = await self._calculate_on_time_delivery_rate(supplier_id)
# Calculate fulfillment rate from recent POs
fulfillment_rate = await self._calculate_fulfillment_rate(supplier_id)
# Calculate order history score (more orders = higher confidence)
order_history_score = min(1.0, supplier.total_pos_count / 50.0)
# Weighted components
quality_score = (supplier.quality_rating or 0.0) / 5.0 # Normalize to 0-1
delivery_score = (supplier.delivery_rating or 0.0) / 5.0 # Normalize to 0-1
trust_score = (
quality_score * 0.30 +
delivery_score * 0.30 +
on_time_rate * 0.20 +
fulfillment_rate * 0.15 +
order_history_score * 0.05
)
# Ensure score is between 0 and 1
trust_score = max(0.0, min(1.0, trust_score))
logger.info(
"Trust score calculated",
supplier_id=str(supplier_id),
trust_score=trust_score,
quality_score=quality_score,
delivery_score=delivery_score,
on_time_rate=on_time_rate,
fulfillment_rate=fulfillment_rate,
order_history_score=order_history_score
)
return trust_score
except Exception as e:
logger.error("Error calculating trust score", supplier_id=str(supplier_id), error=str(e))
return 0.0
async def _calculate_on_time_delivery_rate(self, supplier_id: UUID, days: int = 90) -> float:
"""Calculate percentage of orders delivered on time in the last N days"""
try:
cutoff_date = datetime.now(timezone.utc) - timedelta(days=days)
# Get completed orders with delivery dates
stmt = select(
func.count(PurchaseOrder.id).label('total_orders'),
func.count(
case(
(PurchaseOrder.actual_delivery_date <= PurchaseOrder.required_delivery_date, 1)
)
).label('on_time_orders')
).where(
and_(
PurchaseOrder.supplier_id == supplier_id,
PurchaseOrder.status == PurchaseOrderStatus.completed,
PurchaseOrder.actual_delivery_date.isnot(None),
PurchaseOrder.created_at >= cutoff_date
)
)
result = await self.db.execute(stmt)
row = result.one()
if row.total_orders == 0:
return 0.0
on_time_rate = float(row.on_time_orders) / float(row.total_orders)
return on_time_rate
except Exception as e:
logger.error("Error calculating on-time delivery rate", supplier_id=str(supplier_id), error=str(e))
return 0.0
async def _calculate_fulfillment_rate(self, supplier_id: UUID, days: int = 90) -> float:
"""Calculate percentage of orders fully fulfilled (no shortages) in the last N days"""
try:
cutoff_date = datetime.now(timezone.utc) - timedelta(days=days)
# Get completed/confirmed orders
stmt = select(
func.count(PurchaseOrder.id).label('total_orders'),
func.count(
case(
(PurchaseOrder.status == PurchaseOrderStatus.completed, 1)
)
).label('completed_orders')
).where(
and_(
PurchaseOrder.supplier_id == supplier_id,
PurchaseOrder.status.in_([
PurchaseOrderStatus.completed,
PurchaseOrderStatus.partially_received
]),
PurchaseOrder.created_at >= cutoff_date
)
)
result = await self.db.execute(stmt)
row = result.one()
if row.total_orders == 0:
return 0.0
fulfillment_rate = float(row.completed_orders) / float(row.total_orders)
return fulfillment_rate
except Exception as e:
logger.error("Error calculating fulfillment rate", supplier_id=str(supplier_id), error=str(e))
return 0.0
async def update_supplier_performance_metrics(self, supplier_id: UUID) -> Dict[str, Any]:
"""
Update all performance metrics for a supplier
Returns:
Dict with updated metrics
"""
try:
# Calculate all metrics
trust_score = await self.calculate_trust_score(supplier_id)
on_time_rate = await self._calculate_on_time_delivery_rate(supplier_id)
fulfillment_rate = await self._calculate_fulfillment_rate(supplier_id)
# Get current supplier
stmt = select(Supplier).where(Supplier.id == supplier_id)
result = await self.db.execute(stmt)
supplier = result.scalar_one_or_none()
if not supplier:
return {}
# Update supplier metrics
supplier.trust_score = trust_score
supplier.on_time_delivery_rate = on_time_rate
supplier.fulfillment_rate = fulfillment_rate
supplier.last_performance_update = datetime.now(timezone.utc)
# Auto-update preferred status based on performance
supplier.is_preferred_supplier = (
supplier.total_pos_count >= 10 and
trust_score >= 0.80 and
supplier.status.value == 'active'
)
# Auto-update auto-approve eligibility
supplier.auto_approve_enabled = (
supplier.total_pos_count >= 20 and
trust_score >= 0.85 and
on_time_rate >= 0.90 and
supplier.is_preferred_supplier and
supplier.status.value == 'active'
)
await self.db.commit()
logger.info(
"Supplier performance metrics updated",
supplier_id=str(supplier_id),
trust_score=trust_score,
is_preferred=supplier.is_preferred_supplier,
auto_approve_enabled=supplier.auto_approve_enabled
)
return {
"supplier_id": str(supplier_id),
"trust_score": trust_score,
"on_time_delivery_rate": on_time_rate,
"fulfillment_rate": fulfillment_rate,
"is_preferred_supplier": supplier.is_preferred_supplier,
"auto_approve_enabled": supplier.auto_approve_enabled,
"last_updated": supplier.last_performance_update.isoformat()
}
except Exception as e:
await self.db.rollback()
logger.error("Error updating supplier performance metrics", supplier_id=str(supplier_id), error=str(e))
raise
async def increment_po_counters(self, supplier_id: UUID, approved: bool = False):
"""Increment PO counters when a new PO is created or approved"""
try:
stmt = select(Supplier).where(Supplier.id == supplier_id)
result = await self.db.execute(stmt)
supplier = result.scalar_one_or_none()
if supplier:
supplier.total_pos_count += 1
if approved:
supplier.approved_pos_count += 1
await self.db.commit()
logger.info(
"Supplier PO counters incremented",
supplier_id=str(supplier_id),
total=supplier.total_pos_count,
approved=supplier.approved_pos_count
)
except Exception as e:
await self.db.rollback()
logger.error("Error incrementing PO counters", supplier_id=str(supplier_id), error=str(e))
async def get_supplier_with_performance(self, supplier_id: UUID) -> Optional[Dict[str, Any]]:
"""Get supplier data with all performance metrics"""
try:
stmt = select(Supplier).where(Supplier.id == supplier_id)
result = await self.db.execute(stmt)
supplier = result.scalar_one_or_none()
if not supplier:
return None
return {
"id": str(supplier.id),
"name": supplier.name,
"trust_score": supplier.trust_score,
"is_preferred_supplier": supplier.is_preferred_supplier,
"auto_approve_enabled": supplier.auto_approve_enabled,
"total_pos_count": supplier.total_pos_count,
"approved_pos_count": supplier.approved_pos_count,
"on_time_delivery_rate": supplier.on_time_delivery_rate,
"fulfillment_rate": supplier.fulfillment_rate,
"quality_rating": supplier.quality_rating,
"delivery_rating": supplier.delivery_rating,
"status": supplier.status.value if supplier.status else None,
"last_performance_update": supplier.last_performance_update.isoformat() if supplier.last_performance_update else None
}
except Exception as e:
logger.error("Error getting supplier with performance", supplier_id=str(supplier_id), error=str(e))
return None