# services/orders/app/services/smart_procurement_calculator.py
"""
Smart Procurement Calculator
Implements multi-constraint procurement quantity optimization combining:
- AI demand forecasting
- Ingredient reorder rules (reorder_point, reorder_quantity)
- Supplier constraints (minimum_order_quantity, minimum_order_amount)
- Storage limits (max_stock_level)
- Price tier optimization
"""

import math
from decimal import Decimal
from typing import Dict, Any, List, Tuple, Optional

try:
    import structlog

    logger = structlog.get_logger()
except ImportError:
    # The calculator itself is pure computation; fall back to the standard
    # library logger so the module stays importable where structlog is absent.
    import logging

    logger = logging.getLogger(__name__)


def _to_decimal(value: Any, default: str = '0') -> Decimal:
    """Convert a raw numeric field to Decimal, mapping None to ``default``.

    Dict fields may be present but set to None; ``Decimal(str(None))`` would
    raise ``decimal.InvalidOperation``, so None is treated like a missing key.
    """
    if value is None:
        return Decimal(default)
    return Decimal(str(value))


class SmartProcurementCalculator:
    """
    Smart procurement quantity calculator with multi-tier constraint optimization
    """

    def __init__(self, procurement_settings: Optional[Dict[str, Any]]):
        """
        Initialize calculator with tenant procurement settings

        Args:
            procurement_settings: Tenant settings dict with flags (each
                defaults to True when missing; None is treated as empty):
                - use_reorder_rules: bool
                - economic_rounding: bool
                - respect_storage_limits: bool
                - use_supplier_minimums: bool
                - optimize_price_tiers: bool
        """
        settings = procurement_settings or {}
        self.use_reorder_rules = settings.get('use_reorder_rules', True)
        self.economic_rounding = settings.get('economic_rounding', True)
        self.respect_storage_limits = settings.get('respect_storage_limits', True)
        self.use_supplier_minimums = settings.get('use_supplier_minimums', True)
        self.optimize_price_tiers = settings.get('optimize_price_tiers', True)

    @staticmethod
    def _round_up_to_multiple(quantity: Decimal, step: Decimal) -> Tuple[int, Decimal]:
        """Return (multiples, multiples * step) for the smallest multiple of step >= quantity.

        The ceiling is taken on the Decimal quotient directly; converting it
        to float first can lose the fractional part and round down.
        """
        multiples = math.ceil(quantity / step)
        return multiples, Decimal(multiples) * step

    def calculate_procurement_quantity(
        self,
        ingredient: Dict[str, Any],
        supplier: Optional[Dict[str, Any]],
        price_list_entry: Optional[Dict[str, Any]],
        ai_forecast_quantity: Decimal,
        current_stock: Decimal,
        safety_stock_percentage: Decimal = Decimal('20.0')
    ) -> Dict[str, Any]:
        """
        Calculate optimal procurement quantity using smart hybrid approach

        Args:
            ingredient: Ingredient data with reorder_point, reorder_quantity,
                low_stock_threshold, max_stock_level (None values count as unset)
            supplier: Supplier data with minimum_order_amount
            price_list_entry: Price list with minimum_order_quantity, unit_price,
                tier_pricing
            ai_forecast_quantity: AI-predicted demand quantity
            current_stock: Current stock level
            safety_stock_percentage: Safety stock buffer percentage

        Returns:
            Dict with:
            - order_quantity: Final calculated quantity to order
            - calculation_method: Method used (e.g., 'REORDER_POINT_TRIGGERED')
            - ai_suggested_quantity: Original AI forecast
            - adjusted_quantity: Final quantity after constraints
            - adjustment_reason: Human-readable explanation
            - warnings: List of warnings/notes
            - supplier_minimum_applied: bool
            - storage_limit_applied: bool
            - reorder_rule_applied: bool
            - price_tier_applied: Dict or None
        """
        warnings: List[str] = []
        result: Dict[str, Any] = {
            'ai_suggested_quantity': ai_forecast_quantity,
            'supplier_minimum_applied': False,
            'storage_limit_applied': False,
            'reorder_rule_applied': False,
            'price_tier_applied': None
        }

        # Extract ingredient parameters (missing or None -> 0)
        reorder_point = _to_decimal(ingredient.get('reorder_point'))
        reorder_quantity = _to_decimal(ingredient.get('reorder_quantity'))
        low_stock_threshold = _to_decimal(ingredient.get('low_stock_threshold'))
        # Any falsy max_stock_level (missing, None, 0) means "no storage limit"
        max_stock_level = Decimal(str(ingredient.get('max_stock_level') or 'Infinity'))

        # Extract supplier/price list parameters
        supplier_min_qty = Decimal('0')
        supplier_min_amount = Decimal('0')
        tier_pricing: List[Dict[str, Any]] = []

        if price_list_entry:
            supplier_min_qty = _to_decimal(price_list_entry.get('minimum_order_quantity'))
            tier_pricing = price_list_entry.get('tier_pricing') or []

        if supplier:
            supplier_min_amount = _to_decimal(supplier.get('minimum_order_amount'))

        # Calculate AI-based net requirement with safety stock
        safety_stock = ai_forecast_quantity * (safety_stock_percentage / Decimal('100'))
        total_needed = ai_forecast_quantity + safety_stock
        ai_net_requirement = max(Decimal('0'), total_needed - current_stock)

        # TIER 1: Critical Safety Check (Emergency Override)
        if self.use_reorder_rules and current_stock <= low_stock_threshold:
            order_qty = max(reorder_quantity, ai_net_requirement)
            result['calculation_method'] = 'CRITICAL_STOCK_EMERGENCY'
            result['reorder_rule_applied'] = True
            warnings.append(f"CRITICAL: Stock ({current_stock}) below threshold ({low_stock_threshold})")

        # TIER 2: Reorder Point Triggered
        elif self.use_reorder_rules and current_stock <= reorder_point:
            order_qty = max(reorder_quantity, ai_net_requirement)
            result['calculation_method'] = 'REORDER_POINT_TRIGGERED'
            result['reorder_rule_applied'] = True
            warnings.append(f"Reorder point triggered: stock ({current_stock}) ≤ reorder point ({reorder_point})")

        # TIER 3: Forecast-Driven (Above reorder point, no immediate need)
        elif ai_net_requirement > 0:
            order_qty = ai_net_requirement
            result['calculation_method'] = 'FORECAST_DRIVEN_PROACTIVE'
            warnings.append(f"AI forecast suggests ordering {ai_net_requirement} units")

        # TIER 4: No Order Needed
        else:
            result['order_quantity'] = Decimal('0')
            result['adjusted_quantity'] = Decimal('0')
            result['calculation_method'] = 'SUFFICIENT_STOCK'
            result['adjustment_reason'] = f"Current stock ({current_stock}) is sufficient. No order needed."
            result['warnings'] = warnings
            return result

        # Apply Economic Rounding (reorder_quantity multiples)
        if self.economic_rounding and reorder_quantity > 0:
            multiples, rounded_qty = self._round_up_to_multiple(order_qty, reorder_quantity)
            if rounded_qty > order_qty:
                warnings.append(f"Rounded to {multiples}× reorder quantity ({reorder_quantity}) = {rounded_qty}")
                order_qty = rounded_qty

        # Apply Supplier Minimum Quantity Constraint
        if self.use_supplier_minimums and supplier_min_qty > 0:
            if order_qty < supplier_min_qty:
                warnings.append(f"Increased from {order_qty} to supplier minimum ({supplier_min_qty})")
                order_qty = supplier_min_qty
                result['supplier_minimum_applied'] = True
            else:
                # Round to multiples of minimum_order_quantity (packaging constraint)
                multiples, rounded_qty = self._round_up_to_multiple(order_qty, supplier_min_qty)
                if rounded_qty > order_qty:
                    warnings.append(f"Rounded to {multiples}× supplier packaging ({supplier_min_qty}) = {rounded_qty}")
                    result['supplier_minimum_applied'] = True
                    order_qty = rounded_qty

        # Apply Price Tier Optimization
        if self.optimize_price_tiers and tier_pricing and price_list_entry:
            unit_price = _to_decimal(price_list_entry.get('unit_price'))
            tier_result = self._optimize_price_tier(
                order_qty,
                unit_price,
                tier_pricing,
                current_stock,
                max_stock_level
            )

            if tier_result['tier_applied']:
                order_qty = tier_result['optimized_quantity']
                result['price_tier_applied'] = tier_result['tier_info']
                warnings.append(tier_result['message'])

        # Apply Storage Capacity Constraint (applied last among quantity
        # adjustments, so it can push the order below earlier minimums)
        if self.respect_storage_limits and max_stock_level != Decimal('Infinity'):
            if (current_stock + order_qty) > max_stock_level:
                capped_qty = max(Decimal('0'), max_stock_level - current_stock)
                warnings.append(f"Capped from {order_qty} to {capped_qty} due to storage limit ({max_stock_level})")
                order_qty = capped_qty
                result['storage_limit_applied'] = True
                result['calculation_method'] += '_STORAGE_LIMITED'

        # Check supplier minimum_order_amount (total order value constraint)
        if self.use_supplier_minimums and supplier_min_amount > 0 and price_list_entry:
            unit_price = _to_decimal(price_list_entry.get('unit_price'))
            order_value = order_qty * unit_price

            if order_value < supplier_min_amount:
                warnings.append(
                    f"⚠️ Order value €{order_value:.2f} < supplier minimum €{supplier_min_amount:.2f}. "
                    "This item needs to be combined with other products in the same PO."
                )
                result['calculation_method'] += '_NEEDS_CONSOLIDATION'

        # Build final result
        result['order_quantity'] = order_qty
        result['adjusted_quantity'] = order_qty
        result['adjustment_reason'] = self._build_adjustment_reason(
            ai_forecast_quantity,
            ai_net_requirement,
            order_qty,
            warnings,
            result
        )
        result['warnings'] = warnings

        return result

    def _optimize_price_tier(
        self,
        current_qty: Decimal,
        base_unit_price: Decimal,
        tier_pricing: List[Dict[str, Any]],
        current_stock: Decimal,
        max_stock_level: Decimal
    ) -> Dict[str, Any]:
        """
        Optimize order quantity to capture volume discount tiers if beneficial

        A higher tier is chosen only when buying the tier quantity at the tier
        price costs less in total than buying the current quantity at the price
        it already qualifies for.

        Args:
            current_qty: Current calculated order quantity
            base_unit_price: Base unit price without tiers
            tier_pricing: List of tier dicts with 'quantity' and 'price'
            current_stock: Current stock level
            max_stock_level: Maximum storage capacity

        Returns:
            Dict with tier_applied (bool), optimized_quantity, tier_info, message
            (tier_info and message are None when no tier is applied)
        """
        no_upgrade = {
            'tier_applied': False,
            'optimized_quantity': current_qty,
            'tier_info': None,
            'message': None
        }

        if not tier_pricing:
            return no_upgrade

        # Parse once, then sort numerically by tier quantity
        parsed_tiers = sorted(
            (
                (Decimal(str(tier['quantity'])), Decimal(str(tier['price'])))
                for tier in tier_pricing
            ),
            key=lambda parsed: parsed[0]
        )

        # Tiers at or below the current quantity are already captured: the
        # highest of them sets the price the current order would really pay.
        effective_unit_price = base_unit_price
        higher_tiers = []
        for tier_qty, tier_price in parsed_tiers:
            if tier_qty <= current_qty:
                effective_unit_price = tier_price
            else:
                higher_tiers.append((tier_qty, tier_price))

        current_cost = current_qty * effective_unit_price
        # Never upgrade by more than 50% above current quantity (too much excess)
        max_upgrade_qty = current_qty * Decimal('1.5')

        best_tier = None
        best_savings = Decimal('0')

        for tier_qty, tier_price in higher_tiers:
            # Skip if tier would exceed storage capacity
            if self.respect_storage_limits and (current_stock + tier_qty) > max_stock_level:
                continue

            if tier_qty > max_upgrade_qty:
                continue

            # Calculate savings on total spend
            savings = current_cost - tier_qty * tier_price

            if savings > best_savings:
                best_savings = savings
                best_tier = {
                    'quantity': tier_qty,
                    'price': tier_price,
                    'savings': savings
                }

        if best_tier:
            return {
                'tier_applied': True,
                'optimized_quantity': best_tier['quantity'],
                'tier_info': best_tier,
                'message': (
                    f"Upgraded to {best_tier['quantity']} units "
                    f"@ €{best_tier['price']}/unit "
                    f"(saves €{best_tier['savings']:.2f})"
                )
            }

        return no_upgrade

    def _build_adjustment_reason(
        self,
        ai_forecast: Decimal,
        ai_net_requirement: Decimal,
        final_quantity: Decimal,
        warnings: List[str],
        result: Dict[str, Any]
    ) -> str:
        """
        Build human-readable explanation of quantity adjustments

        Args:
            ai_forecast: Original AI forecast
            ai_net_requirement: AI forecast + safety stock - current stock
            final_quantity: Final order quantity after all adjustments
            warnings: List of warning messages
            result: Calculation result dict

        Returns:
            Human-readable adjustment explanation
        """
        parts = []

        # Start with calculation method
        method = result.get('calculation_method', 'UNKNOWN')
        parts.append(f"Method: {method.replace('_', ' ').title()}")

        # AI forecast base
        parts.append(f"AI Forecast: {ai_forecast} units, Net Requirement: {ai_net_requirement} units")

        # Adjustments applied, in a fixed order
        adjustment_labels = (
            ('reorder_rule_applied', "reorder rules"),
            ('supplier_minimum_applied', "supplier minimums"),
            ('storage_limit_applied', "storage limits"),
            ('price_tier_applied', "price tier optimization"),
        )
        adjustments = [label for key, label in adjustment_labels if result.get(key)]

        if adjustments:
            parts.append(f"Adjustments: {', '.join(adjustments)}")

        # Final quantity
        parts.append(f"Final Quantity: {final_quantity} units")

        # Key warnings: only consolidation alerts, critical stock and tier savings
        if warnings:
            key_warnings = [w for w in warnings if '⚠️' in w or 'CRITICAL' in w or 'saves €' in w]
            if key_warnings:
                parts.append(f"Notes: {'; '.join(key_warnings)}")

        return " | ".join(parts)