340 lines
14 KiB
Python
340 lines
14 KiB
Python
|
|
# services/orders/app/services/smart_procurement_calculator.py
"""
Smart Procurement Calculator

Implements multi-constraint procurement quantity optimization combining:
- AI demand forecasting
- Ingredient reorder rules (reorder_point, reorder_quantity)
- Supplier constraints (minimum_order_quantity, minimum_order_amount)
- Storage limits (max_stock_level)
- Price tier optimization
"""
|
|||
|
|
|
|||
|
|
import math
|
|||
|
|
from decimal import Decimal
|
|||
|
|
from typing import Dict, Any, List, Tuple, Optional
|
|||
|
|
import structlog
|
|||
|
|
|
|||
|
|
logger = structlog.get_logger()
|
|||
|
|
|
|||
|
|
|
|||
|
|
class SmartProcurementCalculator:
    """
    Smart procurement quantity calculator with multi-tier constraint optimization.

    The calculator starts from an AI demand forecast and then applies, in order:
    reorder rules, economic rounding, supplier minimum quantity / packaging,
    price tier optimization, storage capacity and finally a check against the
    supplier's minimum order amount. Each stage can be switched off through the
    tenant procurement settings passed to the constructor.
    """

    def __init__(self, procurement_settings: Optional[Dict[str, Any]]):
        """
        Initialize calculator with tenant procurement settings.

        Args:
            procurement_settings: Tenant settings dict with flags (each
                defaults to True when missing; ``None`` is treated as an
                empty dict):
                - use_reorder_rules: bool
                - economic_rounding: bool
                - respect_storage_limits: bool
                - use_supplier_minimums: bool
                - optimize_price_tiers: bool
        """
        settings = procurement_settings or {}
        self.use_reorder_rules = settings.get('use_reorder_rules', True)
        self.economic_rounding = settings.get('economic_rounding', True)
        self.respect_storage_limits = settings.get('respect_storage_limits', True)
        self.use_supplier_minimums = settings.get('use_supplier_minimums', True)
        self.optimize_price_tiers = settings.get('optimize_price_tiers', True)

    @staticmethod
    def _to_decimal(value: Any, default: Any = 0) -> Decimal:
        """
        Convert ``value`` to Decimal, substituting ``default`` when it is None.

        Dict lookups such as ``d.get('reorder_point', 0)`` still return None
        when the key exists with a null value; converting that via ``str``
        would raise ``decimal.InvalidOperation``.
        """
        if value is None:
            value = default
        if isinstance(value, Decimal):
            return value
        # Going through str() avoids binary-float artefacts for float inputs.
        return Decimal(str(value))

    @staticmethod
    def _ceil_multiples(quantity: Decimal, step: Decimal) -> int:
        """
        Return the smallest whole number of ``step`` units covering ``quantity``.

        math.ceil() operates on the Decimal quotient directly, so no precision
        is lost through an intermediate float.
        """
        return math.ceil(quantity / step)

    def calculate_procurement_quantity(
        self,
        ingredient: Dict[str, Any],
        supplier: Optional[Dict[str, Any]],
        price_list_entry: Optional[Dict[str, Any]],
        ai_forecast_quantity: Decimal,
        current_stock: Decimal,
        safety_stock_percentage: Decimal = Decimal('20.0')
    ) -> Dict[str, Any]:
        """
        Calculate optimal procurement quantity using smart hybrid approach.

        Args:
            ingredient: Ingredient data with reorder_point, reorder_quantity,
                low_stock_threshold, max_stock_level. Missing or None values
                count as 0 (max_stock_level: any falsy value means unlimited).
            supplier: Supplier data with minimum_order_amount
            price_list_entry: Price list with minimum_order_quantity,
                unit_price, tier_pricing
            ai_forecast_quantity: AI-predicted demand quantity
            current_stock: Current stock level
            safety_stock_percentage: Safety stock buffer percentage

        Returns:
            Dict with:
                - order_quantity: Final calculated quantity to order
                - calculation_method: Method used (e.g., 'REORDER_POINT_TRIGGERED')
                - ai_suggested_quantity: Original AI forecast
                - adjusted_quantity: Final quantity after constraints
                - adjustment_reason: Human-readable explanation
                - warnings: List of warnings/notes
                - supplier_minimum_applied: bool
                - storage_limit_applied: bool
                - reorder_rule_applied: bool
                - price_tier_applied: Dict or None
        """
        warnings: List[str] = []

        # Normalise caller-supplied numbers so int/float inputs do not raise
        # TypeError when mixed with Decimal arithmetic below.
        ai_forecast_quantity = self._to_decimal(ai_forecast_quantity)
        current_stock = self._to_decimal(current_stock)
        safety_stock_percentage = self._to_decimal(safety_stock_percentage, Decimal('20.0'))

        result: Dict[str, Any] = {
            'ai_suggested_quantity': ai_forecast_quantity,
            'supplier_minimum_applied': False,
            'storage_limit_applied': False,
            'reorder_rule_applied': False,
            'price_tier_applied': None
        }

        # Extract ingredient parameters (None values fall back to 0)
        reorder_point = self._to_decimal(ingredient.get('reorder_point'))
        reorder_quantity = self._to_decimal(ingredient.get('reorder_quantity'))
        low_stock_threshold = self._to_decimal(ingredient.get('low_stock_threshold'))
        # A falsy max_stock_level (None, 0, '') means "no storage limit".
        max_stock_level = Decimal(str(ingredient.get('max_stock_level') or 'Infinity'))

        # Extract supplier/price list parameters
        supplier_min_qty = Decimal('0')
        supplier_min_amount = Decimal('0')
        tier_pricing: List[Dict[str, Any]] = []

        if price_list_entry:
            supplier_min_qty = self._to_decimal(price_list_entry.get('minimum_order_quantity'))
            tier_pricing = price_list_entry.get('tier_pricing') or []

        if supplier:
            supplier_min_amount = self._to_decimal(supplier.get('minimum_order_amount'))

        # Calculate AI-based net requirement with safety stock
        safety_stock = ai_forecast_quantity * (safety_stock_percentage / Decimal('100'))
        total_needed = ai_forecast_quantity + safety_stock
        ai_net_requirement = max(Decimal('0'), total_needed - current_stock)

        # TIER 1: Critical Safety Check (Emergency Override)
        if self.use_reorder_rules and current_stock <= low_stock_threshold:
            base_order = max(reorder_quantity, ai_net_requirement)
            result['calculation_method'] = 'CRITICAL_STOCK_EMERGENCY'
            result['reorder_rule_applied'] = True
            warnings.append(f"CRITICAL: Stock ({current_stock}) below threshold ({low_stock_threshold})")
            order_qty = base_order

        # TIER 2: Reorder Point Triggered
        elif self.use_reorder_rules and current_stock <= reorder_point:
            base_order = max(reorder_quantity, ai_net_requirement)
            result['calculation_method'] = 'REORDER_POINT_TRIGGERED'
            result['reorder_rule_applied'] = True
            warnings.append(f"Reorder point triggered: stock ({current_stock}) ≤ reorder point ({reorder_point})")
            order_qty = base_order

        # TIER 3: Forecast-Driven (Above reorder point, no immediate need)
        elif ai_net_requirement > 0:
            order_qty = ai_net_requirement
            result['calculation_method'] = 'FORECAST_DRIVEN_PROACTIVE'
            warnings.append(f"AI forecast suggests ordering {ai_net_requirement} units")

        # TIER 4: No Order Needed
        else:
            result['order_quantity'] = Decimal('0')
            result['adjusted_quantity'] = Decimal('0')
            result['calculation_method'] = 'SUFFICIENT_STOCK'
            result['adjustment_reason'] = f"Current stock ({current_stock}) is sufficient. No order needed."
            result['warnings'] = warnings
            return result

        # Apply Economic Rounding (reorder_quantity multiples)
        if self.economic_rounding and reorder_quantity > 0:
            multiples = self._ceil_multiples(order_qty, reorder_quantity)
            rounded_qty = Decimal(multiples) * reorder_quantity
            if rounded_qty > order_qty:
                warnings.append(f"Rounded to {multiples}× reorder quantity ({reorder_quantity}) = {rounded_qty}")
            order_qty = rounded_qty

        # Apply Supplier Minimum Quantity Constraint
        if self.use_supplier_minimums and supplier_min_qty > 0:
            if order_qty < supplier_min_qty:
                warnings.append(f"Increased from {order_qty} to supplier minimum ({supplier_min_qty})")
                order_qty = supplier_min_qty
                result['supplier_minimum_applied'] = True
            else:
                # Round to multiples of minimum_order_quantity (packaging constraint)
                multiples = self._ceil_multiples(order_qty, supplier_min_qty)
                rounded_qty = Decimal(multiples) * supplier_min_qty
                if rounded_qty > order_qty:
                    warnings.append(f"Rounded to {multiples}× supplier packaging ({supplier_min_qty}) = {rounded_qty}")
                    result['supplier_minimum_applied'] = True
                order_qty = rounded_qty

        # Apply Price Tier Optimization
        if self.optimize_price_tiers and tier_pricing and price_list_entry:
            unit_price = self._to_decimal(price_list_entry.get('unit_price'))
            tier_result = self._optimize_price_tier(
                order_qty,
                unit_price,
                tier_pricing,
                current_stock,
                max_stock_level
            )

            if tier_result['tier_applied']:
                order_qty = tier_result['optimized_quantity']
                result['price_tier_applied'] = tier_result['tier_info']
                warnings.append(tier_result['message'])

        # Apply Storage Capacity Constraint
        if self.respect_storage_limits and max_stock_level != Decimal('Infinity'):
            if (current_stock + order_qty) > max_stock_level:
                # Never go negative when stock already exceeds the limit.
                capped_qty = max(Decimal('0'), max_stock_level - current_stock)
                warnings.append(f"Capped from {order_qty} to {capped_qty} due to storage limit ({max_stock_level})")
                order_qty = capped_qty
                result['storage_limit_applied'] = True
                result['calculation_method'] += '_STORAGE_LIMITED'

        # Check supplier minimum_order_amount (total order value constraint).
        # The quantity is not changed here; the item is only flagged so it can
        # be combined with other products in the same purchase order.
        if self.use_supplier_minimums and supplier_min_amount > 0 and price_list_entry:
            unit_price = self._to_decimal(price_list_entry.get('unit_price'))
            order_value = order_qty * unit_price

            if order_value < supplier_min_amount:
                warnings.append(
                    f"⚠️ Order value €{order_value:.2f} < supplier minimum €{supplier_min_amount:.2f}. "
                    "This item needs to be combined with other products in the same PO."
                )
                result['calculation_method'] += '_NEEDS_CONSOLIDATION'

        # Build final result
        result['order_quantity'] = order_qty
        result['adjusted_quantity'] = order_qty
        result['adjustment_reason'] = self._build_adjustment_reason(
            ai_forecast_quantity,
            ai_net_requirement,
            order_qty,
            warnings,
            result
        )
        result['warnings'] = warnings

        return result

    def _optimize_price_tier(
        self,
        current_qty: Decimal,
        base_unit_price: Decimal,
        tier_pricing: List[Dict[str, Any]],
        current_stock: Decimal,
        max_stock_level: Decimal
    ) -> Dict[str, Any]:
        """
        Optimize order quantity to capture volume discount tiers if beneficial.

        A tier is only considered when its quantity is above the current
        quantity, fits within storage (when storage limits are respected), is
        at most 50% above the current quantity, and its total cost is lower
        than the current quantity priced at the base unit price.

        Args:
            current_qty: Current calculated order quantity
            base_unit_price: Base unit price without tiers
            tier_pricing: List of tier dicts with 'quantity' and 'price'
            current_stock: Current stock level
            max_stock_level: Maximum storage capacity

        Returns:
            Dict with tier_applied (bool) and optimized_quantity; tier_info and
            message are included only when a tier was applied.
        """
        if not tier_pricing:
            return {'tier_applied': False, 'optimized_quantity': current_qty}

        # Sort tiers by numeric quantity so string and int quantities order
        # consistently instead of lexicographically (or failing when mixed).
        sorted_tiers = sorted(tier_pricing, key=lambda x: Decimal(str(x['quantity'])))

        best_tier = None
        best_savings = Decimal('0')
        # Loop-invariant: cost of the current quantity at the base price.
        current_cost = current_qty * base_unit_price

        for tier in sorted_tiers:
            tier_qty = Decimal(str(tier['quantity']))
            tier_price = Decimal(str(tier['price']))

            # Skip if tier quantity is below current quantity (already captured)
            if tier_qty <= current_qty:
                continue

            # Skip if tier would exceed storage capacity
            if self.respect_storage_limits and (current_stock + tier_qty) > max_stock_level:
                continue

            # Skip if tier is more than 50% above current quantity (too much excess)
            if tier_qty > current_qty * Decimal('1.5'):
                continue

            # Calculate savings: total spend must actually go down
            tier_cost = tier_qty * tier_price
            savings = current_cost - tier_cost

            if savings > best_savings:
                best_savings = savings
                best_tier = {
                    'quantity': tier_qty,
                    'price': tier_price,
                    'savings': savings
                }

        if best_tier:
            return {
                'tier_applied': True,
                'optimized_quantity': best_tier['quantity'],
                'tier_info': best_tier,
                'message': (
                    f"Upgraded to {best_tier['quantity']} units "
                    f"@ €{best_tier['price']}/unit "
                    f"(saves €{best_tier['savings']:.2f})"
                )
            }

        return {'tier_applied': False, 'optimized_quantity': current_qty}

    def _build_adjustment_reason(
        self,
        ai_forecast: Decimal,
        ai_net_requirement: Decimal,
        final_quantity: Decimal,
        warnings: List[str],
        result: Dict[str, Any]
    ) -> str:
        """
        Build human-readable explanation of quantity adjustments.

        Args:
            ai_forecast: Original AI forecast
            ai_net_requirement: AI forecast + safety stock - current stock
            final_quantity: Final order quantity after all adjustments
            warnings: List of warning messages
            result: Calculation result dict

        Returns:
            Human-readable adjustment explanation, sections joined by " | "
        """
        parts = []

        # Start with calculation method
        method = result.get('calculation_method', 'UNKNOWN')
        parts.append(f"Method: {method.replace('_', ' ').title()}")

        # AI forecast base
        parts.append(f"AI Forecast: {ai_forecast} units, Net Requirement: {ai_net_requirement} units")

        # Adjustments applied
        adjustments = []
        if result.get('reorder_rule_applied'):
            adjustments.append("reorder rules")
        if result.get('supplier_minimum_applied'):
            adjustments.append("supplier minimums")
        if result.get('storage_limit_applied'):
            adjustments.append("storage limits")
        if result.get('price_tier_applied'):
            adjustments.append("price tier optimization")

        if adjustments:
            parts.append(f"Adjustments: {', '.join(adjustments)}")

        # Final quantity
        parts.append(f"Final Quantity: {final_quantity} units")

        # Key warnings: only the consolidation, critical-stock and tier-savings
        # messages are surfaced in the summary.
        if warnings:
            key_warnings = [w for w in warnings if '⚠️' in w or 'CRITICAL' in w or 'saves €' in w]
            if key_warnings:
                parts.append(f"Notes: {'; '.join(key_warnings)}")

        return " | ".join(parts)
|