# services/orders/app/services/approval_rules_service.py
"""
Approval Rules Service - Smart auto-approval logic for purchase orders
Evaluates POs against configurable business rules to determine if auto-approval is appropriate
"""

from typing import Dict, List, Any, Optional, Tuple
from decimal import Decimal
from uuid import UUID
import structlog

from shared.config.base import BaseServiceSettings
from shared.utils.tenant_settings_client import TenantSettingsClient

logger = structlog.get_logger()


class ApprovalRulesService:
    """
    Service for evaluating purchase orders against approval rules.

    Implements auto-approval logic based on multiple criteria (amount,
    supplier trust, supplier history, item criticality, PO priority).
    The thresholds come from tenant-specific procurement settings fetched
    through ``TenantSettingsClient``.
    """

    # Settings used when tenant settings cannot be obtained. Auto-approval is
    # switched off so that a settings outage never approves anything.
    _SAFE_DEFAULT_SETTINGS: Dict[str, Any] = {
        'auto_approve_enabled': False,
        'auto_approve_threshold_eur': 500.0,
        'auto_approve_min_supplier_score': 0.80,
        'require_approval_new_suppliers': True,
        'require_approval_critical_items': True,
    }

    # Priorities (compared case-insensitively) that always need a human.
    _ELEVATED_PRIORITIES = frozenset({'critical', 'urgent'})

    # A supplier with fewer previous POs than this is treated as "new".
    _MIN_ESTABLISHED_PO_COUNT = 5

    # Minimum share of a supplier's past POs that must have been approved.
    _MIN_HISTORICAL_APPROVAL_RATE = 0.95

    # Lower-case phrases that occur only in the *failing* reasons produced by
    # evaluate_po_for_auto_approval. Deliberately phrases rather than single
    # words: a bare 'critical' would also match the passing reason
    # "No critical/urgent items detected".
    _FAILURE_MARKERS = (
        'exceeds',
        'below',
        ' not ',
        'disabled',
        'new supplier',
        'manual review',
        'no supplier data',
    )

    def __init__(self, config: BaseServiceSettings, tenant_id: UUID):
        """
        Args:
            config: Service settings; ``TENANT_SERVICE_URL`` is read from it
                when present, otherwise ``http://tenant-service:8000`` is used.
            tenant_id: Tenant whose procurement settings drive the rules.
        """
        self.config = config
        self.tenant_id = tenant_id

        # Initialize tenant settings client
        tenant_service_url = getattr(config, 'TENANT_SERVICE_URL', 'http://tenant-service:8000')
        self.tenant_settings_client = TenantSettingsClient(tenant_service_url=tenant_service_url)

    @staticmethod
    def _value_or(mapping: Dict[str, Any], key: str, default: Any) -> Any:
        """Return ``mapping[key]``, or ``default`` when the key is missing or None."""
        value = mapping.get(key)
        return default if value is None else value

    @classmethod
    def _is_elevated_priority(cls, priority: Any) -> bool:
        """Return True when ``priority`` is a critical/urgent string (any casing)."""
        return (
            isinstance(priority, str)
            and priority.strip().lower() in cls._ELEVATED_PRIORITIES
        )

    async def evaluate_po_for_auto_approval(
        self,
        po_data: Dict[str, Any],
        supplier_data: Optional[Dict[str, Any]] = None,
        requirements_data: Optional[List[Dict[str, Any]]] = None
    ) -> Tuple[bool, List[str]]:
        """
        Evaluate if a PO should be auto-approved using tenant-specific settings.

        Every rule is evaluated (no short-circuit) so that ``reasons`` lists
        the outcome of each check, both passing and failing.

        Args:
            po_data: Purchase order fields. Reads ``subtotal``, ``tax_amount``,
                ``shipping_cost``, ``discount_amount``, ``priority`` and ``id``.
            supplier_data: Supplier fields. Reads ``trust_score``,
                ``is_preferred_supplier``, ``auto_approve_enabled``,
                ``total_pos_count``, ``approved_pos_count`` and ``id``.
                A missing or empty value blocks auto-approval.
            requirements_data: Requirement dicts; each one's ``priority`` is
                inspected for critical/urgent items.

        Returns:
            Tuple of (should_auto_approve, reasons)
        """
        # Fetch tenant-specific procurement settings. Any failure falls back
        # to the safe defaults, which keep auto-approval switched off.
        try:
            tenant_settings = await self.tenant_settings_client.get_procurement_settings(self.tenant_id)
        except Exception as e:
            logger.error("Failed to fetch tenant settings, using safe defaults",
                         tenant_id=str(self.tenant_id), error=str(e))
            tenant_settings = dict(self._SAFE_DEFAULT_SETTINGS)
        else:
            if tenant_settings is None:
                # A client that answers with None instead of raising would
                # otherwise crash on the .get() calls below.
                logger.warning("Tenant settings client returned no settings, using safe defaults",
                               tenant_id=str(self.tenant_id))
                tenant_settings = dict(self._SAFE_DEFAULT_SETTINGS)

        # Check if auto-approval is enabled for this tenant.
        # NOTE(review): a missing key defaults to True here, which is more
        # permissive than the safe defaults above - confirm this is intended.
        if not tenant_settings.get('auto_approve_enabled', True):
            return False, ["Auto-approval is disabled in tenant settings"]

        reasons: List[str] = []
        should_approve = True
        # Empty supplier data carries no information, so it is handled
        # exactly like missing supplier data.
        has_supplier = bool(supplier_data)

        # Rule 1: Amount threshold check
        total_amount = self._calculate_po_total(po_data)
        threshold = Decimal(str(self._value_or(tenant_settings, 'auto_approve_threshold_eur', 500.0)))

        if total_amount > threshold:
            should_approve = False
            reasons.append(
                f"PO amount €{total_amount:.2f} exceeds threshold €{threshold:.2f}"
            )
        else:
            reasons.append(
                f"PO amount €{total_amount:.2f} within threshold €{threshold:.2f}"
            )

        # Rule 2: Supplier trust score check
        min_supplier_score = self._value_or(tenant_settings, 'auto_approve_min_supplier_score', 0.80)

        if has_supplier:
            supplier_score = self._value_or(supplier_data, 'trust_score', 0.0)
            is_preferred = supplier_data.get('is_preferred_supplier', False)
            auto_approve_enabled = supplier_data.get('auto_approve_enabled', False)

            if supplier_score < min_supplier_score:
                should_approve = False
                reasons.append(
                    f"Supplier trust score {supplier_score:.2f} below minimum {min_supplier_score:.2f}"
                )
            else:
                reasons.append(f"Supplier trust score {supplier_score:.2f} meets minimum requirements")

            if not is_preferred:
                should_approve = False
                reasons.append("Supplier is not marked as preferred")
            else:
                reasons.append("Supplier is a preferred supplier")

            if not auto_approve_enabled:
                should_approve = False
                reasons.append("Auto-approve is disabled for this supplier")
            else:
                reasons.append("Auto-approve is enabled for this supplier")
        else:
            # Covers both None and an empty dict: without supplier information
            # none of the supplier rules can be verified.
            should_approve = False
            reasons.append("No supplier data available")

        # Rule 3: New supplier check
        require_approval_new_suppliers = self._value_or(
            tenant_settings, 'require_approval_new_suppliers', True
        )

        if has_supplier and require_approval_new_suppliers:
            total_pos = self._value_or(supplier_data, 'total_pos_count', 0)
            if total_pos < self._MIN_ESTABLISHED_PO_COUNT:
                should_approve = False
                reasons.append(
                    f"New supplier with only {total_pos} previous orders "
                    f"(minimum {self._MIN_ESTABLISHED_PO_COUNT} required)"
                )
            else:
                reasons.append(f"Established supplier with {total_pos} previous orders")

        # Rule 4: Critical/urgent items check
        require_approval_critical_items = self._value_or(
            tenant_settings, 'require_approval_critical_items', True
        )

        if requirements_data and require_approval_critical_items:
            critical_count = sum(
                1 for req in requirements_data
                if self._is_elevated_priority(req.get('priority'))
            )

            if critical_count > 0:
                should_approve = False
                reasons.append(f"Contains {critical_count} critical/urgent items requiring manual review")
            else:
                reasons.append("No critical/urgent items detected")

        # Rule 5: Historical approval rate check
        if has_supplier:
            total_pos = self._value_or(supplier_data, 'total_pos_count', 0)
            approved_pos = self._value_or(supplier_data, 'approved_pos_count', 0)

            # The rate is undefined for a supplier without any previous PO.
            if total_pos > 0:
                approval_rate = approved_pos / total_pos
                if approval_rate < self._MIN_HISTORICAL_APPROVAL_RATE:
                    should_approve = False
                    reasons.append(
                        f"Historical approval rate {approval_rate:.1%} below 95% threshold"
                    )
                else:
                    reasons.append(f"High historical approval rate {approval_rate:.1%}")

        # Rule 6: PO priority check
        priority = po_data.get('priority', 'normal')
        if self._is_elevated_priority(priority):
            should_approve = False
            reasons.append(f"PO priority is '{priority}' - requires manual review")

        logger.info(
            "PO auto-approval evaluation completed",
            should_auto_approve=should_approve,
            total_amount=float(total_amount),
            supplier_id=supplier_data.get('id') if supplier_data else None,
            reasons_count=len(reasons),
            po_data=po_data.get('id') if isinstance(po_data, dict) else str(po_data)
        )

        return should_approve, reasons

    def _calculate_po_total(self, po_data: Dict[str, Any]) -> Decimal:
        """
        Calculate total PO amount: subtotal + tax + shipping - discount.

        Amounts are converted through ``str`` so float inputs do not carry
        binary rounding noise into the Decimal. A missing or None amount
        counts as zero.
        """
        def amount(key: str) -> Decimal:
            return Decimal(str(self._value_or(po_data, key, 0)))

        return (
            amount('subtotal')
            + amount('tax_amount')
            + amount('shipping_cost')
            - amount('discount_amount')
        )

    def get_approval_summary(
        self,
        should_approve: bool,
        reasons: List[str]
    ) -> Dict[str, Any]:
        """
        Generate a human-readable approval summary.

        Args:
            should_approve: Decision returned by the evaluation.
            reasons: Reasons returned by the evaluation.

        Returns:
            Dict with the keys ``auto_approved``, ``decision``, ``reasons``,
            ``reason_count`` and ``summary``.
        """
        return {
            "auto_approved": should_approve,
            "decision": "APPROVED" if should_approve else "REQUIRES_MANUAL_APPROVAL",
            "reasons": reasons,
            "reason_count": len(reasons),
            "summary": self._format_summary(should_approve, reasons)
        }

    def _format_summary(self, should_approve: bool, reasons: List[str]) -> str:
        """
        Format a one-line approval decision summary.

        For an approval the first two reasons are quoted. For a rejection the
        first reason containing one of ``_FAILURE_MARKERS`` is quoted; when no
        reason matches, a generic message is returned.
        """
        if should_approve:
            return f"Auto-approved: {', '.join(reasons[:2])}"

        first_failure = next(
            (
                reason for reason in reasons
                if any(marker in reason.lower() for marker in self._FAILURE_MARKERS)
            ),
            None,
        )
        if first_failure is not None:
            return f"Manual approval required: {first_failure}"
        return "Manual approval required"

    def validate_approval_override(
        self,
        override_reason: str,
        user_role: str
    ) -> Tuple[bool, Optional[str]]:
        """
        Validate if a user can override auto-approval decision.

        Args:
            override_reason: Free-text justification; must contain at least
                10 characters once surrounding whitespace is stripped.
            user_role: Role of the requesting user; only ``admin`` and
                ``owner`` may override.

        Returns:
            Tuple of (is_valid, error_message); error_message is None when
            the override is valid.
        """
        # Only admin/owner can override
        if user_role not in ('admin', 'owner'):
            return False, "Insufficient permissions to override approval rules"

        # Require a reason
        if not override_reason or len(override_reason.strip()) < 10:
            return False, "Override reason must be at least 10 characters"

        return True, None