Files
bakery-ia/shared/utils/optimization.py
2025-12-13 23:57:54 +01:00

481 lines
14 KiB
Python
Executable File
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Optimization Utilities
Provides optimization algorithms for procurement planning including
MOQ rounding, economic order quantity, and multi-objective optimization.
"""
import math
from dataclasses import dataclass
from decimal import Decimal
from statistics import NormalDist
from typing import List, Tuple, Dict, Optional
@dataclass
class OrderOptimizationResult:
    """Outcome of an order-quantity optimization run.

    Attributes:
        optimal_quantity: Order quantity selected after constraints.
        order_cost: Annual ordering cost at the selected quantity.
        holding_cost: Annual holding cost at the selected quantity.
        total_cost: Sum of the ordering and holding costs.
        orders_per_year: Number of orders implied per year.
        reasoning_data: Structured explanation of how the quantity was
            derived (parameters used and constraints applied).
    """

    optimal_quantity: Decimal
    order_cost: Decimal
    holding_cost: Decimal
    total_cost: Decimal
    orders_per_year: float
    reasoning_data: dict
def calculate_economic_order_quantity(
    annual_demand: float,
    ordering_cost: float,
    holding_cost_per_unit: float
) -> float:
    """
    Compute the Economic Order Quantity (EOQ).

    Uses the classic formula EOQ = sqrt((2 × D × S) / H), with
    D the annual demand, S the cost of placing one order and H the
    cost of holding one unit for a year.

    Args:
        annual_demand: Annual demand in units
        ordering_cost: Cost per order placement
        holding_cost_per_unit: Annual holding cost per unit

    Returns:
        Optimal order quantity, or 0.0 when any input is zero or negative
    """
    inputs = (annual_demand, ordering_cost, holding_cost_per_unit)
    # The formula is only meaningful for strictly positive inputs.
    if any(value <= 0 for value in inputs):
        return 0.0

    return math.sqrt((2 * annual_demand * ordering_cost) / holding_cost_per_unit)
def optimize_order_quantity(
    required_quantity: Decimal,
    annual_demand: float,
    ordering_cost: float = 50.0,
    holding_cost_rate: float = 0.25,
    unit_price: float = 1.0,
    min_order_qty: Optional[Decimal] = None,
    max_order_qty: Optional[Decimal] = None
) -> OrderOptimizationResult:
    """
    Optimize order quantity considering EOQ and constraints.

    The starting point is the larger of the required quantity and the EOQ.
    The minimum order quantity is applied first and the maximum last, so the
    maximum wins if the two constraints conflict.

    Args:
        required_quantity: Quantity needed for current period
        annual_demand: Estimated annual demand
        ordering_cost: Fixed cost per order
        holding_cost_rate: Annual holding cost as % of unit price
        unit_price: Cost per unit
        min_order_qty: Minimum order quantity (MOQ); ignored when None or 0
        max_order_qty: Maximum order quantity (storage limit); ignored when
            None or 0

    Returns:
        OrderOptimizationResult with optimal quantity and costs
    """
    holding_cost_per_unit = unit_price * holding_cost_rate

    # Calculate EOQ
    eoq = calculate_economic_order_quantity(
        annual_demand,
        ordering_cost,
        holding_cost_per_unit
    )

    # Start with EOQ or required quantity, whichever is larger
    optimal_qty = max(float(required_quantity), eoq)

    # Build structured reasoning data
    reasoning_data = {
        'type': 'eoq_base',
        'parameters': {
            'eoq': round(eoq, 2),
            'required_quantity': float(required_quantity),
            'annual_demand': round(annual_demand, 2),
            'ordering_cost': ordering_cost,
            'holding_cost_rate': holding_cost_rate
        },
        'constraints_applied': []
    }

    # Apply minimum order quantity
    if min_order_qty and Decimal(optimal_qty) < min_order_qty:
        optimal_qty = float(min_order_qty)
        reasoning_data['constraints_applied'].append({
            'type': 'moq_applied',
            'moq': float(min_order_qty)
        })

    # Apply maximum order quantity
    if max_order_qty and Decimal(optimal_qty) > max_order_qty:
        optimal_qty = float(max_order_qty)
        reasoning_data['constraints_applied'].append({
            'type': 'max_applied',
            'max_qty': float(max_order_qty)
        })

    reasoning_data['parameters']['optimal_quantity'] = round(optimal_qty, 2)

    # Calculate costs. A non-positive demand estimate (e.g. a negative
    # forecast) must not yield a negative order count and negative costs, so
    # it is treated as "no orders needed". The fallback is a float to match
    # the declared type of ``orders_per_year``.
    if optimal_qty > 0 and annual_demand > 0:
        orders_per_year = annual_demand / optimal_qty
    else:
        orders_per_year = 0.0
    annual_ordering_cost = orders_per_year * ordering_cost
    annual_holding_cost = (optimal_qty / 2) * holding_cost_per_unit
    total_annual_cost = annual_ordering_cost + annual_holding_cost

    return OrderOptimizationResult(
        optimal_quantity=Decimal(str(optimal_qty)),
        order_cost=Decimal(str(annual_ordering_cost)),
        holding_cost=Decimal(str(annual_holding_cost)),
        total_cost=Decimal(str(total_annual_cost)),
        orders_per_year=orders_per_year,
        reasoning_data=reasoning_data
    )
def round_to_moq(
    quantity: Decimal,
    moq: Decimal,
    round_up: bool = True
) -> Decimal:
    """
    Round quantity to meet minimum order quantity.

    Args:
        quantity: Desired quantity
        moq: Minimum order quantity
        round_up: If True, always round up to next MOQ multiple; if False,
            round to the nearest multiple (ties go to the even multiple)

    Returns:
        Rounded quantity. A non-positive quantity or MOQ returns the quantity
        unchanged, and a quantity below the MOQ returns the MOQ itself.
    """
    if quantity <= 0 or moq <= 0:
        return quantity

    if quantity < moq:
        return moq

    # Calculate how many MOQs are needed. The ratio stays a Decimal: going
    # through float would drop digits beyond ~15-17 significant figures and
    # could round a quantity sitting just above an exact multiple *down*,
    # returning less than was requested.
    multiples = quantity / moq
    count = math.ceil(multiples) if round_up else round(multiples)
    return Decimal(count) * moq
def round_to_package_size(
    quantity: Decimal,
    package_size: Decimal,
    allow_partial: bool = False
) -> Decimal:
    """
    Round quantity to package size.

    Args:
        quantity: Desired quantity
        package_size: Size of one package
        allow_partial: If False, always round up to full packages; if True,
            the quantity is returned unchanged

    Returns:
        Rounded quantity. A non-positive quantity or package size returns the
        quantity unchanged.
    """
    if allow_partial or quantity <= 0 or package_size <= 0:
        return quantity

    # Keep the ratio as a Decimal: converting to float first can lose the
    # low-order digits and round a quantity that is marginally above a whole
    # number of packages down, leaving the order short.
    packages_needed = math.ceil(quantity / package_size)
    return Decimal(packages_needed) * package_size
def apply_price_tier_optimization(
    base_quantity: Decimal,
    unit_price: Decimal,
    price_tiers: List[Dict]
) -> Tuple[Decimal, Decimal, dict]:
    """
    Optimize quantity to take advantage of price tiers.

    A higher tier is selected only when ordering that tier's minimum quantity
    costs less in total than ordering the base quantity at the price it
    already qualifies for, and the saving exceeds 10% of the cost of the
    extra units.

    Args:
        base_quantity: Base quantity needed
        unit_price: Current unit price (used when no tier applies yet)
        price_tiers: List of dicts with 'min_quantity' and 'unit_price'

    Returns:
        Tuple of (optimized_quantity, unit_price, reasoning_data)
    """
    if not price_tiers:
        return base_quantity, unit_price, {
            'type': 'no_tiers',
            'parameters': {
                'base_quantity': float(base_quantity),
                'unit_price': float(unit_price)
            }
        }

    # Sort tiers by min_quantity
    sorted_tiers = sorted(price_tiers, key=lambda x: x['min_quantity'])

    # Find current tier: the last (largest) tier the base quantity reaches
    current_tier_price = unit_price
    for tier in sorted_tiers:
        if base_quantity >= Decimal(str(tier['min_quantity'])):
            current_tier_price = Decimal(str(tier['unit_price']))

    # Cost at base quantity, priced at the tier it already qualifies for.
    # Using the untiered unit_price here would overstate the baseline and
    # make more expensive tiers look like savings.
    base_cost = base_quantity * current_tier_price

    # Check if moving to next tier would save money
    best_quantity = base_quantity
    best_price = current_tier_price
    best_savings = Decimal('0')
    reasoning_data = {
        'type': 'current_tier',
        'parameters': {
            'base_quantity': float(base_quantity),
            'current_tier_price': float(current_tier_price),
            'base_cost': float(base_cost)
        }
    }

    for tier in sorted_tiers:
        tier_min_qty = Decimal(str(tier['min_quantity']))
        tier_price = Decimal(str(tier['unit_price']))

        if tier_min_qty > base_quantity:
            # Calculate cost at this tier
            tier_cost = tier_min_qty * tier_price

            # Calculate savings
            savings = base_cost - tier_cost

            if savings > best_savings:
                # Additional quantity needed
                additional_qty = tier_min_qty - base_quantity

                # Check if savings justify additional inventory
                # Simple heuristic: savings should be > 10% of additional cost
                additional_cost = additional_qty * tier_price

                if savings > additional_cost * Decimal('0.1'):
                    best_quantity = tier_min_qty
                    best_price = tier_price
                    best_savings = savings
                    reasoning_data = {
                        'type': 'tier_upgraded',
                        'parameters': {
                            'base_quantity': float(base_quantity),
                            'tier_min_qty': float(tier_min_qty),
                            'base_price': float(current_tier_price),
                            'tier_price': float(tier_price),
                            'savings': round(float(savings), 2),
                            'additional_qty': float(additional_qty)
                        }
                    }

    return best_quantity, best_price, reasoning_data
def aggregate_requirements_for_moq(
    requirements: List[Dict],
    moq: Decimal
) -> List[Dict]:
    """
    Group dated requirements into orders that satisfy the MOQ.

    Requirements are processed in date order and accumulated into a batch.
    A batch is closed as soon as its total reaches the MOQ, or when the next
    requirement lies more than 30 days after the first one in the batch.
    Each order is dated with the first requirement of its batch and its
    quantity is rounded with ``round_to_moq``.

    Args:
        requirements: List of requirement dicts with 'quantity' and 'date'
        moq: Minimum order quantity

    Returns:
        List of aggregated orders, each with 'quantity', 'date' and the
        'requirements' it covers
    """
    if not requirements:
        return []

    def _build_order(batch, total):
        # One order covering every requirement in the batch.
        return {
            'quantity': round_to_moq(total, moq),
            'date': batch[0]['date'],
            'requirements': batch
        }

    aggregated = []
    batch = []
    batch_total = Decimal('0')

    for requirement in sorted(requirements, key=lambda x: x['date']):
        quantity = Decimal(str(requirement['quantity']))

        # Do not stretch a single batch over more than 30 days.
        if batch and (requirement['date'] - batch[0]['date']).days > 30:
            # A batch without positive demand is dropped rather than ordered.
            if batch_total > 0:
                aggregated.append(_build_order(list(batch), batch_total))
            batch = []
            batch_total = Decimal('0')

        batch.append(requirement)
        batch_total += quantity

        # Enough demand collected to meet the MOQ: close the batch.
        if batch_total >= moq:
            aggregated.append(_build_order(list(batch), batch_total))
            batch = []
            batch_total = Decimal('0')

    # Whatever is left forms a final order.
    if batch:
        aggregated.append(_build_order(batch, batch_total))

    return aggregated
def calculate_order_splitting(
    total_quantity: Decimal,
    suppliers: List[Dict],
    max_supplier_capacity: Optional[Decimal] = None
) -> List[Dict]:
    """
    Split large order across multiple suppliers.

    Suppliers are filled in descending order of reliability, each up to its
    capacity. If the combined capacity is insufficient, the overflow is
    spread over all suppliers in proportion to their reliability (equally
    when no supplier has a positive reliability), so the allocations always
    add up to the requested total.

    Args:
        total_quantity: Total quantity needed
        suppliers: List of supplier dicts with 'id', 'capacity', 'reliability'.
            A missing or None capacity means unlimited; a missing reliability
            defaults to 0.5.
        max_supplier_capacity: Maximum any single supplier should provide
            during the capacity-based pass

    Returns:
        List of allocations with 'supplier_id', 'quantity' and 'reliability'
    """
    if not suppliers:
        return []

    # Sort suppliers by reliability (descending)
    sorted_suppliers = sorted(
        suppliers,
        key=lambda x: x.get('reliability', 0.5),
        reverse=True
    )

    allocations = []
    remaining = total_quantity

    for supplier in sorted_suppliers:
        if remaining <= 0:
            break

        raw_capacity = supplier.get('capacity')
        if raw_capacity is None:
            supplier_capacity = Decimal('Infinity')
        else:
            supplier_capacity = Decimal(str(raw_capacity))

        # Apply max capacity constraint
        if max_supplier_capacity:
            supplier_capacity = min(supplier_capacity, max_supplier_capacity)

        # Allocate to this supplier
        allocated = min(remaining, supplier_capacity)
        allocations.append({
            'supplier_id': supplier['id'],
            'quantity': allocated,
            'reliability': supplier.get('reliability', 0.5)
        })
        remaining -= allocated

    # If still remaining, every supplier was visited above, so allocations
    # and sorted_suppliers line up one-to-one.
    if remaining > 0:
        weights = [
            Decimal(str(s.get('reliability', 0.5))) for s in sorted_suppliers
        ]
        total_weight = sum(weights, Decimal('0'))

        # Without any positive reliability there is nothing to weight by;
        # fall back to an even split instead of silently dropping the rest.
        if total_weight <= 0:
            weights = [Decimal('1')] * len(sorted_suppliers)
            total_weight = Decimal(len(sorted_suppliers))

        shares = [remaining * weight / total_weight for weight in weights]
        # Division rounding can leave a tiny residual; give it to the most
        # reliable supplier so the shares sum to the overflow.
        shares[0] += remaining - sum(shares, Decimal('0'))

        for allocation, share in zip(allocations, shares):
            allocation['quantity'] += share

    return allocations
def calculate_buffer_stock(
    lead_time_days: int,
    daily_demand: float,
    demand_variability: float,
    service_level: float = 0.95
) -> Decimal:
    """
    Calculate buffer stock based on demand variability.

    Buffer Stock = Z × σ × √(lead_time)
    where:
    - Z = service level z-score
    - σ = demand standard deviation
    - lead_time = lead time in days

    Args:
        lead_time_days: Supplier lead time in days
        daily_demand: Average daily demand
        demand_variability: Coefficient of variation (CV = σ/μ)
        service_level: Target service level (0-1). Common levels use the
            rounded z-scores tabulated below; any other level strictly
            between 0 and 1 uses the standard normal quantile (never below
            zero). Values outside (0, 1) fall back to the 95% z-score.

    Returns:
        Buffer stock quantity (zero when lead time or demand is not positive)
    """
    if lead_time_days <= 0 or daily_demand <= 0:
        return Decimal('0')

    # Z-scores for common service levels
    z_scores = {
        0.90: 1.28,
        0.95: 1.65,
        0.975: 1.96,
        0.99: 2.33,
        0.995: 2.58
    }

    # Get z-score for service level. Match tabulated levels with a tolerance
    # so a computed level such as 1 - 0.05 still hits the table.
    z_score = None
    for level, tabulated in z_scores.items():
        if math.isclose(service_level, level, abs_tol=1e-9):
            z_score = tabulated
            break

    if z_score is None:
        if 0.0 < service_level < 1.0:
            # Untabulated level: derive the z-score instead of silently
            # using the 95% value. Levels below 50% would give a negative
            # z-score, which makes no sense for a buffer, so clamp at zero.
            z_score = max(NormalDist().inv_cdf(service_level), 0.0)
        else:
            z_score = 1.65  # Default to 95%

    # Calculate standard deviation
    stddev = daily_demand * demand_variability

    # Buffer stock formula
    buffer = z_score * stddev * math.sqrt(lead_time_days)

    return Decimal(str(buffer))
def calculate_reorder_point(
    daily_demand: float,
    lead_time_days: int,
    safety_stock: Decimal
) -> Decimal:
    """
    Compute the stock level at which a new order should be placed.

    Reorder Point = (Daily Demand × Lead Time) + Safety Stock

    Args:
        daily_demand: Average daily demand
        lead_time_days: Supplier lead time in days
        safety_stock: Safety stock quantity

    Returns:
        Reorder point
    """
    # Demand expected while waiting for delivery; converted via str so the
    # Decimal carries the float's printed value.
    expected_usage = daily_demand * lead_time_days
    return Decimal(str(expected_usage)) + safety_stock