Files
bakery-ia/shared/clients/procurement_client.py
2025-10-30 21:08:07 +01:00

487 lines
17 KiB
Python

"""
Procurement Service Client - ENHANCED VERSION
Adds support for advanced replenishment planning endpoints
NEW METHODS:
- generate_replenishment_plan()
- get_replenishment_plan()
- list_replenishment_plans()
- get_inventory_projections()
- calculate_safety_stock()
- evaluate_supplier_selection()
"""
import structlog
from typing import Dict, Any, Optional, List
from uuid import UUID
from datetime import date
from shared.clients.base_service_client import BaseServiceClient
from shared.config.base import BaseServiceSettings
logger = structlog.get_logger()
class ProcurementServiceClient(BaseServiceClient):
"""Enhanced client for communicating with the Procurement Service"""
def __init__(self, config: BaseServiceSettings):
super().__init__("procurement", config)
def get_service_base_path(self) -> str:
return "/api/v1"
# ================================================================
# ORIGINAL PROCUREMENT PLANNING (Kept for backward compatibility)
# ================================================================
async def auto_generate_procurement(
self,
tenant_id: str,
forecast_data: Dict[str, Any],
production_schedule_id: Optional[str] = None,
target_date: Optional[str] = None,
auto_create_pos: bool = False,
auto_approve_pos: bool = False,
inventory_data: Optional[Dict[str, Any]] = None,
suppliers_data: Optional[Dict[str, Any]] = None,
recipes_data: Optional[Dict[str, Any]] = None
) -> Optional[Dict[str, Any]]:
"""
Auto-generate procurement plan from forecast data (called by orchestrator)
NOW USES ENHANCED PLANNING INTERNALLY
Args:
tenant_id: Tenant ID
forecast_data: Forecast data
production_schedule_id: Optional production schedule ID
target_date: Optional target date
auto_create_pos: Auto-create purchase orders
auto_approve_pos: Auto-approve purchase orders
inventory_data: Optional inventory snapshot (NEW - to avoid duplicate fetching)
suppliers_data: Optional suppliers snapshot (NEW - to avoid duplicate fetching)
recipes_data: Optional recipes snapshot (NEW - to avoid duplicate fetching)
"""
try:
path = f"/tenants/{tenant_id}/procurement/auto-generate"
payload = {
"forecast_data": forecast_data,
"production_schedule_id": production_schedule_id,
"target_date": target_date,
"auto_create_pos": auto_create_pos,
"auto_approve_pos": auto_approve_pos
}
# NEW: Include cached data if provided
if inventory_data:
payload["inventory_data"] = inventory_data
if suppliers_data:
payload["suppliers_data"] = suppliers_data
if recipes_data:
payload["recipes_data"] = recipes_data
logger.info("Calling auto_generate_procurement (enhanced)",
tenant_id=tenant_id,
has_forecast_data=bool(forecast_data))
response = await self._post(path, json=payload)
return response
except Exception as e:
logger.error("Error calling auto_generate_procurement",
tenant_id=tenant_id, error=str(e))
return None
# ================================================================
# NEW: REPLENISHMENT PLANNING ENDPOINTS
# ================================================================
async def generate_replenishment_plan(
self,
tenant_id: str,
requirements: List[Dict[str, Any]],
forecast_id: Optional[str] = None,
production_schedule_id: Optional[str] = None,
projection_horizon_days: int = 7,
service_level: float = 0.95,
buffer_days: int = 1
) -> Optional[Dict[str, Any]]:
"""
Generate advanced replenishment plan with full planning algorithms
Args:
tenant_id: Tenant ID
requirements: List of ingredient requirements
forecast_id: Optional forecast ID reference
production_schedule_id: Optional production schedule ID reference
projection_horizon_days: Days to project ahead (default 7)
service_level: Target service level for safety stock (default 0.95)
buffer_days: Buffer days for lead time (default 1)
Returns:
Dict with complete replenishment plan including:
- plan_id: Plan ID
- total_items: Total items in plan
- urgent_items: Number of urgent items
- high_risk_items: Number of high-risk items
- items: List of plan items with full metadata
"""
try:
path = f"/tenants/{tenant_id}/replenishment-plans/generate"
payload = {
"tenant_id": tenant_id,
"requirements": requirements,
"forecast_id": forecast_id,
"production_schedule_id": production_schedule_id,
"projection_horizon_days": projection_horizon_days,
"service_level": service_level,
"buffer_days": buffer_days
}
logger.info("Generating replenishment plan",
tenant_id=tenant_id,
requirements_count=len(requirements))
response = await self._post(path, json=payload)
return response
except Exception as e:
logger.error("Error generating replenishment plan",
tenant_id=tenant_id, error=str(e))
return None
async def get_replenishment_plan(
self,
tenant_id: str,
plan_id: str
) -> Optional[Dict[str, Any]]:
"""
Get replenishment plan by ID
Args:
tenant_id: Tenant ID
plan_id: Plan ID
Returns:
Dict with complete plan details
"""
try:
path = f"/tenants/{tenant_id}/replenishment-plans/{plan_id}"
logger.debug("Getting replenishment plan",
tenant_id=tenant_id, plan_id=plan_id)
response = await self._get(path)
return response
except Exception as e:
logger.error("Error getting replenishment plan",
tenant_id=tenant_id, plan_id=plan_id, error=str(e))
return None
async def list_replenishment_plans(
self,
tenant_id: str,
skip: int = 0,
limit: int = 100,
status: Optional[str] = None
) -> Optional[List[Dict[str, Any]]]:
"""
List replenishment plans for tenant
Args:
tenant_id: Tenant ID
skip: Number of records to skip (pagination)
limit: Maximum number of records to return
status: Optional status filter
Returns:
List of plan summaries
"""
try:
path = f"/tenants/{tenant_id}/replenishment-plans"
params = {"skip": skip, "limit": limit}
if status:
params["status"] = status
logger.debug("Listing replenishment plans",
tenant_id=tenant_id, skip=skip, limit=limit)
response = await self._get(path, params=params)
return response
except Exception as e:
logger.error("Error listing replenishment plans",
tenant_id=tenant_id, error=str(e))
return None
# ================================================================
# NEW: INVENTORY PROJECTION ENDPOINTS
# ================================================================
async def project_inventory(
self,
tenant_id: str,
ingredient_id: str,
ingredient_name: str,
current_stock: float,
unit_of_measure: str,
daily_demand: List[Dict[str, Any]],
scheduled_receipts: List[Dict[str, Any]] = None,
projection_horizon_days: int = 7
) -> Optional[Dict[str, Any]]:
"""
Project inventory levels to identify future stockouts
Args:
tenant_id: Tenant ID
ingredient_id: Ingredient ID
ingredient_name: Ingredient name
current_stock: Current stock level
unit_of_measure: Unit of measure
daily_demand: List of daily demand forecasts
scheduled_receipts: List of scheduled receipts (POs, production)
projection_horizon_days: Days to project
Returns:
Dict with inventory projection including:
- daily_projections: Day-by-day projection
- stockout_days: Number of stockout days
- stockout_risk: Risk level (low/medium/high/critical)
"""
try:
path = f"/tenants/{tenant_id}/replenishment-plans/inventory-projections/project"
payload = {
"ingredient_id": ingredient_id,
"ingredient_name": ingredient_name,
"current_stock": current_stock,
"unit_of_measure": unit_of_measure,
"daily_demand": daily_demand,
"scheduled_receipts": scheduled_receipts or [],
"projection_horizon_days": projection_horizon_days
}
logger.info("Projecting inventory",
tenant_id=tenant_id, ingredient_id=ingredient_id)
response = await self._post(path, json=payload)
return response
except Exception as e:
logger.error("Error projecting inventory",
tenant_id=tenant_id, error=str(e))
return None
async def get_inventory_projections(
self,
tenant_id: str,
ingredient_id: Optional[str] = None,
projection_date: Optional[str] = None,
stockout_only: bool = False,
skip: int = 0,
limit: int = 100
) -> Optional[List[Dict[str, Any]]]:
"""
Get inventory projections
Args:
tenant_id: Tenant ID
ingredient_id: Optional ingredient ID filter
projection_date: Optional date filter
stockout_only: Only return projections with stockouts
skip: Pagination skip
limit: Pagination limit
Returns:
List of inventory projections
"""
try:
path = f"/tenants/{tenant_id}/replenishment-plans/inventory-projections"
params = {
"skip": skip,
"limit": limit,
"stockout_only": stockout_only
}
if ingredient_id:
params["ingredient_id"] = ingredient_id
if projection_date:
params["projection_date"] = projection_date
response = await self._get(path, params=params)
return response
except Exception as e:
logger.error("Error getting inventory projections",
tenant_id=tenant_id, error=str(e))
return None
# ================================================================
# NEW: SAFETY STOCK CALCULATION
# ================================================================
async def calculate_safety_stock(
self,
tenant_id: str,
ingredient_id: str,
daily_demands: List[float],
lead_time_days: int,
service_level: float = 0.95
) -> Optional[Dict[str, Any]]:
"""
Calculate dynamic safety stock
Args:
tenant_id: Tenant ID
ingredient_id: Ingredient ID
daily_demands: Historical daily demands
lead_time_days: Supplier lead time
service_level: Target service level (0-1)
Returns:
Dict with safety stock calculation including:
- safety_stock_quantity: Calculated safety stock
- calculation_method: Method used
- confidence: Confidence level
- reasoning: Explanation
"""
try:
path = f"/tenants/{tenant_id}/replenishment-plans/safety-stock/calculate"
payload = {
"ingredient_id": ingredient_id,
"daily_demands": daily_demands,
"lead_time_days": lead_time_days,
"service_level": service_level
}
response = await self._post(path, json=payload)
return response
except Exception as e:
logger.error("Error calculating safety stock",
tenant_id=tenant_id, error=str(e))
return None
# ================================================================
# NEW: SUPPLIER SELECTION
# ================================================================
async def evaluate_supplier_selection(
self,
tenant_id: str,
ingredient_id: str,
ingredient_name: str,
required_quantity: float,
supplier_options: List[Dict[str, Any]]
) -> Optional[Dict[str, Any]]:
"""
Evaluate supplier options using multi-criteria analysis
Args:
tenant_id: Tenant ID
ingredient_id: Ingredient ID
ingredient_name: Ingredient name
required_quantity: Quantity needed
supplier_options: List of supplier options with pricing, lead time, etc.
Returns:
Dict with supplier selection result including:
- allocations: List of supplier allocations
- total_cost: Total cost
- selection_strategy: Strategy used (single/dual/multi)
- diversification_applied: Whether diversification was applied
"""
try:
path = f"/tenants/{tenant_id}/replenishment-plans/supplier-selections/evaluate"
payload = {
"ingredient_id": ingredient_id,
"ingredient_name": ingredient_name,
"required_quantity": required_quantity,
"supplier_options": supplier_options
}
response = await self._post(path, json=payload)
return response
except Exception as e:
logger.error("Error evaluating supplier selection",
tenant_id=tenant_id, error=str(e))
return None
async def get_supplier_allocations(
self,
tenant_id: str,
requirement_id: Optional[str] = None,
supplier_id: Optional[str] = None,
skip: int = 0,
limit: int = 100
) -> Optional[List[Dict[str, Any]]]:
"""
Get supplier allocations
Args:
tenant_id: Tenant ID
requirement_id: Optional requirement ID filter
supplier_id: Optional supplier ID filter
skip: Pagination skip
limit: Pagination limit
Returns:
List of supplier allocations
"""
try:
path = f"/tenants/{tenant_id}/replenishment-plans/supplier-allocations"
params = {"skip": skip, "limit": limit}
if requirement_id:
params["requirement_id"] = requirement_id
if supplier_id:
params["supplier_id"] = supplier_id
response = await self._get(path, params=params)
return response
except Exception as e:
logger.error("Error getting supplier allocations",
tenant_id=tenant_id, error=str(e))
return None
# ================================================================
# NEW: ANALYTICS
# ================================================================
async def get_replenishment_analytics(
self,
tenant_id: str,
start_date: Optional[str] = None,
end_date: Optional[str] = None
) -> Optional[Dict[str, Any]]:
"""
Get replenishment planning analytics
Args:
tenant_id: Tenant ID
start_date: Optional start date filter
end_date: Optional end date filter
Returns:
Dict with analytics including:
- total_plans: Total plans created
- total_items_planned: Total items
- urgent_items_percentage: % of urgent items
- stockout_prevention_rate: Effectiveness metric
"""
try:
path = f"/tenants/{tenant_id}/replenishment-plans/analytics"
params = {}
if start_date:
params["start_date"] = start_date
if end_date:
params["end_date"] = end_date
response = await self._get(path, params=params)
return response
except Exception as e:
logger.error("Error getting replenishment analytics",
tenant_id=tenant_id, error=str(e))
return None