# services/suppliers/app/repositories/delivery_repository.py
"""
Delivery repository for database operations
"""

from typing import List, Optional, Dict, Any
from sqlalchemy.orm import Session, joinedload
from sqlalchemy import and_, or_, func, desc
from uuid import UUID
from datetime import datetime, timedelta

from app.models.suppliers import (
    Delivery, DeliveryItem, DeliveryStatus,
    PurchaseOrder, Supplier
)
from app.repositories.base import BaseRepository


class DeliveryRepository(BaseRepository[Delivery]):
    """Repository for delivery tracking operations"""

    # Statuses treated as "not yet completed" by the scheduled, overdue and
    # summary queries below. Kept in one place so those queries cannot drift.
    _ACTIVE_STATUSES = (
        DeliveryStatus.SCHEDULED,
        DeliveryStatus.IN_TRANSIT,
        DeliveryStatus.OUT_FOR_DELIVERY,
    )

    def __init__(self, db: Session):
        """Bind the repository to the ``Delivery`` model and a DB session."""
        super().__init__(Delivery, db)

    def _active_deliveries(self, tenant_id: UUID):
        """Return a base query for a tenant's deliveries in an active status.

        No eager loading and no ordering are applied; callers add what they
        need (``.options``, ``.filter``, ``.count`` ...).
        """
        return self.db.query(self.model).filter(
            self.model.tenant_id == tenant_id,
            self.model.status.in_(self._ACTIVE_STATUSES),
        )

    def get_by_delivery_number(
        self,
        tenant_id: UUID,
        delivery_number: str
    ) -> Optional[Delivery]:
        """Get delivery by delivery number within tenant.

        Returns the first match, or ``None`` when no delivery of that tenant
        carries the given number.
        """
        return (
            self.db.query(self.model)
            .filter(
                and_(
                    self.model.tenant_id == tenant_id,
                    self.model.delivery_number == delivery_number
                )
            )
            .first()
        )

    def get_with_items(self, delivery_id: UUID) -> Optional[Delivery]:
        """Get delivery with its items, purchase order and supplier loaded.

        The three relationships are eager-loaded with ``joinedload``.
        Returns ``None`` when the id does not exist.
        """
        return (
            self.db.query(self.model)
            .options(
                joinedload(self.model.items),
                joinedload(self.model.purchase_order),
                joinedload(self.model.supplier)
            )
            .filter(self.model.id == delivery_id)
            .first()
        )

    def get_by_purchase_order(self, po_id: UUID) -> List[Delivery]:
        """Get all deliveries for a purchase order, newest scheduled first.

        Items are eager-loaded on every returned delivery.
        """
        return (
            self.db.query(self.model)
            .options(joinedload(self.model.items))
            .filter(self.model.purchase_order_id == po_id)
            .order_by(self.model.scheduled_date.desc())
            .all()
        )

    def search_deliveries(
        self,
        tenant_id: UUID,
        supplier_id: Optional[UUID] = None,
        status: Optional[DeliveryStatus] = None,
        date_from: Optional[datetime] = None,
        date_to: Optional[datetime] = None,
        search_term: Optional[str] = None,
        limit: int = 50,
        offset: int = 0
    ) -> List[Delivery]:
        """Search deliveries with comprehensive filters.

        Every optional filter is applied only when a truthy value is given.

        Args:
            tenant_id: Tenant whose deliveries are searched (always applied).
            supplier_id: Restrict to one supplier.
            status: Restrict to one delivery status.
            date_from: Inclusive lower bound on ``scheduled_date``.
            date_to: Inclusive upper bound on ``scheduled_date``.
            search_term: Case-insensitive substring matched against the
                delivery number, supplier delivery note, tracking number and
                the related purchase order's PO number. The term is placed
                in the LIKE pattern unescaped, so ``%`` and ``_`` inside it
                act as wildcards.
            limit: Maximum number of rows returned.
            offset: Number of rows skipped before the first returned row.

        Returns:
            Matching deliveries ordered by ``scheduled_date`` descending,
            with supplier and purchase order eager-loaded.
        """
        query = (
            self.db.query(self.model)
            .options(
                joinedload(self.model.supplier),
                joinedload(self.model.purchase_order)
            )
            .filter(self.model.tenant_id == tenant_id)
        )

        # Supplier filter
        if supplier_id:
            query = query.filter(self.model.supplier_id == supplier_id)

        # Status filter
        if status:
            query = query.filter(self.model.status == status)

        # Date range filter (scheduled date)
        if date_from:
            query = query.filter(self.model.scheduled_date >= date_from)
        if date_to:
            query = query.filter(self.model.scheduled_date <= date_to)

        # Search term filter
        if search_term:
            pattern = f"%{search_term}%"
            search_filter = or_(
                self.model.delivery_number.ilike(pattern),
                self.model.supplier_delivery_note.ilike(pattern),
                self.model.tracking_number.ilike(pattern),
                self.model.purchase_order.has(PurchaseOrder.po_number.ilike(pattern))
            )
            query = query.filter(search_filter)

        return (
            query.order_by(desc(self.model.scheduled_date))
            .limit(limit)
            .offset(offset)
            .all()
        )

    def get_scheduled_deliveries(
        self,
        tenant_id: UUID,
        date_from: Optional[datetime] = None,
        date_to: Optional[datetime] = None
    ) -> List[Delivery]:
        """Get active deliveries scheduled within a date range.

        Args:
            tenant_id: Tenant whose deliveries are queried.
            date_from: Inclusive start; defaults to the current UTC time.
            date_to: Inclusive end; defaults to ``date_from`` + 7 days.

        Returns:
            Deliveries in an active status (scheduled, in transit, out for
            delivery) ordered by ``scheduled_date`` ascending, with supplier
            and purchase order eager-loaded.
        """
        if not date_from:
            date_from = datetime.utcnow()
        if not date_to:
            date_to = date_from + timedelta(days=7)  # Next week

        return (
            self._active_deliveries(tenant_id)
            .options(
                joinedload(self.model.supplier),
                joinedload(self.model.purchase_order)
            )
            .filter(
                self.model.scheduled_date >= date_from,
                self.model.scheduled_date <= date_to
            )
            .order_by(self.model.scheduled_date)
            .all()
        )

    def get_todays_deliveries(self, tenant_id: UUID) -> List[Delivery]:
        """Get active deliveries scheduled for today (UTC).

        The window runs from today's midnight to the next midnight and both
        bounds are inclusive, so a delivery scheduled at exactly 00:00 of
        the following day is also returned.
        """
        today_start = datetime.utcnow().replace(hour=0, minute=0, second=0, microsecond=0)
        today_end = today_start + timedelta(days=1)

        return self.get_scheduled_deliveries(tenant_id, today_start, today_end)

    def get_overdue_deliveries(self, tenant_id: UUID) -> List[Delivery]:
        """Get deliveries that are overdue (scheduled in the past but not completed).

        "Not completed" means the status is still one of the active
        statuses. Results are ordered by ``scheduled_date`` ascending, with
        supplier and purchase order eager-loaded.
        """
        now = datetime.utcnow()

        return (
            self._active_deliveries(tenant_id)
            .options(
                joinedload(self.model.supplier),
                joinedload(self.model.purchase_order)
            )
            .filter(self.model.scheduled_date < now)
            .order_by(self.model.scheduled_date)
            .all()
        )

    def generate_delivery_number(self, tenant_id: UUID) -> str:
        """Generate next delivery number for tenant.

        Numbers have the form ``DEL<YYYYMMDD><seq>`` where ``seq`` is a
        per-tenant, per-day counter, zero-padded to at least three digits.

        The next sequence is derived from the numeric maximum of today's
        existing suffixes. A lexical "latest" is not used because it breaks
        once the counter passes 999 ("999" sorts above "1000"). Numbers
        whose suffix is not purely numeric are ignored.

        NOTE(review): nothing here prevents two concurrent callers from
        receiving the same number; uniqueness has to be enforced elsewhere.
        """
        date_prefix = f"DEL{datetime.utcnow().strftime('%Y%m%d')}"

        rows = (
            self.db.query(self.model.delivery_number)
            .filter(
                and_(
                    self.model.tenant_id == tenant_id,
                    self.model.delivery_number.like(f"{date_prefix}%")
                )
            )
            .all()
        )

        highest = 0
        for row in rows:
            suffix = row.delivery_number[len(date_prefix):]
            # isdecimal() guarantees int() cannot raise on this suffix.
            if suffix.isdecimal():
                highest = max(highest, int(suffix))

        return f"{date_prefix}{highest + 1:03d}"

    def update_delivery_status(
        self,
        delivery_id: UUID,
        status: DeliveryStatus,
        updated_by: UUID,
        notes: Optional[str] = None,
        update_timestamps: bool = True
    ) -> Optional[Delivery]:
        """Update delivery status with appropriate timestamps.

        Args:
            delivery_id: Delivery to update.
            status: New status.
            updated_by: User performing the change.
            notes: Optional free text; when given, a timestamped
                "old -> new" status line containing it is appended to the
                delivery's notes.
            update_timestamps: When true, arrival/completion timestamps are
                set according to the new status.

        Returns:
            The refreshed delivery, or ``None`` when ``delivery_id`` is not
            found. The change is committed before returning.
        """
        delivery = self.get_by_id(delivery_id)
        if not delivery:
            return None

        # One clock reading so every timestamp written by this call agrees.
        now = datetime.utcnow()

        old_status = delivery.status
        delivery.status = status
        # NOTE(review): this overwrites the creator with the updater. It is
        # kept as-is because the model is not visible from here; confirm
        # whether a dedicated "updated_by" column should be used instead.
        delivery.created_by = updated_by  # Track who updated

        if update_timestamps:
            if status == DeliveryStatus.IN_TRANSIT and not delivery.estimated_arrival:
                # Set estimated arrival if not already set
                delivery.estimated_arrival = now + timedelta(hours=4)  # Default 4 hours
            elif status == DeliveryStatus.DELIVERED:
                delivery.actual_arrival = now
                delivery.completed_at = now
            elif status in (
                DeliveryStatus.PARTIALLY_DELIVERED,
                DeliveryStatus.FAILED_DELIVERY,
            ):
                delivery.actual_arrival = now

        # Add status change note
        if notes:
            existing_notes = delivery.notes or ""
            timestamp = now.strftime("%Y-%m-%d %H:%M")
            # Tolerate a delivery whose previous status was never set.
            previous = getattr(old_status, "value", old_status)
            status_note = f"[{timestamp}] Status: {previous} → {status.value}: {notes}"
            delivery.notes = f"{existing_notes}\n{status_note}".strip()

        delivery.updated_at = now

        self.db.commit()
        self.db.refresh(delivery)
        return delivery

    def mark_as_received(
        self,
        delivery_id: UUID,
        received_by: UUID,
        inspection_passed: bool = True,
        inspection_notes: Optional[str] = None,
        quality_issues: Optional[Dict[str, Any]] = None
    ) -> Optional[Delivery]:
        """Mark delivery as received with inspection details.

        Sets the status to ``DELIVERED`` and records receiver, receipt and
        completion time and the inspection result. ``actual_arrival`` is
        filled in only when it is still empty. Inspection notes and quality
        issues are written only when provided.

        Returns:
            The refreshed delivery, or ``None`` when ``delivery_id`` is not
            found. The change is committed before returning.
        """
        delivery = self.get_by_id(delivery_id)
        if not delivery:
            return None

        # One clock reading: receipt, completion and update describe the
        # same event and should carry the same timestamp.
        now = datetime.utcnow()

        delivery.status = DeliveryStatus.DELIVERED
        delivery.received_by = received_by
        delivery.received_at = now
        delivery.completed_at = now
        delivery.inspection_passed = inspection_passed

        if inspection_notes:
            delivery.inspection_notes = inspection_notes

        if quality_issues:
            delivery.quality_issues = quality_issues

        if not delivery.actual_arrival:
            delivery.actual_arrival = now

        delivery.updated_at = now

        self.db.commit()
        self.db.refresh(delivery)
        return delivery

    def get_delivery_performance_stats(
        self,
        tenant_id: UUID,
        days_back: int = 30,
        supplier_id: Optional[UUID] = None
    ) -> Dict[str, Any]:
        """Get delivery performance statistics.

        Considers deliveries of the tenant (optionally one supplier) that
        were created within the last ``days_back`` days.

        Returns:
            A dict with:
            - ``total_deliveries``: all deliveries in the window.
            - ``on_time_deliveries`` / ``late_deliveries``: non-failed
              deliveries having both a scheduled date and an actual
              arrival, split by whether arrival was at or before schedule.
            - ``failed_deliveries``: deliveries with status FAILED_DELIVERY.
            - ``on_time_percentage``: on-time count relative to *all*
              deliveries in the window (failed and not-yet-arrived ones
              included in the denominator).
            - ``avg_delay_hours``: mean delay over late deliveries only.
            - ``quality_pass_rate``: share of inspected, non-failed
              deliveries that passed inspection.
            Percentages and hours are floats rounded to one decimal.
        """
        cutoff_date = datetime.utcnow() - timedelta(days=days_back)

        query = (
            self.db.query(self.model)
            .filter(
                and_(
                    self.model.tenant_id == tenant_id,
                    self.model.created_at >= cutoff_date
                )
            )
        )

        if supplier_id:
            query = query.filter(self.model.supplier_id == supplier_id)

        deliveries = query.all()

        if not deliveries:
            return {
                "total_deliveries": 0,
                "on_time_deliveries": 0,
                "late_deliveries": 0,
                "failed_deliveries": 0,
                "on_time_percentage": 0.0,
                "avg_delay_hours": 0.0,
                "quality_pass_rate": 0.0
            }

        total_count = len(deliveries)
        on_time_count = 0
        late_count = 0
        failed_count = 0
        total_delay_hours = 0.0
        quality_pass_count = 0
        quality_total = 0

        for delivery in deliveries:
            # Failed deliveries count only as failures, not as late or
            # inspected deliveries.
            if delivery.status == DeliveryStatus.FAILED_DELIVERY:
                failed_count += 1
                continue

            # Check if delivery was on time
            if delivery.scheduled_date and delivery.actual_arrival:
                if delivery.actual_arrival <= delivery.scheduled_date:
                    on_time_count += 1
                else:
                    late_count += 1
                    delay = delivery.actual_arrival - delivery.scheduled_date
                    total_delay_hours += delay.total_seconds() / 3600

            # Check quality inspection
            if delivery.inspection_passed is not None:
                quality_total += 1
                if delivery.inspection_passed:
                    quality_pass_count += 1

        # total_count is non-zero here (empty case returned above).
        on_time_percentage = on_time_count / total_count * 100
        # Fallbacks are floats so the result types match the empty case.
        avg_delay_hours = total_delay_hours / late_count if late_count > 0 else 0.0
        quality_pass_rate = (
            quality_pass_count / quality_total * 100 if quality_total > 0 else 0.0
        )

        return {
            "total_deliveries": total_count,
            "on_time_deliveries": on_time_count,
            "late_deliveries": late_count,
            "failed_deliveries": failed_count,
            "on_time_percentage": round(on_time_percentage, 1),
            "avg_delay_hours": round(avg_delay_hours, 1),
            "quality_pass_rate": round(quality_pass_rate, 1)
        }

    def get_upcoming_deliveries_summary(self, tenant_id: UUID) -> Dict[str, Any]:
        """Get summary of upcoming deliveries.

        All figures are computed with COUNT queries instead of loading the
        rows. The "today", "this week" and "overdue" counts consider active
        statuses only and use the same bounds as ``get_todays_deliveries``,
        ``get_scheduled_deliveries`` and ``get_overdue_deliveries``.

        Returns:
            A dict with ``todays_deliveries``, ``this_week_deliveries``
            (today's midnight through 7 days later, inclusive),
            ``overdue_deliveries`` and ``in_transit_deliveries``.
        """
        now = datetime.utcnow()
        today = now.replace(hour=0, minute=0, second=0, microsecond=0)
        tomorrow = today + timedelta(days=1)
        next_week = today + timedelta(days=7)
        scheduled = self.model.scheduled_date

        # Today's deliveries
        todays_deliveries = (
            self._active_deliveries(tenant_id)
            .filter(scheduled >= today, scheduled <= tomorrow)
            .count()
        )

        # This week's deliveries
        this_week_deliveries = (
            self._active_deliveries(tenant_id)
            .filter(scheduled >= today, scheduled <= next_week)
            .count()
        )

        # Overdue deliveries
        overdue_count = (
            self._active_deliveries(tenant_id)
            .filter(scheduled < now)
            .count()
        )

        # In transit deliveries
        in_transit_count = (
            self.db.query(self.model)
            .filter(
                and_(
                    self.model.tenant_id == tenant_id,
                    self.model.status == DeliveryStatus.IN_TRANSIT
                )
            )
            .count()
        )

        return {
            "todays_deliveries": todays_deliveries,
            "this_week_deliveries": this_week_deliveries,
            "overdue_deliveries": overdue_count,
            "in_transit_deliveries": in_transit_count
        }