340 lines
14 KiB
Python
340 lines
14 KiB
Python
# services/orders/app/services/smart_procurement_calculator.py
|
||
"""
Smart Procurement Calculator

Implements multi-constraint procurement quantity optimization combining:
- AI demand forecasting
- Ingredient reorder rules (reorder_point, reorder_quantity)
- Supplier constraints (minimum_order_quantity, minimum_order_amount)
- Storage limits (max_stock_level)
- Price tier optimization
"""
|
||
|
||
import math
|
||
from decimal import Decimal
|
||
from typing import Dict, Any, List, Tuple, Optional
|
||
import structlog
|
||
|
||
logger = structlog.get_logger()
|
||
|
||
|
||
class SmartProcurementCalculator:
    """
    Smart procurement quantity calculator with multi-tier constraint optimization.

    A base quantity is chosen from the stock position (critical stock, reorder
    point, forecast-driven, or no order) and is then adjusted, in this order, by:
    economic rounding, supplier minimum/packaging quantity, price tier
    optimization, storage capacity, and finally a check against the supplier's
    minimum order amount. Each step can be switched off through the settings
    passed to the constructor.
    """

    # A price tier is only considered when its quantity is at most this multiple
    # of the quantity that would be ordered anyway (i.e. at most 50% excess).
    MAX_TIER_UPGRADE_RATIO = Decimal('1.5')

    # Sentinel used for "no storage limit configured".
    _UNLIMITED = Decimal('Infinity')

    def __init__(self, procurement_settings: Dict[str, Any]):
        """
        Initialize calculator with tenant procurement settings

        Args:
            procurement_settings: Tenant settings dict with flags (each
                defaults to True when missing; a None dict is treated as empty):
                - use_reorder_rules: bool
                - economic_rounding: bool
                - respect_storage_limits: bool
                - use_supplier_minimums: bool
                - optimize_price_tiers: bool
        """
        settings = procurement_settings or {}
        self.use_reorder_rules = settings.get('use_reorder_rules', True)
        self.economic_rounding = settings.get('economic_rounding', True)
        self.respect_storage_limits = settings.get('respect_storage_limits', True)
        self.use_supplier_minimums = settings.get('use_supplier_minimums', True)
        self.optimize_price_tiers = settings.get('optimize_price_tiers', True)

    @staticmethod
    def _to_decimal(value: Any, default: Decimal = Decimal('0')) -> Decimal:
        """Convert value to Decimal; None or '' yields default instead of raising."""
        if value is None or value == '':
            return default
        if isinstance(value, Decimal):
            return value
        # Going through str() avoids binary float artefacts (0.1 -> Decimal('0.1')).
        return Decimal(str(value))

    @staticmethod
    def _round_up_to_multiple(quantity: Decimal, step: Decimal) -> Tuple[int, Decimal]:
        """Return (multiples, multiples * step) for the smallest multiple of step >= quantity."""
        # math.ceil on a Decimal is exact; converting to float first could drop
        # a small fractional excess and round down by mistake.
        multiples = math.ceil(quantity / step)
        return multiples, Decimal(multiples) * step

    def calculate_procurement_quantity(
        self,
        ingredient: Dict[str, Any],
        supplier: Optional[Dict[str, Any]],
        price_list_entry: Optional[Dict[str, Any]],
        ai_forecast_quantity: Decimal,
        current_stock: Decimal,
        safety_stock_percentage: Decimal = Decimal('20.0')
    ) -> Dict[str, Any]:
        """
        Calculate optimal procurement quantity using smart hybrid approach

        Numeric fields that are missing or None are treated as 0 (or as
        "unlimited" for max_stock_level, which is also unlimited when 0).
        Non-Decimal numeric inputs are converted to Decimal.

        Args:
            ingredient: Ingredient data with reorder_point, reorder_quantity,
                low_stock_threshold, max_stock_level
            supplier: Supplier data with minimum_order_amount
            price_list_entry: Price list with minimum_order_quantity,
                unit_price, tier_pricing
            ai_forecast_quantity: AI-predicted demand quantity
            current_stock: Current stock level
            safety_stock_percentage: Safety stock buffer percentage

        Returns:
            Dict with:
                - order_quantity: Final calculated quantity to order
                - calculation_method: Method used (e.g., 'REORDER_POINT_TRIGGERED'),
                  possibly suffixed with '_STORAGE_LIMITED' / '_NEEDS_CONSOLIDATION'
                - ai_suggested_quantity: Original AI forecast
                - adjusted_quantity: Final quantity after constraints
                - adjustment_reason: Human-readable explanation
                - warnings: List of warnings/notes
                - supplier_minimum_applied: bool
                - storage_limit_applied: bool
                - reorder_rule_applied: bool
                - price_tier_applied: Dict or None
        """
        to_decimal = self._to_decimal

        # Normalise inputs once so all arithmetic below is Decimal-only.
        ai_forecast_quantity = to_decimal(ai_forecast_quantity)
        current_stock = to_decimal(current_stock)
        safety_stock_percentage = to_decimal(safety_stock_percentage, Decimal('20.0'))

        warnings: List[str] = []
        result: Dict[str, Any] = {
            'ai_suggested_quantity': ai_forecast_quantity,
            'supplier_minimum_applied': False,
            'storage_limit_applied': False,
            'reorder_rule_applied': False,
            'price_tier_applied': None,
        }

        # Extract ingredient parameters
        reorder_point = to_decimal(ingredient.get('reorder_point'))
        reorder_quantity = to_decimal(ingredient.get('reorder_quantity'))
        low_stock_threshold = to_decimal(ingredient.get('low_stock_threshold'))
        # Any falsy max_stock_level (None, 0, '') means "no storage limit".
        max_stock_level = to_decimal(
            ingredient.get('max_stock_level') or None, self._UNLIMITED
        )

        # Extract supplier/price list parameters
        supplier_min_qty = Decimal('0')
        supplier_min_amount = Decimal('0')
        unit_price = Decimal('0')
        tier_pricing: List[Dict[str, Any]] = []

        if price_list_entry:
            supplier_min_qty = to_decimal(price_list_entry.get('minimum_order_quantity'))
            unit_price = to_decimal(price_list_entry.get('unit_price'))
            tier_pricing = price_list_entry.get('tier_pricing') or []

        if supplier:
            supplier_min_amount = to_decimal(supplier.get('minimum_order_amount'))

        # Calculate AI-based net requirement with safety stock
        safety_stock = ai_forecast_quantity * (safety_stock_percentage / Decimal('100'))
        total_needed = ai_forecast_quantity + safety_stock
        ai_net_requirement = max(Decimal('0'), total_needed - current_stock)

        # TIER 1: Critical Safety Check (Emergency Override)
        if self.use_reorder_rules and current_stock <= low_stock_threshold:
            order_qty = max(reorder_quantity, ai_net_requirement)
            result['calculation_method'] = 'CRITICAL_STOCK_EMERGENCY'
            result['reorder_rule_applied'] = True
            warnings.append(f"CRITICAL: Stock ({current_stock}) below threshold ({low_stock_threshold})")

        # TIER 2: Reorder Point Triggered
        elif self.use_reorder_rules and current_stock <= reorder_point:
            order_qty = max(reorder_quantity, ai_net_requirement)
            result['calculation_method'] = 'REORDER_POINT_TRIGGERED'
            result['reorder_rule_applied'] = True
            warnings.append(f"Reorder point triggered: stock ({current_stock}) ≤ reorder point ({reorder_point})")

        # TIER 3: Forecast-Driven (Above reorder point, no immediate need)
        elif ai_net_requirement > 0:
            order_qty = ai_net_requirement
            result['calculation_method'] = 'FORECAST_DRIVEN_PROACTIVE'
            warnings.append(f"AI forecast suggests ordering {ai_net_requirement} units")

        # TIER 4: No Order Needed
        else:
            result['order_quantity'] = Decimal('0')
            result['adjusted_quantity'] = Decimal('0')
            result['calculation_method'] = 'SUFFICIENT_STOCK'
            result['adjustment_reason'] = f"Current stock ({current_stock}) is sufficient. No order needed."
            result['warnings'] = warnings
            return result

        # Apply Economic Rounding (reorder_quantity multiples)
        if self.economic_rounding and reorder_quantity > 0:
            multiples, rounded_qty = self._round_up_to_multiple(order_qty, reorder_quantity)
            if rounded_qty > order_qty:
                warnings.append(f"Rounded to {multiples}× reorder quantity ({reorder_quantity}) = {rounded_qty}")
                order_qty = rounded_qty

        # Apply Supplier Minimum Quantity Constraint
        if self.use_supplier_minimums and supplier_min_qty > 0:
            if order_qty < supplier_min_qty:
                warnings.append(f"Increased from {order_qty} to supplier minimum ({supplier_min_qty})")
                order_qty = supplier_min_qty
                result['supplier_minimum_applied'] = True
            else:
                # Round to multiples of minimum_order_quantity (packaging constraint)
                multiples, rounded_qty = self._round_up_to_multiple(order_qty, supplier_min_qty)
                if rounded_qty > order_qty:
                    warnings.append(f"Rounded to {multiples}× supplier packaging ({supplier_min_qty}) = {rounded_qty}")
                    result['supplier_minimum_applied'] = True
                    order_qty = rounded_qty

        # Apply Price Tier Optimization
        if self.optimize_price_tiers and tier_pricing and price_list_entry:
            tier_result = self._optimize_price_tier(
                order_qty,
                unit_price,
                tier_pricing,
                current_stock,
                max_stock_level
            )

            if tier_result['tier_applied']:
                order_qty = tier_result['optimized_quantity']
                result['price_tier_applied'] = tier_result['tier_info']
                warnings.append(tier_result['message'])

        # Apply Storage Capacity Constraint
        if self.respect_storage_limits and max_stock_level != self._UNLIMITED:
            if (current_stock + order_qty) > max_stock_level:
                capped_qty = max(Decimal('0'), max_stock_level - current_stock)
                warnings.append(f"Capped from {order_qty} to {capped_qty} due to storage limit ({max_stock_level})")
                order_qty = capped_qty
                result['storage_limit_applied'] = True
                result['calculation_method'] += '_STORAGE_LIMITED'

                # The cap runs after the supplier-minimum step and can undo it;
                # surface that conflict instead of leaving it silent.
                if self.use_supplier_minimums and Decimal('0') < capped_qty < supplier_min_qty:
                    warnings.append(
                        f"⚠️ Storage-limited quantity {capped_qty} is below supplier minimum "
                        f"({supplier_min_qty}). Review before ordering."
                    )

        # Check supplier minimum_order_amount (total order value constraint)
        if self.use_supplier_minimums and supplier_min_amount > 0 and price_list_entry:
            order_value = order_qty * unit_price

            if order_value < supplier_min_amount:
                warnings.append(
                    f"⚠️ Order value €{order_value:.2f} < supplier minimum €{supplier_min_amount:.2f}. "
                    "This item needs to be combined with other products in the same PO."
                )
                result['calculation_method'] += '_NEEDS_CONSOLIDATION'

        # Build final result
        result['order_quantity'] = order_qty
        result['adjusted_quantity'] = order_qty
        result['adjustment_reason'] = self._build_adjustment_reason(
            ai_forecast_quantity,
            ai_net_requirement,
            order_qty,
            warnings,
            result
        )
        result['warnings'] = warnings

        return result

    def _optimize_price_tier(
        self,
        current_qty: Decimal,
        base_unit_price: Decimal,
        tier_pricing: List[Dict[str, Any]],
        current_stock: Decimal,
        max_stock_level: Decimal
    ) -> Dict[str, Any]:
        """
        Optimize order quantity to capture volume discount tiers if beneficial

        A higher tier is chosen only when buying the tier quantity at the tier
        price costs less IN TOTAL than buying current_qty at the price that
        already applies to it. That price is the price of the highest tier
        whose quantity is <= current_qty, or base_unit_price if none is reached.

        Tiers are ignored when they are missing 'quantity' or 'price', would
        exceed storage capacity (if storage limits are respected), or exceed
        MAX_TIER_UPGRADE_RATIO times current_qty.

        Args:
            current_qty: Current calculated order quantity
            base_unit_price: Base unit price without tiers
            tier_pricing: List of tier dicts with 'quantity' and 'price'
            current_stock: Current stock level
            max_stock_level: Maximum storage capacity

        Returns:
            Dict with tier_applied (bool), optimized_quantity, tier_info, message
            (tier_info and message are None when no tier is applied)
        """
        not_applied = {
            'tier_applied': False,
            'optimized_quantity': current_qty,
            'tier_info': None,
            'message': None,
        }
        if not tier_pricing:
            return not_applied

        # Parse to Decimal BEFORE sorting so mixed str/int/float quantities
        # order numerically instead of raising or sorting lexicographically.
        tiers: List[Tuple[Decimal, Decimal]] = []
        for tier in tier_pricing:
            raw_qty = tier.get('quantity')
            raw_price = tier.get('price')
            if raw_qty in (None, '') or raw_price in (None, ''):
                continue  # incomplete tier definition: cannot be evaluated
            tiers.append((self._to_decimal(raw_qty), self._to_decimal(raw_price)))
        tiers.sort(key=lambda pair: pair[0])

        # Price that already applies to current_qty: the last (highest) tier
        # reached wins because tiers are in ascending quantity order.
        current_unit_price = base_unit_price
        for tier_qty, tier_price in tiers:
            if tier_qty <= current_qty:
                current_unit_price = tier_price
        current_cost = current_qty * current_unit_price

        max_upgrade_qty = current_qty * self.MAX_TIER_UPGRADE_RATIO
        best_tier = None
        best_savings = Decimal('0')

        for tier_qty, tier_price in tiers:
            # Skip if tier quantity is below current quantity (already captured)
            if tier_qty <= current_qty:
                continue

            # Skip if tier would exceed storage capacity
            if self.respect_storage_limits and (current_stock + tier_qty) > max_stock_level:
                continue

            # Skip if tier is more than 50% above current quantity (too much excess)
            if tier_qty > max_upgrade_qty:
                continue

            savings = current_cost - tier_qty * tier_price

            # Strict comparison: a tier must save money, and on ties the
            # smaller quantity (seen first) is kept.
            if savings > best_savings:
                best_savings = savings
                best_tier = {
                    'quantity': tier_qty,
                    'price': tier_price,
                    'savings': savings
                }

        if best_tier is None:
            return not_applied

        return {
            'tier_applied': True,
            'optimized_quantity': best_tier['quantity'],
            'tier_info': best_tier,
            'message': (
                f"Upgraded to {best_tier['quantity']} units "
                f"@ €{best_tier['price']}/unit "
                f"(saves €{best_tier['savings']:.2f})"
            )
        }

    def _build_adjustment_reason(
        self,
        ai_forecast: Decimal,
        ai_net_requirement: Decimal,
        final_quantity: Decimal,
        warnings: List[str],
        result: Dict[str, Any]
    ) -> str:
        """
        Build human-readable explanation of quantity adjustments

        Args:
            ai_forecast: Original AI forecast
            ai_net_requirement: AI forecast + safety stock - current stock
            final_quantity: Final order quantity after all adjustments
            warnings: List of warning messages
            result: Calculation result dict (reads calculation_method and the
                *_applied flags)

        Returns:
            Human-readable adjustment explanation, sections joined by " | "
        """
        method = result.get('calculation_method', 'UNKNOWN')
        parts = [
            f"Method: {method.replace('_', ' ').title()}",
            f"AI Forecast: {ai_forecast} units, Net Requirement: {ai_net_requirement} units",
        ]

        # Adjustments applied, listed in a fixed order
        flag_labels = (
            ('reorder_rule_applied', "reorder rules"),
            ('supplier_minimum_applied', "supplier minimums"),
            ('storage_limit_applied', "storage limits"),
            ('price_tier_applied', "price tier optimization"),
        )
        adjustments = [label for flag, label in flag_labels if result.get(flag)]
        if adjustments:
            parts.append(f"Adjustments: {', '.join(adjustments)}")

        parts.append(f"Final Quantity: {final_quantity} units")

        # Only the high-signal warnings are repeated in the summary; they are
        # recognised by the markers the calculator puts into those messages.
        key_warnings = [
            w for w in warnings
            if '⚠️' in w or 'CRITICAL' in w or 'saves €' in w
        ]
        if key_warnings:
            parts.append(f"Notes: {'; '.join(key_warnings)}")

        return " | ".join(parts)
|