Files
bakery-ia/services/procurement/app/services/moq_aggregator.py
2025-10-30 21:08:07 +01:00

459 lines
15 KiB
Python

"""
MOQ Aggregator
Aggregates multiple procurement requirements to meet Minimum Order Quantities (MOQ)
and optimize order sizes.
"""
import logging
from dataclasses import dataclass, replace
from datetime import date, timedelta
from decimal import Decimal
from typing import List, Dict, Optional, Tuple

from shared.utils.optimization import (
    round_to_moq,
    round_to_package_size,
    aggregate_requirements_for_moq
)
logger = logging.getLogger(__name__)
@dataclass
class ProcurementRequirement:
    """Single procurement requirement.

    One requested quantity of one ingredient from one supplier, needed by a
    given date. ``MOQAggregator`` groups these by ``(supplier_id,
    ingredient_id)`` and batches them by ``required_date``.
    """

    id: str  # identifier; the first requirement's id seeds the aggregated order id
    ingredient_id: str  # second half of the grouping key
    ingredient_name: str  # copied onto the aggregated order and used in log messages
    quantity: Decimal  # summed across a batch to obtain the order quantity
    required_date: date  # used to sort requirements and to build time-window batches
    supplier_id: str  # first half of the grouping key; also selects the supplier constraints
    unit_of_measure: str  # copied onto the aggregated order; not converted or validated here
@dataclass
class SupplierConstraints:
    """Supplier ordering constraints.

    Every constraint is optional. ``MOQAggregator`` tests each one for
    truthiness before applying it, so ``None`` (and a zero value) means
    "no such constraint".
    """

    supplier_id: str
    supplier_name: str
    # Batch totals below this are raised to exactly this quantity.
    min_order_quantity: Optional[Decimal] = None
    # NOTE(review): not read by any code visible in this module - confirm
    # whether a minimum order value is enforced elsewhere.
    min_order_value: Optional[Decimal] = None
    # Passed to round_to_package_size(..., allow_partial=False).
    package_size: Optional[Decimal] = None
    # Order quantities above this are capped to it (with a warning logged).
    max_order_quantity: Optional[Decimal] = None
    # Passed to round_to_moq(..., round_up=True) as the rounding multiple.
    economic_order_multiple: Optional[Decimal] = None
@dataclass
class AggregatedOrder:
    """Aggregated order for a supplier.

    The result of combining one batch of requirements for a single
    ingredient/supplier pair and applying the supplier's constraints.
    """

    id: str  # "agg_<id of the first requirement in the batch>" (plus "_split_<n>" after a split)
    supplier_id: str
    ingredient_id: str
    ingredient_name: str
    aggregated_quantity: Decimal  # quantity to order after all adjustments
    original_quantity: Decimal  # plain sum of the batch quantities, before adjustments
    order_date: date  # earliest required_date in the batch
    unit_of_measure: str
    requirements: List[ProcurementRequirement]  # the requirements this order covers
    adjustment_reason: str  # " | "-joined adjustment notes, or "No adjustments"
    moq_applied: bool  # True when the quantity was raised to the minimum order quantity
    package_rounding_applied: bool  # True when package-size rounding changed the quantity
class MOQAggregator:
    """
    Aggregates procurement requirements to meet MOQ constraints.

    Strategies:
    1. Combine multiple requirements for same ingredient
    2. Round up to meet MOQ
    3. Round to package sizes
    4. Consolidate orders within time window
    5. Optimize order timing
    """

    def __init__(
        self,
        consolidation_window_days: int = 7,
        allow_early_ordering: bool = True
    ):
        """
        Initialize MOQ aggregator.

        Args:
            consolidation_window_days: Days within which to consolidate orders
            allow_early_ordering: Whether to allow ordering early to meet MOQ.
                Stored on the instance; none of the methods in this class
                currently read it.
        """
        self.consolidation_window_days = consolidation_window_days
        self.allow_early_ordering = allow_early_ordering

    def aggregate_requirements(
        self,
        requirements: List[ProcurementRequirement],
        supplier_constraints: Dict[str, SupplierConstraints]
    ) -> List[AggregatedOrder]:
        """
        Aggregate requirements to meet MOQ constraints.

        Requirements are grouped by (supplier, ingredient), batched by time
        window, and each batch becomes one aggregated order.

        Args:
            requirements: List of procurement requirements
            supplier_constraints: Dictionary of supplier constraints by supplier_id

        Returns:
            List of aggregated orders (empty when there are no requirements)
        """
        if not requirements:
            return []

        logger.info(f"Aggregating {len(requirements)} procurement requirements")

        aggregated_orders: List[AggregatedOrder] = []
        grouped = self._group_requirements(requirements)

        for (supplier_id, _ingredient_id), reqs in grouped.items():
            constraints = supplier_constraints.get(supplier_id)
            if not constraints:
                # Unknown supplier: fall back to a constraint-free record so
                # the requirements are still ordered, just without adjustment.
                logger.warning(
                    f"No constraints found for supplier {supplier_id}, "
                    f"processing without MOQ"
                )
                constraints = SupplierConstraints(
                    supplier_id=supplier_id,
                    supplier_name=f"Supplier {supplier_id}"
                )

            aggregated_orders.extend(
                self._aggregate_ingredient_requirements(reqs, constraints)
            )

        logger.info(
            f"Created {len(aggregated_orders)} aggregated orders "
            f"from {len(requirements)} requirements"
        )
        return aggregated_orders

    def _group_requirements(
        self,
        requirements: List[ProcurementRequirement]
    ) -> Dict[Tuple[str, str], List[ProcurementRequirement]]:
        """
        Group requirements by supplier and ingredient.

        Args:
            requirements: List of requirements

        Returns:
            Dictionary mapping (supplier_id, ingredient_id) to list of
            requirements, preserving input order within each group
        """
        grouped: Dict[Tuple[str, str], List[ProcurementRequirement]] = {}
        for req in requirements:
            grouped.setdefault((req.supplier_id, req.ingredient_id), []).append(req)
        return grouped

    def _aggregate_ingredient_requirements(
        self,
        requirements: List[ProcurementRequirement],
        constraints: SupplierConstraints
    ) -> List[AggregatedOrder]:
        """
        Aggregate requirements for one ingredient from one supplier.

        Args:
            requirements: List of requirements for same ingredient/supplier
            constraints: Supplier constraints

        Returns:
            List of aggregated orders, one per time-window batch
        """
        if not requirements:
            return []

        # Batching assumes chronological order, so sort by required date first.
        sorted_reqs = sorted(requirements, key=lambda r: r.required_date)
        batches = self._consolidate_by_time_window(sorted_reqs)
        return [
            self._create_aggregated_order(batch, constraints)
            for batch in batches
        ]

    def _consolidate_by_time_window(
        self,
        requirements: List[ProcurementRequirement]
    ) -> List[List[ProcurementRequirement]]:
        """
        Consolidate requirements within time window.

        The window is anchored on the first requirement of each batch: a
        requirement joins the current batch when its date is at most
        ``consolidation_window_days`` after that first date, otherwise it
        starts a new batch.

        Args:
            requirements: List of requirements, already sorted by required_date

        Returns:
            List of requirement batches
        """
        if not requirements:
            return []

        first, *rest = requirements
        current_batch = [first]
        batches = [current_batch]
        batch_start_date = first.required_date

        for req in rest:
            days_diff = (req.required_date - batch_start_date).days
            if days_diff <= self.consolidation_window_days:
                current_batch.append(req)
            else:
                # Outside the window: open a new batch anchored on this date.
                current_batch = [req]
                batches.append(current_batch)
                batch_start_date = req.required_date

        return batches

    def _create_aggregated_order(
        self,
        requirements: List[ProcurementRequirement],
        constraints: SupplierConstraints
    ) -> AggregatedOrder:
        """
        Create aggregated order from requirements.

        Constraints are applied in this order: minimum order quantity,
        package-size rounding, maximum order quantity, economic order
        multiple. Each adjustment that changes the quantity is recorded in
        the order's ``adjustment_reason``.

        Args:
            requirements: Non-empty list of requirements to aggregate
            constraints: Supplier constraints

        Returns:
            Aggregated order

        Raises:
            ValueError: If ``requirements`` is empty
        """
        if not requirements:
            raise ValueError(
                "Cannot create an aggregated order from zero requirements"
            )

        total_quantity = sum(req.quantity for req in requirements)
        original_quantity = total_quantity

        # The order is dated by the earliest need in the batch.
        order_date = min(req.required_date for req in requirements)

        # Ingredient details are taken from the first requirement; callers
        # pass batches that share a single ingredient.
        first_req = requirements[0]
        ingredient_id = first_req.ingredient_id
        ingredient_name = first_req.ingredient_name
        unit_of_measure = first_req.unit_of_measure

        adjustments: List[str] = []
        moq_applied = False
        package_rounding_applied = False

        # 1. Raise to the minimum order quantity.
        if constraints.min_order_quantity:
            if total_quantity < constraints.min_order_quantity:
                total_quantity = constraints.min_order_quantity
                moq_applied = True
                adjustments.append(
                    f"Rounded up to MOQ: {constraints.min_order_quantity} {unit_of_measure}"
                )

        # 2. Round to the package size.
        if constraints.package_size:
            rounded_qty = round_to_package_size(
                total_quantity,
                constraints.package_size,
                allow_partial=False
            )
            if rounded_qty != total_quantity:
                total_quantity = rounded_qty
                package_rounding_applied = True
                adjustments.append(
                    f"Rounded to package size: {constraints.package_size} {unit_of_measure}"
                )

        # 3. Cap at the maximum order quantity.
        # NOTE(review): capping can leave the order below the summed
        # requirement; nothing here creates a follow-up order for the rest.
        if constraints.max_order_quantity:
            if total_quantity > constraints.max_order_quantity:
                logger.warning(
                    f"{ingredient_name}: Order quantity {total_quantity} exceeds "
                    f"max {constraints.max_order_quantity}, capping"
                )
                total_quantity = constraints.max_order_quantity
                adjustments.append(
                    f"Capped at maximum: {constraints.max_order_quantity} {unit_of_measure}"
                )

        # 4. Round to the economic order multiple.
        # NOTE(review): this runs after the cap with round_up=True, so it can
        # presumably push the quantity back above max_order_quantity - confirm
        # the intended precedence.
        if constraints.economic_order_multiple:
            multiple = constraints.economic_order_multiple
            rounded = round_to_moq(total_quantity, multiple, round_up=True)
            if rounded != total_quantity:
                total_quantity = rounded
                adjustments.append(
                    f"Rounded to economic multiple: {multiple} {unit_of_measure}"
                )

        order = AggregatedOrder(
            id=f"agg_{first_req.id}",
            supplier_id=constraints.supplier_id,
            ingredient_id=ingredient_id,
            ingredient_name=ingredient_name,
            aggregated_quantity=total_quantity,
            original_quantity=original_quantity,
            order_date=order_date,
            unit_of_measure=unit_of_measure,
            requirements=requirements,
            adjustment_reason=" | ".join(adjustments) if adjustments else "No adjustments",
            moq_applied=moq_applied,
            package_rounding_applied=package_rounding_applied
        )

        if total_quantity != original_quantity:
            logger.info(
                f"{ingredient_name}: Aggregated {len(requirements)} requirements "
                f"({original_quantity} -> {total_quantity} {unit_of_measure})"
            )

        return order

    def calculate_order_efficiency(
        self,
        orders: List[AggregatedOrder]
    ) -> Dict:
        """
        Calculate efficiency metrics for aggregated orders.

        Args:
            orders: List of aggregated orders

        Returns:
            Dictionary of metrics: order/requirement counts, consolidation
            ratio (requirements per order), adjustment counts, and the
            quantity overhead added by adjustments (absolute and as a
            percentage of the original quantity). Ratios are 0 when the
            denominator is 0.
        """
        total_orders = len(orders)
        total_requirements = sum(len(order.requirements) for order in orders)
        orders_with_moq = sum(1 for order in orders if order.moq_applied)
        orders_with_rounding = sum(
            1 for order in orders if order.package_rounding_applied
        )

        total_original_qty = sum(order.original_quantity for order in orders)
        total_aggregated_qty = sum(order.aggregated_quantity for order in orders)
        overhead_qty = total_aggregated_qty - total_original_qty

        if total_original_qty > 0:
            overhead_percentage = overhead_qty / total_original_qty * 100
        else:
            overhead_percentage = 0

        if total_orders > 0:
            consolidation_ratio = total_requirements / total_orders
        else:
            consolidation_ratio = 0

        return {
            'total_orders': total_orders,
            'total_requirements': total_requirements,
            'consolidation_ratio': float(consolidation_ratio),
            'orders_with_moq_adjustment': orders_with_moq,
            'orders_with_package_rounding': orders_with_rounding,
            'total_original_quantity': float(total_original_qty),
            'total_aggregated_quantity': float(total_aggregated_qty),
            'overhead_quantity': float(overhead_qty),
            'overhead_percentage': float(overhead_percentage)
        }

    def split_oversized_order(
        self,
        order: AggregatedOrder,
        max_quantity: Decimal,
        split_window_days: int = 7
    ) -> List[AggregatedOrder]:
        """
        Split an oversized order into multiple smaller orders.

        The order is divided into the smallest number of equal parts such
        that no part exceeds ``max_quantity``. Parts are dated
        ``split_window_days`` apart, starting at the original order date.
        Every part references the same ``requirements`` list and carries the
        original adjustment flags.

        Args:
            order: Order to split
            max_quantity: Maximum quantity per order
            split_window_days: Days between split orders

        Returns:
            ``[order]`` unchanged when it already fits, otherwise the list of
            split orders

        Raises:
            ValueError: If the order needs splitting and ``max_quantity`` is
                not positive
        """
        if order.aggregated_quantity <= max_quantity:
            return [order]

        if max_quantity <= 0:
            # A non-positive limit would otherwise divide by zero or yield a
            # negative split count and silently drop the order.
            raise ValueError(
                f"max_quantity must be positive, got {max_quantity}"
            )

        logger.info(
            f"Splitting oversized order: {order.aggregated_quantity} > {max_quantity}"
        )

        # Ceiling division: only add an extra order when there is a remainder,
        # so exact multiples of max_quantity are not over-split.
        whole, remainder = divmod(order.aggregated_quantity, max_quantity)
        num_splits = int(whole) + (1 if remainder else 0)

        qty_per_order = order.aggregated_quantity / num_splits
        original_per_order = order.original_quantity / num_splits

        return [
            replace(
                order,
                id=f"{order.id}_split_{index}",
                aggregated_quantity=qty_per_order,
                original_quantity=original_per_order,
                order_date=order.order_date
                + timedelta(days=(index - 1) * split_window_days),
                adjustment_reason=(
                    f"Split {index}/{num_splits} due to capacity constraint"
                ),
            )
            for index in range(1, num_splits + 1)
        ]

    def export_to_dict(self, order: AggregatedOrder) -> Dict:
        """
        Export aggregated order to dictionary.

        Quantities are converted to ``float`` and the date to an ISO 8601
        string; the requirements themselves are reduced to a count.

        Args:
            order: Aggregated order

        Returns:
            Dictionary representation
        """
        return {
            'id': order.id,
            'supplier_id': order.supplier_id,
            'ingredient_id': order.ingredient_id,
            'ingredient_name': order.ingredient_name,
            'aggregated_quantity': float(order.aggregated_quantity),
            'original_quantity': float(order.original_quantity),
            'order_date': order.order_date.isoformat(),
            'unit_of_measure': order.unit_of_measure,
            'num_requirements_aggregated': len(order.requirements),
            'adjustment_reason': order.adjustment_reason,
            'moq_applied': order.moq_applied,
            'package_rounding_applied': order.package_rounding_applied
        }