2025-10-30 21:08:07 +01:00
|
|
|
|
"""
|
|
|
|
|
|
Optimization Utilities
|
|
|
|
|
|
|
|
|
|
|
|
Provides optimization algorithms for procurement planning including
|
|
|
|
|
|
MOQ rounding, economic order quantity, and multi-objective optimization.
|
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
|
|
import math
|
|
|
|
|
|
from decimal import Decimal
|
|
|
|
|
|
from typing import List, Tuple, Dict, Optional
|
|
|
|
|
|
from dataclasses import dataclass
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@dataclass
class OrderOptimizationResult:
    """Outcome of an order-quantity optimization.

    Fields are declared in the order expected by positional construction.
    """

    # Quantity chosen for the order.
    optimal_quantity: Decimal
    # Ordering-cost component of the result.
    order_cost: Decimal
    # Holding-cost component of the result.
    holding_cost: Decimal
    # Combined cost figure for the chosen quantity.
    total_cost: Decimal
    # Number of orders placed per year at the chosen quantity.
    orders_per_year: float
    # Machine-readable explanation of how the quantity was derived.
    reasoning: str
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def calculate_economic_order_quantity(
    annual_demand: float,
    ordering_cost: float,
    holding_cost_per_unit: float
) -> float:
    """
    Compute the Economic Order Quantity (EOQ).

    EOQ = sqrt((2 × D × S) / H)
    where:
    - D = annual demand
    - S = ordering cost per order
    - H = holding cost per unit per year

    Args:
        annual_demand: Annual demand in units
        ordering_cost: Cost of placing a single order
        holding_cost_per_unit: Annual cost of holding one unit

    Returns:
        The EOQ, or 0.0 when any of the three inputs is zero or negative
    """
    inputs = (annual_demand, ordering_cost, holding_cost_per_unit)

    # The formula is only meaningful for strictly positive inputs.
    if any(value <= 0 for value in inputs):
        return 0.0

    numerator = 2 * annual_demand * ordering_cost
    return math.sqrt(numerator / holding_cost_per_unit)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def optimize_order_quantity(
    required_quantity: Decimal,
    annual_demand: float,
    ordering_cost: float = 50.0,
    holding_cost_rate: float = 0.25,
    unit_price: float = 1.0,
    min_order_qty: Optional[Decimal] = None,
    max_order_qty: Optional[Decimal] = None
) -> OrderOptimizationResult:
    """
    Pick an order quantity from the EOQ, the current need and MOQ/max limits.

    The starting point is the larger of the required quantity and the EOQ.
    The minimum is applied first and the maximum second, so the maximum wins
    if the two conflict. A limit that is ``None`` or zero is ignored.

    Args:
        required_quantity: Quantity needed for the current period
        annual_demand: Estimated annual demand
        ordering_cost: Fixed cost per order
        holding_cost_rate: Annual holding cost as a fraction of unit price
        unit_price: Cost per unit
        min_order_qty: Minimum order quantity (MOQ)
        max_order_qty: Maximum order quantity (storage limit)

    Returns:
        OrderOptimizationResult with the chosen quantity, the annual ordering
        and holding costs, their sum, orders per year and a structured
        reasoning string
    """
    unit_holding_cost = unit_price * holding_cost_rate
    eoq = calculate_economic_order_quantity(
        annual_demand,
        ordering_cost,
        unit_holding_cost
    )

    # Never order less than what is needed right now.
    quantity = max(float(required_quantity), eoq)

    # Structured reasoning code: pipe-separated segments with parameters.
    reasoning = f"EOQ:BASE|eoq={eoq:.2f}|required={required_quantity}"

    # Raise to the supplier minimum when one is given.
    if min_order_qty and Decimal(quantity) < min_order_qty:
        quantity = float(min_order_qty)
        reasoning += f"|EOQ:MOQ_APPLIED|moq={min_order_qty}"

    # Cap at the maximum when one is given.
    if max_order_qty and Decimal(quantity) > max_order_qty:
        quantity = float(max_order_qty)
        reasoning += f"|EOQ:MAX_APPLIED|max={max_order_qty}"

    # Annualised cost figures for the chosen quantity.
    if quantity > 0:
        orders_per_year = annual_demand / quantity
    else:
        orders_per_year = 0
    ordering_total = orders_per_year * ordering_cost
    holding_total = (quantity / 2) * unit_holding_cost
    combined_total = ordering_total + holding_total

    return OrderOptimizationResult(
        optimal_quantity=Decimal(str(quantity)),
        order_cost=Decimal(str(ordering_total)),
        holding_cost=Decimal(str(holding_total)),
        total_cost=Decimal(str(combined_total)),
        orders_per_year=orders_per_year,
        reasoning=reasoning
    )
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def round_to_moq(
    quantity: Decimal,
    moq: Decimal,
    round_up: bool = True
) -> Decimal:
    """
    Round a quantity to a multiple of the minimum order quantity.

    All arithmetic stays in ``Decimal``; converting the ratio to ``float``
    can lose precision and produce a result below the requested quantity.

    Args:
        quantity: Desired quantity
        moq: Minimum order quantity
        round_up: If True, always round up to the next MOQ multiple.
            If False, round to the nearest multiple; an exact tie
            (e.g. 2.5 multiples) rounds up so that equal-distance cases
            behave consistently and never favour a shortage.

    Returns:
        ``quantity`` unchanged when ``quantity`` or ``moq`` is not positive,
        ``moq`` when ``quantity`` is below it, otherwise the rounded multiple
    """
    if quantity <= 0 or moq <= 0:
        return quantity

    if quantity < moq:
        return moq

    # Exact split into whole MOQ multiples plus a leftover smaller than moq.
    whole_multiples, leftover = divmod(quantity, moq)

    if leftover > 0:
        # round_up: any leftover needs one more multiple.
        # nearest:  only a leftover of at least half an MOQ does.
        if round_up or leftover * 2 >= moq:
            whole_multiples += 1

    return whole_multiples * moq
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def round_to_package_size(
    quantity: Decimal,
    package_size: Decimal,
    allow_partial: bool = False
) -> Decimal:
    """
    Round a quantity up to whole packages.

    The package count is computed with exact ``Decimal`` arithmetic; a
    ``float`` round-trip can lose precision and return fewer units than
    requested.

    Args:
        quantity: Desired quantity
        package_size: Size of one package
        allow_partial: If True, the quantity is returned unchanged.
            If False, it is rounded up to full packages.

    Returns:
        ``quantity`` unchanged when ``quantity`` or ``package_size`` is not
        positive or when partial packages are allowed, otherwise the smallest
        multiple of ``package_size`` that is at least ``quantity``
    """
    if quantity <= 0 or package_size <= 0:
        return quantity

    if allow_partial:
        return quantity

    # Exact split into full packages plus a leftover smaller than one package.
    full_packages, leftover = divmod(quantity, package_size)
    if leftover > 0:
        full_packages += 1

    return full_packages * package_size
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def apply_price_tier_optimization(
    base_quantity: Decimal,
    unit_price: Decimal,
    price_tiers: List[Dict]
) -> Tuple[Decimal, Decimal, str]:
    """
    Decide whether ordering up to a higher price tier lowers the total spend.

    The baseline is the cost of ``base_quantity`` at the price it already
    qualifies for (the highest tier whose minimum it meets, or ``unit_price``
    when it meets none). A higher tier is chosen only when buying exactly its
    minimum quantity costs less than the baseline and the saving exceeds 10%
    of the cost of the extra units.

    Args:
        base_quantity: Base quantity needed
        unit_price: Unit price that applies when no tier is reached
        price_tiers: List of dicts with 'min_quantity' and 'unit_price'

    Returns:
        Tuple of (optimized_quantity, unit_price, reasoning)
    """
    if not price_tiers:
        return base_quantity, unit_price, "No price tiers available"

    # Normalise once and sort on the numeric value, so tiers given as strings
    # are ordered the same way they are compared below.
    tiers = sorted(
        (
            (Decimal(str(tier['min_quantity'])), Decimal(str(tier['unit_price'])))
            for tier in price_tiers
        ),
        key=lambda pair: pair[0]
    )

    # Price the base quantity already qualifies for: last tier it reaches.
    current_tier_price = unit_price
    for tier_min_qty, tier_price in tiers:
        if base_quantity >= tier_min_qty:
            current_tier_price = tier_price

    # Baseline must use the price actually paid at base_quantity; using the
    # un-tiered unit_price would overstate the savings of upgrading.
    base_cost = base_quantity * current_tier_price

    best_quantity = base_quantity
    best_price = current_tier_price
    best_savings = Decimal('0')
    reasoning = f"TIER_PRICING:CURRENT_TIER|price={current_tier_price}"

    for tier_min_qty, tier_price in tiers:
        if tier_min_qty <= base_quantity:
            continue  # already reached; nothing to upgrade to

        # Total spend when ordering exactly this tier's minimum quantity.
        tier_cost = tier_min_qty * tier_price
        savings = base_cost - tier_cost
        if savings <= best_savings:
            continue

        # Simple heuristic: savings should be > 10% of the additional cost,
        # otherwise the extra inventory is not worth carrying.
        additional_qty = tier_min_qty - base_quantity
        additional_cost = additional_qty * tier_price
        if savings > additional_cost * Decimal('0.1'):
            best_quantity = tier_min_qty
            best_price = tier_price
            best_savings = savings
            reasoning = f"TIER_PRICING:UPGRADED|tier_min={tier_min_qty}|savings={savings:.2f}"

    return best_quantity, best_price, reasoning
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def aggregate_requirements_for_moq(
    requirements: List[Dict],
    moq: Decimal
) -> List[Dict]:
    """
    Group dated requirements into orders that reach the MOQ.

    Requirements are processed in date order and accumulated into a batch.
    A batch is closed as soon as its total reaches ``moq``, or when the next
    requirement lies more than 30 days after the batch's first requirement.
    Each order is dated with the first requirement of its batch and its
    quantity is passed through ``round_to_moq``.

    Args:
        requirements: List of requirement dicts with 'quantity' and 'date'
        moq: Minimum order quantity

    Returns:
        List of order dicts with 'quantity', 'date' and 'requirements'
    """
    if not requirements:
        return []

    def _build_order(members: List[Dict], total: Decimal) -> Dict:
        # One order covering every requirement in the batch.
        return {
            'quantity': round_to_moq(total, moq),
            'date': members[0]['date'],
            'requirements': members
        }

    orders = []
    batch = []
    batch_total = Decimal('0')

    for req in sorted(requirements, key=lambda x: x['date']):
        req_qty = Decimal(str(req['quantity']))

        # Don't aggregate across more than 30 days: close the open batch
        # before adding a requirement that falls outside that window.
        if batch and (req['date'] - batch[0]['date']).days > 30:
            if batch_total > 0:
                orders.append(_build_order(batch.copy(), batch_total))
            batch = []
            batch_total = Decimal('0')

        batch.append(req)
        batch_total += req_qty

        # MOQ reached: this batch is a complete order.
        if batch_total >= moq:
            orders.append(_build_order(batch.copy(), batch_total))
            batch = []
            batch_total = Decimal('0')

    # Whatever is left over still has to be ordered.
    if batch:
        orders.append(_build_order(batch, batch_total))

    return orders
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def calculate_order_splitting(
    total_quantity: Decimal,
    suppliers: List[Dict],
    max_supplier_capacity: Optional[Decimal] = None
) -> List[Dict]:
    """
    Split a large order across multiple suppliers.

    Suppliers are filled in order of descending reliability, each up to its
    capacity (further limited by ``max_supplier_capacity`` when given).
    If the capacities cannot absorb the whole order, the excess is spread
    over all suppliers in proportion to reliability, on top of their capacity
    allocation, so that the allocations always add up to ``total_quantity``.

    Args:
        total_quantity: Total quantity needed
        suppliers: List of supplier dicts with 'id', 'capacity', 'reliability'
            ('capacity' defaults to unlimited, 'reliability' to 0.5)
        max_supplier_capacity: Maximum any single supplier should provide
            in the capacity-based pass

    Returns:
        List of allocations with 'supplier_id', 'quantity' and 'reliability'
    """
    if not suppliers:
        return []

    # Most reliable suppliers are served first.
    sorted_suppliers = sorted(
        suppliers,
        key=lambda x: x.get('reliability', 0.5),
        reverse=True
    )

    allocations = []
    remaining = total_quantity

    for supplier in sorted_suppliers:
        if remaining <= 0:
            break

        supplier_capacity = Decimal(str(supplier.get('capacity', float('inf'))))

        # Apply max capacity constraint
        if max_supplier_capacity:
            supplier_capacity = min(supplier_capacity, max_supplier_capacity)

        allocated = min(remaining, supplier_capacity)
        allocations.append({
            'supplier_id': supplier['id'],
            'quantity': allocated,
            'reliability': supplier.get('reliability', 0.5)
        })
        remaining -= allocated

    # Reaching this point with a remainder means the loop never broke early,
    # so there is exactly one allocation per supplier, in the same order.
    if remaining > 0:
        weights = [
            Decimal(str(s.get('reliability', 0.5))) for s in sorted_suppliers
        ]
        total_weight = sum(weights, Decimal('0'))

        # Without a usable reliability signal, share the excess equally
        # instead of silently dropping it.
        if total_weight <= 0:
            weights = [Decimal('1')] * len(weights)
            total_weight = Decimal(len(weights))

        distributed = Decimal('0')
        last_index = len(allocations) - 1
        for i, weight in enumerate(weights):
            if i == last_index:
                # The last supplier absorbs the rounding residue so the
                # allocations sum exactly to the requested total.
                additional = remaining - distributed
            else:
                additional = remaining * weight / total_weight
                distributed += additional

            allocations[i]['quantity'] += additional

    return allocations
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def calculate_buffer_stock(
    lead_time_days: int,
    daily_demand: float,
    demand_variability: float,
    service_level: float = 0.95
) -> Decimal:
    """
    Calculate buffer stock based on demand variability.

    Buffer Stock = Z × σ × √(lead_time)
    where:
    - Z = service level z-score
    - σ = demand standard deviation (daily_demand × demand_variability)
    - lead_time = lead time in days

    Args:
        lead_time_days: Supplier lead time in days
        daily_demand: Average daily demand
        demand_variability: Coefficient of variation (CV = σ/μ)
        service_level: Target service level (0-1). The common levels 0.90,
            0.95, 0.975, 0.99 and 0.995 use fixed rounded z-scores; any other
            value strictly between 0 and 1 uses the standard normal quantile
            (never below zero). Values outside (0, 1) fall back to the 95%
            z-score.

    Returns:
        Buffer stock quantity, or 0 when lead time or daily demand is not
        positive
    """
    # Function-scope import keeps the block self-contained.
    from statistics import NormalDist

    if lead_time_days <= 0 or daily_demand <= 0:
        return Decimal('0')

    # Rounded z-scores for common service levels
    z_scores = {
        0.90: 1.28,
        0.95: 1.65,
        0.975: 1.96,
        0.99: 2.33,
        0.995: 2.58
    }

    z_score = z_scores.get(service_level)
    if z_score is None:
        if 0 < service_level < 1:
            # Levels not in the table get their real quantile rather than the
            # 95% value; clamp so levels below 50% cannot yield a negative
            # buffer.
            z_score = max(NormalDist().inv_cdf(service_level), 0.0)
        else:
            # No quantile exists for such a target; default to 95%.
            z_score = 1.65

    # Standard deviation of daily demand derived from the CV
    stddev = daily_demand * demand_variability

    # Buffer stock formula
    buffer = z_score * stddev * math.sqrt(lead_time_days)

    return Decimal(str(buffer))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def calculate_reorder_point(
    daily_demand: float,
    lead_time_days: int,
    safety_stock: Decimal
) -> Decimal:
    """
    Compute the inventory level at which a new order should be placed.

    Reorder Point = (Daily Demand × Lead Time) + Safety Stock

    Args:
        daily_demand: Average daily demand
        lead_time_days: Supplier lead time in days
        safety_stock: Safety stock quantity

    Returns:
        Reorder point
    """
    # Expected consumption while waiting for the delivery.
    expected_usage = daily_demand * lead_time_days

    # Go through str() so the Decimal carries the float's printed value.
    return Decimal(str(expected_usage)) + safety_stock
|