Improve the frontend 3
This commit is contained in:
438
shared/utils/optimization.py
Normal file
438
shared/utils/optimization.py
Normal file
@@ -0,0 +1,438 @@
|
||||
"""
|
||||
Optimization Utilities
|
||||
|
||||
Provides optimization algorithms for procurement planning including
|
||||
MOQ rounding, economic order quantity, and multi-objective optimization.
|
||||
"""
|
||||
|
||||
import math
|
||||
from decimal import Decimal
|
||||
from typing import List, Tuple, Dict, Optional
|
||||
from dataclasses import dataclass
|
||||
|
||||
|
||||
@dataclass
|
||||
class OrderOptimizationResult:
|
||||
"""Result of order quantity optimization"""
|
||||
optimal_quantity: Decimal
|
||||
order_cost: Decimal
|
||||
holding_cost: Decimal
|
||||
total_cost: Decimal
|
||||
orders_per_year: float
|
||||
reasoning: str
|
||||
|
||||
|
||||
def calculate_economic_order_quantity(
|
||||
annual_demand: float,
|
||||
ordering_cost: float,
|
||||
holding_cost_per_unit: float
|
||||
) -> float:
|
||||
"""
|
||||
Calculate Economic Order Quantity (EOQ).
|
||||
|
||||
EOQ = sqrt((2 × D × S) / H)
|
||||
where:
|
||||
- D = Annual demand
|
||||
- S = Ordering cost per order
|
||||
- H = Holding cost per unit per year
|
||||
|
||||
Args:
|
||||
annual_demand: Annual demand in units
|
||||
ordering_cost: Cost per order placement
|
||||
holding_cost_per_unit: Annual holding cost per unit
|
||||
|
||||
Returns:
|
||||
Optimal order quantity
|
||||
"""
|
||||
if annual_demand <= 0 or ordering_cost <= 0 or holding_cost_per_unit <= 0:
|
||||
return 0.0
|
||||
|
||||
eoq = math.sqrt((2 * annual_demand * ordering_cost) / holding_cost_per_unit)
|
||||
return eoq
|
||||
|
||||
|
||||
def optimize_order_quantity(
|
||||
required_quantity: Decimal,
|
||||
annual_demand: float,
|
||||
ordering_cost: float = 50.0,
|
||||
holding_cost_rate: float = 0.25,
|
||||
unit_price: float = 1.0,
|
||||
min_order_qty: Optional[Decimal] = None,
|
||||
max_order_qty: Optional[Decimal] = None
|
||||
) -> OrderOptimizationResult:
|
||||
"""
|
||||
Optimize order quantity considering EOQ and constraints.
|
||||
|
||||
Args:
|
||||
required_quantity: Quantity needed for current period
|
||||
annual_demand: Estimated annual demand
|
||||
ordering_cost: Fixed cost per order
|
||||
holding_cost_rate: Annual holding cost as % of unit price
|
||||
unit_price: Cost per unit
|
||||
min_order_qty: Minimum order quantity (MOQ)
|
||||
max_order_qty: Maximum order quantity (storage limit)
|
||||
|
||||
Returns:
|
||||
OrderOptimizationResult with optimal quantity and costs
|
||||
"""
|
||||
holding_cost_per_unit = unit_price * holding_cost_rate
|
||||
|
||||
# Calculate EOQ
|
||||
eoq = calculate_economic_order_quantity(
|
||||
annual_demand,
|
||||
ordering_cost,
|
||||
holding_cost_per_unit
|
||||
)
|
||||
|
||||
# Start with EOQ or required quantity, whichever is larger
|
||||
optimal_qty = max(float(required_quantity), eoq)
|
||||
|
||||
reasoning = f"Base EOQ: {eoq:.2f}, Required: {required_quantity}"
|
||||
|
||||
# Apply minimum order quantity
|
||||
if min_order_qty and Decimal(optimal_qty) < min_order_qty:
|
||||
optimal_qty = float(min_order_qty)
|
||||
reasoning += f", Applied MOQ: {min_order_qty}"
|
||||
|
||||
# Apply maximum order quantity
|
||||
if max_order_qty and Decimal(optimal_qty) > max_order_qty:
|
||||
optimal_qty = float(max_order_qty)
|
||||
reasoning += f", Capped at max: {max_order_qty}"
|
||||
|
||||
# Calculate costs
|
||||
orders_per_year = annual_demand / optimal_qty if optimal_qty > 0 else 0
|
||||
annual_ordering_cost = orders_per_year * ordering_cost
|
||||
annual_holding_cost = (optimal_qty / 2) * holding_cost_per_unit
|
||||
total_annual_cost = annual_ordering_cost + annual_holding_cost
|
||||
|
||||
return OrderOptimizationResult(
|
||||
optimal_quantity=Decimal(str(optimal_qty)),
|
||||
order_cost=Decimal(str(annual_ordering_cost)),
|
||||
holding_cost=Decimal(str(annual_holding_cost)),
|
||||
total_cost=Decimal(str(total_annual_cost)),
|
||||
orders_per_year=orders_per_year,
|
||||
reasoning=reasoning
|
||||
)
|
||||
|
||||
|
||||
def round_to_moq(
|
||||
quantity: Decimal,
|
||||
moq: Decimal,
|
||||
round_up: bool = True
|
||||
) -> Decimal:
|
||||
"""
|
||||
Round quantity to meet minimum order quantity.
|
||||
|
||||
Args:
|
||||
quantity: Desired quantity
|
||||
moq: Minimum order quantity
|
||||
round_up: If True, always round up to next MOQ multiple
|
||||
|
||||
Returns:
|
||||
Rounded quantity
|
||||
"""
|
||||
if quantity <= 0 or moq <= 0:
|
||||
return quantity
|
||||
|
||||
if quantity < moq:
|
||||
return moq
|
||||
|
||||
# Calculate how many MOQs needed
|
||||
multiples = quantity / moq
|
||||
|
||||
if round_up:
|
||||
return Decimal(math.ceil(float(multiples))) * moq
|
||||
else:
|
||||
return Decimal(round(float(multiples))) * moq
|
||||
|
||||
|
||||
def round_to_package_size(
|
||||
quantity: Decimal,
|
||||
package_size: Decimal,
|
||||
allow_partial: bool = False
|
||||
) -> Decimal:
|
||||
"""
|
||||
Round quantity to package size.
|
||||
|
||||
Args:
|
||||
quantity: Desired quantity
|
||||
package_size: Size of one package
|
||||
allow_partial: If False, always round up to full packages
|
||||
|
||||
Returns:
|
||||
Rounded quantity
|
||||
"""
|
||||
if quantity <= 0 or package_size <= 0:
|
||||
return quantity
|
||||
|
||||
packages_needed = quantity / package_size
|
||||
|
||||
if allow_partial:
|
||||
return quantity
|
||||
else:
|
||||
return Decimal(math.ceil(float(packages_needed))) * package_size
|
||||
|
||||
|
||||
def apply_price_tier_optimization(
|
||||
base_quantity: Decimal,
|
||||
unit_price: Decimal,
|
||||
price_tiers: List[Dict]
|
||||
) -> Tuple[Decimal, Decimal, str]:
|
||||
"""
|
||||
Optimize quantity to take advantage of price tiers.
|
||||
|
||||
Args:
|
||||
base_quantity: Base quantity needed
|
||||
unit_price: Current unit price
|
||||
price_tiers: List of dicts with 'min_quantity' and 'unit_price'
|
||||
|
||||
Returns:
|
||||
Tuple of (optimized_quantity, unit_price, reasoning)
|
||||
"""
|
||||
if not price_tiers:
|
||||
return base_quantity, unit_price, "No price tiers available"
|
||||
|
||||
# Sort tiers by min_quantity
|
||||
sorted_tiers = sorted(price_tiers, key=lambda x: x['min_quantity'])
|
||||
|
||||
# Calculate cost at base quantity
|
||||
base_cost = base_quantity * unit_price
|
||||
|
||||
# Find current tier
|
||||
current_tier_price = unit_price
|
||||
for tier in sorted_tiers:
|
||||
if base_quantity >= Decimal(str(tier['min_quantity'])):
|
||||
current_tier_price = Decimal(str(tier['unit_price']))
|
||||
|
||||
# Check if moving to next tier would save money
|
||||
best_quantity = base_quantity
|
||||
best_price = current_tier_price
|
||||
best_savings = Decimal('0')
|
||||
reasoning = f"Current tier price: ${current_tier_price}"
|
||||
|
||||
for tier in sorted_tiers:
|
||||
tier_min_qty = Decimal(str(tier['min_quantity']))
|
||||
tier_price = Decimal(str(tier['unit_price']))
|
||||
|
||||
if tier_min_qty > base_quantity:
|
||||
# Calculate cost at this tier
|
||||
tier_cost = tier_min_qty * tier_price
|
||||
|
||||
# Calculate savings
|
||||
savings = base_cost - tier_cost
|
||||
|
||||
if savings > best_savings:
|
||||
# Additional quantity needed
|
||||
additional_qty = tier_min_qty - base_quantity
|
||||
|
||||
# Check if savings justify additional inventory
|
||||
# Simple heuristic: savings should be > 10% of additional cost
|
||||
additional_cost = additional_qty * tier_price
|
||||
if savings > additional_cost * Decimal('0.1'):
|
||||
best_quantity = tier_min_qty
|
||||
best_price = tier_price
|
||||
best_savings = savings
|
||||
reasoning = f"Upgraded to tier {tier_min_qty}+ for ${savings:.2f} savings"
|
||||
|
||||
return best_quantity, best_price, reasoning
|
||||
|
||||
|
||||
def aggregate_requirements_for_moq(
|
||||
requirements: List[Dict],
|
||||
moq: Decimal
|
||||
) -> List[Dict]:
|
||||
"""
|
||||
Aggregate multiple requirements to meet MOQ efficiently.
|
||||
|
||||
Args:
|
||||
requirements: List of requirement dicts with 'quantity' and 'date'
|
||||
moq: Minimum order quantity
|
||||
|
||||
Returns:
|
||||
List of aggregated orders
|
||||
"""
|
||||
if not requirements:
|
||||
return []
|
||||
|
||||
# Sort requirements by date
|
||||
sorted_reqs = sorted(requirements, key=lambda x: x['date'])
|
||||
|
||||
orders = []
|
||||
current_batch = []
|
||||
current_total = Decimal('0')
|
||||
|
||||
for req in sorted_reqs:
|
||||
req_qty = Decimal(str(req['quantity']))
|
||||
|
||||
# Check if adding this requirement would exceed reasonable aggregation
|
||||
# (e.g., don't aggregate more than 30 days worth)
|
||||
if current_batch:
|
||||
days_span = (req['date'] - current_batch[0]['date']).days
|
||||
if days_span > 30:
|
||||
# Finalize current batch
|
||||
if current_total > 0:
|
||||
orders.append({
|
||||
'quantity': round_to_moq(current_total, moq),
|
||||
'date': current_batch[0]['date'],
|
||||
'requirements': current_batch.copy()
|
||||
})
|
||||
current_batch = []
|
||||
current_total = Decimal('0')
|
||||
|
||||
current_batch.append(req)
|
||||
current_total += req_qty
|
||||
|
||||
# If we've met MOQ, finalize this batch
|
||||
if current_total >= moq:
|
||||
orders.append({
|
||||
'quantity': round_to_moq(current_total, moq),
|
||||
'date': current_batch[0]['date'],
|
||||
'requirements': current_batch.copy()
|
||||
})
|
||||
current_batch = []
|
||||
current_total = Decimal('0')
|
||||
|
||||
# Handle remaining requirements
|
||||
if current_batch:
|
||||
orders.append({
|
||||
'quantity': round_to_moq(current_total, moq),
|
||||
'date': current_batch[0]['date'],
|
||||
'requirements': current_batch
|
||||
})
|
||||
|
||||
return orders
|
||||
|
||||
|
||||
def calculate_order_splitting(
|
||||
total_quantity: Decimal,
|
||||
suppliers: List[Dict],
|
||||
max_supplier_capacity: Optional[Decimal] = None
|
||||
) -> List[Dict]:
|
||||
"""
|
||||
Split large order across multiple suppliers.
|
||||
|
||||
Args:
|
||||
total_quantity: Total quantity needed
|
||||
suppliers: List of supplier dicts with 'id', 'capacity', 'reliability'
|
||||
max_supplier_capacity: Maximum any single supplier should provide
|
||||
|
||||
Returns:
|
||||
List of allocations with 'supplier_id' and 'quantity'
|
||||
"""
|
||||
if not suppliers:
|
||||
return []
|
||||
|
||||
# Sort suppliers by reliability (descending)
|
||||
sorted_suppliers = sorted(
|
||||
suppliers,
|
||||
key=lambda x: x.get('reliability', 0.5),
|
||||
reverse=True
|
||||
)
|
||||
|
||||
allocations = []
|
||||
remaining = total_quantity
|
||||
|
||||
for supplier in sorted_suppliers:
|
||||
if remaining <= 0:
|
||||
break
|
||||
|
||||
supplier_capacity = Decimal(str(supplier.get('capacity', float('inf'))))
|
||||
|
||||
# Apply max capacity constraint
|
||||
if max_supplier_capacity:
|
||||
supplier_capacity = min(supplier_capacity, max_supplier_capacity)
|
||||
|
||||
# Allocate to this supplier
|
||||
allocated = min(remaining, supplier_capacity)
|
||||
|
||||
allocations.append({
|
||||
'supplier_id': supplier['id'],
|
||||
'quantity': allocated,
|
||||
'reliability': supplier.get('reliability', 0.5)
|
||||
})
|
||||
|
||||
remaining -= allocated
|
||||
|
||||
# If still remaining, distribute across suppliers
|
||||
if remaining > 0:
|
||||
# Distribute remaining proportionally to reliability
|
||||
total_reliability = sum(s.get('reliability', 0.5) for s in sorted_suppliers)
|
||||
|
||||
for i, supplier in enumerate(sorted_suppliers):
|
||||
if total_reliability > 0:
|
||||
proportion = supplier.get('reliability', 0.5) / total_reliability
|
||||
additional = remaining * Decimal(str(proportion))
|
||||
|
||||
allocations[i]['quantity'] += additional
|
||||
|
||||
return allocations
|
||||
|
||||
|
||||
def calculate_buffer_stock(
|
||||
lead_time_days: int,
|
||||
daily_demand: float,
|
||||
demand_variability: float,
|
||||
service_level: float = 0.95
|
||||
) -> Decimal:
|
||||
"""
|
||||
Calculate buffer stock based on demand variability.
|
||||
|
||||
Buffer Stock = Z × σ × √(lead_time)
|
||||
where:
|
||||
- Z = service level z-score
|
||||
- σ = demand standard deviation
|
||||
- lead_time = lead time in days
|
||||
|
||||
Args:
|
||||
lead_time_days: Supplier lead time in days
|
||||
daily_demand: Average daily demand
|
||||
demand_variability: Coefficient of variation (CV = σ/μ)
|
||||
service_level: Target service level (0-1)
|
||||
|
||||
Returns:
|
||||
Buffer stock quantity
|
||||
"""
|
||||
if lead_time_days <= 0 or daily_demand <= 0:
|
||||
return Decimal('0')
|
||||
|
||||
# Z-scores for common service levels
|
||||
z_scores = {
|
||||
0.90: 1.28,
|
||||
0.95: 1.65,
|
||||
0.975: 1.96,
|
||||
0.99: 2.33,
|
||||
0.995: 2.58
|
||||
}
|
||||
|
||||
# Get z-score for service level
|
||||
z_score = z_scores.get(service_level, 1.65) # Default to 95%
|
||||
|
||||
# Calculate standard deviation
|
||||
stddev = daily_demand * demand_variability
|
||||
|
||||
# Buffer stock formula
|
||||
buffer = z_score * stddev * math.sqrt(lead_time_days)
|
||||
|
||||
return Decimal(str(buffer))
|
||||
|
||||
|
||||
def calculate_reorder_point(
|
||||
daily_demand: float,
|
||||
lead_time_days: int,
|
||||
safety_stock: Decimal
|
||||
) -> Decimal:
|
||||
"""
|
||||
Calculate reorder point.
|
||||
|
||||
Reorder Point = (Daily Demand × Lead Time) + Safety Stock
|
||||
|
||||
Args:
|
||||
daily_demand: Average daily demand
|
||||
lead_time_days: Supplier lead time in days
|
||||
safety_stock: Safety stock quantity
|
||||
|
||||
Returns:
|
||||
Reorder point
|
||||
"""
|
||||
lead_time_demand = Decimal(str(daily_demand * lead_time_days))
|
||||
return lead_time_demand + safety_stock
|
||||
Reference in New Issue
Block a user