# services/suppliers/app/services/supplier_service.py """ Supplier service for business logic operations """ import structlog from typing import List, Optional, Dict, Any from uuid import UUID from datetime import datetime from sqlalchemy.orm import Session from app.repositories.supplier_repository import SupplierRepository from app.models.suppliers import Supplier, SupplierStatus, SupplierType from app.schemas.suppliers import ( SupplierCreate, SupplierUpdate, SupplierResponse, SupplierSearchParams, SupplierStatistics ) from app.core.config import settings logger = structlog.get_logger() class SupplierService: """Service for supplier management operations""" def __init__(self, db: Session): self.db = db self.repository = SupplierRepository(db) async def create_supplier( self, tenant_id: UUID, supplier_data: SupplierCreate, created_by: UUID ) -> Supplier: """Create a new supplier""" logger.info("Creating supplier", tenant_id=str(tenant_id), name=supplier_data.name) # Check for duplicate name existing = self.repository.get_by_name(tenant_id, supplier_data.name) if existing: raise ValueError(f"Supplier with name '{supplier_data.name}' already exists") # Check for duplicate supplier code if provided if supplier_data.supplier_code: existing_code = self.repository.get_by_supplier_code( tenant_id, supplier_data.supplier_code ) if existing_code: raise ValueError( f"Supplier with code '{supplier_data.supplier_code}' already exists" ) # Generate supplier code if not provided supplier_code = supplier_data.supplier_code if not supplier_code: supplier_code = self._generate_supplier_code(supplier_data.name) # Create supplier data create_data = supplier_data.model_dump(exclude_unset=True) create_data.update({ 'tenant_id': tenant_id, 'supplier_code': supplier_code, 'status': SupplierStatus.PENDING_APPROVAL, 'created_by': created_by, 'updated_by': created_by, 'quality_rating': 0.0, 'delivery_rating': 0.0, 'total_orders': 0, 'total_amount': 0.0 }) supplier = self.repository.create(create_data) logger.info( "Supplier created successfully", tenant_id=str(tenant_id), supplier_id=str(supplier.id), name=supplier.name ) return supplier async def get_supplier(self, supplier_id: UUID) -> Optional[Supplier]: """Get supplier by ID""" return self.repository.get_by_id(supplier_id) async def update_supplier( self, supplier_id: UUID, supplier_data: SupplierUpdate, updated_by: UUID ) -> Optional[Supplier]: """Update supplier information""" logger.info("Updating supplier", supplier_id=str(supplier_id)) supplier = self.repository.get_by_id(supplier_id) if not supplier: return None # Check for duplicate name if changing if supplier_data.name and supplier_data.name != supplier.name: existing = self.repository.get_by_name(supplier.tenant_id, supplier_data.name) if existing: raise ValueError(f"Supplier with name '{supplier_data.name}' already exists") # Check for duplicate supplier code if changing if (supplier_data.supplier_code and supplier_data.supplier_code != supplier.supplier_code): existing_code = self.repository.get_by_supplier_code( supplier.tenant_id, supplier_data.supplier_code ) if existing_code: raise ValueError( f"Supplier with code '{supplier_data.supplier_code}' already exists" ) # Prepare update data update_data = supplier_data.model_dump(exclude_unset=True) update_data['updated_by'] = updated_by update_data['updated_at'] = datetime.utcnow() supplier = self.repository.update(supplier_id, update_data) logger.info("Supplier updated successfully", supplier_id=str(supplier_id)) return supplier async def delete_supplier(self, supplier_id: UUID) -> bool: """Delete supplier (soft delete by changing status)""" logger.info("Deleting supplier", supplier_id=str(supplier_id)) supplier = self.repository.get_by_id(supplier_id) if not supplier: return False # Check if supplier has active purchase orders # TODO: Add check for active purchase orders once PO service is implemented # Soft delete by changing status self.repository.update(supplier_id, { 'status': SupplierStatus.INACTIVE, 'updated_at': datetime.utcnow() }) logger.info("Supplier deleted successfully", supplier_id=str(supplier_id)) return True async def search_suppliers( self, tenant_id: UUID, search_params: SupplierSearchParams ) -> List[Supplier]: """Search suppliers with filters""" return self.repository.search_suppliers( tenant_id=tenant_id, search_term=search_params.search_term, supplier_type=search_params.supplier_type, status=search_params.status, limit=search_params.limit, offset=search_params.offset ) async def get_active_suppliers(self, tenant_id: UUID) -> List[Supplier]: """Get all active suppliers""" return self.repository.get_active_suppliers(tenant_id) async def get_suppliers_by_type( self, tenant_id: UUID, supplier_type: SupplierType ) -> List[Supplier]: """Get suppliers by type""" return self.repository.get_suppliers_by_type(tenant_id, supplier_type) async def get_top_suppliers(self, tenant_id: UUID, limit: int = 10) -> List[Supplier]: """Get top performing suppliers""" return self.repository.get_top_suppliers(tenant_id, limit) async def approve_supplier( self, supplier_id: UUID, approved_by: UUID, notes: Optional[str] = None ) -> Optional[Supplier]: """Approve a pending supplier""" logger.info("Approving supplier", supplier_id=str(supplier_id)) supplier = self.repository.approve_supplier(supplier_id, approved_by) if not supplier: logger.warning("Failed to approve supplier - not found or not pending") return None if notes: self.repository.update(supplier_id, { 'notes': (supplier.notes or "") + f"\nApproval notes: {notes}", 'updated_at': datetime.utcnow() }) logger.info("Supplier approved successfully", supplier_id=str(supplier_id)) return supplier async def reject_supplier( self, supplier_id: UUID, rejection_reason: str, rejected_by: UUID ) -> Optional[Supplier]: """Reject a pending supplier""" logger.info("Rejecting supplier", supplier_id=str(supplier_id)) supplier = self.repository.reject_supplier( supplier_id, rejection_reason, rejected_by ) if not supplier: logger.warning("Failed to reject supplier - not found or not pending") return None logger.info("Supplier rejected successfully", supplier_id=str(supplier_id)) return supplier async def update_supplier_performance( self, supplier_id: UUID, quality_rating: Optional[float] = None, delivery_rating: Optional[float] = None, order_increment: int = 0, amount_increment: float = 0.0 ) -> Optional[Supplier]: """Update supplier performance metrics""" logger.info("Updating supplier performance", supplier_id=str(supplier_id)) return self.repository.update_supplier_stats( supplier_id=supplier_id, total_orders_increment=order_increment, total_amount_increment=amount_increment, new_quality_rating=quality_rating, new_delivery_rating=delivery_rating ) async def get_supplier_statistics(self, tenant_id: UUID) -> Dict[str, Any]: """Get supplier statistics for dashboard""" return self.repository.get_supplier_statistics(tenant_id) async def get_suppliers_needing_review( self, tenant_id: UUID, days_since_last_order: int = 30 ) -> List[Supplier]: """Get suppliers that may need performance review""" return self.repository.get_suppliers_needing_review( tenant_id, days_since_last_order ) def _generate_supplier_code(self, supplier_name: str) -> str: """Generate supplier code from name""" # Take first 3 characters of each word, uppercase words = supplier_name.strip().split()[:3] # Max 3 words code_parts = [] for word in words: if len(word) >= 3: code_parts.append(word[:3].upper()) else: code_parts.append(word.upper()) base_code = "".join(code_parts)[:8] # Max 8 characters # Add random suffix to ensure uniqueness import random import string suffix = ''.join(random.choices(string.digits, k=2)) return f"{base_code}{suffix}" async def validate_supplier_data( self, tenant_id: UUID, supplier_data: Dict[str, Any], supplier_id: Optional[UUID] = None ) -> Dict[str, str]: """Validate supplier data and return errors""" errors = {} # Check required fields if not supplier_data.get('name'): errors['name'] = "Supplier name is required" # Check email format if provided email = supplier_data.get('email') if email: import re email_pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$' if not re.match(email_pattern, email): errors['email'] = "Invalid email format" # Check phone format if provided phone = supplier_data.get('phone') if phone: # Basic phone validation (digits, spaces, dashes, parentheses) import re phone_pattern = r'^[\d\s\-\(\)\+]+$' if not re.match(phone_pattern, phone): errors['phone'] = "Invalid phone format" # Check lead time range lead_time = supplier_data.get('standard_lead_time') if lead_time is not None: if lead_time < 0 or lead_time > 365: errors['standard_lead_time'] = "Lead time must be between 0 and 365 days" # Check credit limit credit_limit = supplier_data.get('credit_limit') if credit_limit is not None and credit_limit < 0: errors['credit_limit'] = "Credit limit cannot be negative" # Check minimum order amount min_order = supplier_data.get('minimum_order_amount') if min_order is not None and min_order < 0: errors['minimum_order_amount'] = "Minimum order amount cannot be negative" return errors