Files
bakery-ia/shared/clients/procurement_client.py

605 lines
22 KiB
Python
Raw Normal View History

2025-10-30 21:08:07 +01:00
"""
Procurement Service Client - ENHANCED VERSION
Adds support for advanced replenishment planning endpoints
NEW METHODS:
- generate_replenishment_plan()
- get_replenishment_plan()
- list_replenishment_plans()
- get_inventory_projections()
- calculate_safety_stock()
- evaluate_supplier_selection()
"""
import structlog
from typing import Dict, Any, Optional, List
from uuid import UUID
from datetime import date
from shared.clients.base_service_client import BaseServiceClient
from shared.config.base import BaseServiceSettings
logger = structlog.get_logger()
class ProcurementServiceClient(BaseServiceClient):
"""Enhanced client for communicating with the Procurement Service"""
2025-11-05 13:34:56 +01:00
def __init__(self, config: BaseServiceSettings, calling_service_name: str = "unknown"):
super().__init__(calling_service_name, config)
2025-10-30 21:08:07 +01:00
def get_service_base_path(self) -> str:
return "/api/v1"
# ================================================================
# ORIGINAL PROCUREMENT PLANNING (Kept for backward compatibility)
# ================================================================
async def auto_generate_procurement(
self,
tenant_id: str,
forecast_data: Dict[str, Any],
production_schedule_id: Optional[str] = None,
target_date: Optional[str] = None,
auto_create_pos: bool = False,
auto_approve_pos: bool = False,
inventory_data: Optional[Dict[str, Any]] = None,
suppliers_data: Optional[Dict[str, Any]] = None,
recipes_data: Optional[Dict[str, Any]] = None
) -> Optional[Dict[str, Any]]:
"""
Auto-generate procurement plan from forecast data (called by orchestrator)
NOW USES ENHANCED PLANNING INTERNALLY
Args:
tenant_id: Tenant ID
forecast_data: Forecast data
production_schedule_id: Optional production schedule ID
target_date: Optional target date
auto_create_pos: Auto-create purchase orders
auto_approve_pos: Auto-approve purchase orders
inventory_data: Optional inventory snapshot (NEW - to avoid duplicate fetching)
suppliers_data: Optional suppliers snapshot (NEW - to avoid duplicate fetching)
recipes_data: Optional recipes snapshot (NEW - to avoid duplicate fetching)
"""
try:
2025-11-05 13:34:56 +01:00
path = f"/tenants/{tenant_id}/procurement/operations/auto-generate"
2025-10-30 21:08:07 +01:00
payload = {
"forecast_data": forecast_data,
"production_schedule_id": production_schedule_id,
"target_date": target_date,
"auto_create_pos": auto_create_pos,
"auto_approve_pos": auto_approve_pos
}
# NEW: Include cached data if provided
if inventory_data:
payload["inventory_data"] = inventory_data
if suppliers_data:
payload["suppliers_data"] = suppliers_data
if recipes_data:
payload["recipes_data"] = recipes_data
logger.info("Calling auto_generate_procurement (enhanced)",
tenant_id=tenant_id,
has_forecast_data=bool(forecast_data))
2025-11-05 13:34:56 +01:00
# Remove tenant_id from path since it's passed as separate parameter
endpoint = f"procurement/operations/auto-generate"
response = await self.post(endpoint, data=payload, tenant_id=tenant_id)
2025-10-30 21:08:07 +01:00
return response
except Exception as e:
logger.error("Error calling auto_generate_procurement",
tenant_id=tenant_id, error=str(e))
return None
# ================================================================
# NEW: REPLENISHMENT PLANNING ENDPOINTS
# ================================================================
async def generate_replenishment_plan(
self,
tenant_id: str,
requirements: List[Dict[str, Any]],
forecast_id: Optional[str] = None,
production_schedule_id: Optional[str] = None,
projection_horizon_days: int = 7,
service_level: float = 0.95,
buffer_days: int = 1
) -> Optional[Dict[str, Any]]:
"""
Generate advanced replenishment plan with full planning algorithms
Args:
tenant_id: Tenant ID
requirements: List of ingredient requirements
forecast_id: Optional forecast ID reference
production_schedule_id: Optional production schedule ID reference
projection_horizon_days: Days to project ahead (default 7)
service_level: Target service level for safety stock (default 0.95)
buffer_days: Buffer days for lead time (default 1)
Returns:
Dict with complete replenishment plan including:
- plan_id: Plan ID
- total_items: Total items in plan
- urgent_items: Number of urgent items
- high_risk_items: Number of high-risk items
- items: List of plan items with full metadata
"""
try:
2025-11-05 13:34:56 +01:00
path = f"/tenants/{tenant_id}/procurement/operations/replenishment-plans/generate"
2025-10-30 21:08:07 +01:00
payload = {
"tenant_id": tenant_id,
"requirements": requirements,
"forecast_id": forecast_id,
"production_schedule_id": production_schedule_id,
"projection_horizon_days": projection_horizon_days,
"service_level": service_level,
"buffer_days": buffer_days
}
logger.info("Generating replenishment plan",
tenant_id=tenant_id,
requirements_count=len(requirements))
2025-11-05 13:34:56 +01:00
# Remove tenant_id from path since it's passed as separate parameter
endpoint = f"procurement/operations/replenishment-plans/generate"
response = await self.post(endpoint, data=payload, tenant_id=tenant_id)
2025-10-30 21:08:07 +01:00
return response
except Exception as e:
logger.error("Error generating replenishment plan",
tenant_id=tenant_id, error=str(e))
return None
async def get_replenishment_plan(
self,
tenant_id: str,
plan_id: str
) -> Optional[Dict[str, Any]]:
"""
Get replenishment plan by ID
Args:
tenant_id: Tenant ID
plan_id: Plan ID
Returns:
Dict with complete plan details
"""
try:
2025-11-05 13:34:56 +01:00
path = f"/tenants/{tenant_id}/procurement/replenishment-plans/{plan_id}"
2025-10-30 21:08:07 +01:00
logger.debug("Getting replenishment plan",
tenant_id=tenant_id, plan_id=plan_id)
response = await self._get(path)
return response
except Exception as e:
logger.error("Error getting replenishment plan",
tenant_id=tenant_id, plan_id=plan_id, error=str(e))
return None
async def list_replenishment_plans(
self,
tenant_id: str,
skip: int = 0,
limit: int = 100,
status: Optional[str] = None
) -> Optional[List[Dict[str, Any]]]:
"""
List replenishment plans for tenant
Args:
tenant_id: Tenant ID
skip: Number of records to skip (pagination)
limit: Maximum number of records to return
status: Optional status filter
Returns:
List of plan summaries
"""
try:
2025-11-05 13:34:56 +01:00
path = f"/tenants/{tenant_id}/procurement/operations/replenishment-plans"
2025-10-30 21:08:07 +01:00
params = {"skip": skip, "limit": limit}
if status:
params["status"] = status
logger.debug("Listing replenishment plans",
tenant_id=tenant_id, skip=skip, limit=limit)
response = await self._get(path, params=params)
return response
except Exception as e:
logger.error("Error listing replenishment plans",
tenant_id=tenant_id, error=str(e))
return None
# ================================================================
# NEW: INVENTORY PROJECTION ENDPOINTS
# ================================================================
async def project_inventory(
self,
tenant_id: str,
ingredient_id: str,
ingredient_name: str,
current_stock: float,
unit_of_measure: str,
daily_demand: List[Dict[str, Any]],
scheduled_receipts: List[Dict[str, Any]] = None,
projection_horizon_days: int = 7
) -> Optional[Dict[str, Any]]:
"""
Project inventory levels to identify future stockouts
Args:
tenant_id: Tenant ID
ingredient_id: Ingredient ID
ingredient_name: Ingredient name
current_stock: Current stock level
unit_of_measure: Unit of measure
daily_demand: List of daily demand forecasts
scheduled_receipts: List of scheduled receipts (POs, production)
projection_horizon_days: Days to project
Returns:
Dict with inventory projection including:
- daily_projections: Day-by-day projection
- stockout_days: Number of stockout days
- stockout_risk: Risk level (low/medium/high/critical)
"""
try:
2025-11-05 13:34:56 +01:00
path = f"/tenants/{tenant_id}/procurement/operations/replenishment-plans/inventory-projections/project"
2025-10-30 21:08:07 +01:00
payload = {
"ingredient_id": ingredient_id,
"ingredient_name": ingredient_name,
"current_stock": current_stock,
"unit_of_measure": unit_of_measure,
"daily_demand": daily_demand,
"scheduled_receipts": scheduled_receipts or [],
"projection_horizon_days": projection_horizon_days
}
logger.info("Projecting inventory",
tenant_id=tenant_id, ingredient_id=ingredient_id)
2025-11-05 13:34:56 +01:00
# Remove tenant_id from path since it's passed as separate parameter
endpoint = f"procurement/operations/replenishment-plans/inventory-projections/project"
response = await self.post(endpoint, data=payload, tenant_id=tenant_id)
2025-10-30 21:08:07 +01:00
return response
except Exception as e:
logger.error("Error projecting inventory",
tenant_id=tenant_id, error=str(e))
return None
async def get_inventory_projections(
self,
tenant_id: str,
ingredient_id: Optional[str] = None,
projection_date: Optional[str] = None,
stockout_only: bool = False,
skip: int = 0,
limit: int = 100
) -> Optional[List[Dict[str, Any]]]:
"""
Get inventory projections
Args:
tenant_id: Tenant ID
ingredient_id: Optional ingredient ID filter
projection_date: Optional date filter
stockout_only: Only return projections with stockouts
skip: Pagination skip
limit: Pagination limit
Returns:
List of inventory projections
"""
try:
2025-11-05 13:34:56 +01:00
path = f"/tenants/{tenant_id}/procurement/operations/replenishment-plans/inventory-projections"
2025-10-30 21:08:07 +01:00
params = {
"skip": skip,
"limit": limit,
"stockout_only": stockout_only
}
if ingredient_id:
params["ingredient_id"] = ingredient_id
if projection_date:
params["projection_date"] = projection_date
response = await self._get(path, params=params)
return response
except Exception as e:
logger.error("Error getting inventory projections",
tenant_id=tenant_id, error=str(e))
return None
# ================================================================
# NEW: SAFETY STOCK CALCULATION
# ================================================================
async def calculate_safety_stock(
self,
tenant_id: str,
ingredient_id: str,
daily_demands: List[float],
lead_time_days: int,
service_level: float = 0.95
) -> Optional[Dict[str, Any]]:
"""
Calculate dynamic safety stock
Args:
tenant_id: Tenant ID
ingredient_id: Ingredient ID
daily_demands: Historical daily demands
lead_time_days: Supplier lead time
service_level: Target service level (0-1)
Returns:
Dict with safety stock calculation including:
- safety_stock_quantity: Calculated safety stock
- calculation_method: Method used
- confidence: Confidence level
- reasoning: Explanation
"""
try:
2025-11-05 13:34:56 +01:00
path = f"/tenants/{tenant_id}/procurement/operations/replenishment-plans/safety-stock/calculate"
2025-10-30 21:08:07 +01:00
payload = {
"ingredient_id": ingredient_id,
"daily_demands": daily_demands,
"lead_time_days": lead_time_days,
"service_level": service_level
}
2025-11-05 13:34:56 +01:00
# Remove tenant_id from path since it's passed as separate parameter
endpoint = f"procurement/operations/replenishment-plans/safety-stock/calculate"
response = await self.post(endpoint, data=payload, tenant_id=tenant_id)
2025-10-30 21:08:07 +01:00
return response
except Exception as e:
logger.error("Error calculating safety stock",
tenant_id=tenant_id, error=str(e))
return None
# ================================================================
# NEW: SUPPLIER SELECTION
# ================================================================
async def evaluate_supplier_selection(
self,
tenant_id: str,
ingredient_id: str,
ingredient_name: str,
required_quantity: float,
supplier_options: List[Dict[str, Any]]
) -> Optional[Dict[str, Any]]:
"""
Evaluate supplier options using multi-criteria analysis
Args:
tenant_id: Tenant ID
ingredient_id: Ingredient ID
ingredient_name: Ingredient name
required_quantity: Quantity needed
supplier_options: List of supplier options with pricing, lead time, etc.
Returns:
Dict with supplier selection result including:
- allocations: List of supplier allocations
- total_cost: Total cost
- selection_strategy: Strategy used (single/dual/multi)
- diversification_applied: Whether diversification was applied
"""
try:
2025-11-05 13:34:56 +01:00
path = f"/tenants/{tenant_id}/procurement/operations/replenishment-plans/supplier-selections/evaluate"
2025-10-30 21:08:07 +01:00
payload = {
"ingredient_id": ingredient_id,
"ingredient_name": ingredient_name,
"required_quantity": required_quantity,
"supplier_options": supplier_options
}
2025-11-05 13:34:56 +01:00
# Remove tenant_id from path since it's passed as separate parameter
endpoint = f"procurement/operations/replenishment-plans/supplier-selections/evaluate"
response = await self.post(endpoint, data=payload, tenant_id=tenant_id)
2025-10-30 21:08:07 +01:00
return response
except Exception as e:
logger.error("Error evaluating supplier selection",
tenant_id=tenant_id, error=str(e))
return None
async def get_supplier_allocations(
self,
tenant_id: str,
requirement_id: Optional[str] = None,
supplier_id: Optional[str] = None,
skip: int = 0,
limit: int = 100
) -> Optional[List[Dict[str, Any]]]:
"""
Get supplier allocations
Args:
tenant_id: Tenant ID
requirement_id: Optional requirement ID filter
supplier_id: Optional supplier ID filter
skip: Pagination skip
limit: Pagination limit
Returns:
List of supplier allocations
"""
try:
2025-11-05 13:34:56 +01:00
path = f"/tenants/{tenant_id}/procurement/operations/replenishment-plans/supplier-allocations"
2025-10-30 21:08:07 +01:00
params = {"skip": skip, "limit": limit}
if requirement_id:
params["requirement_id"] = requirement_id
if supplier_id:
params["supplier_id"] = supplier_id
response = await self._get(path, params=params)
return response
except Exception as e:
logger.error("Error getting supplier allocations",
tenant_id=tenant_id, error=str(e))
return None
# ================================================================
# NEW: ANALYTICS
# ================================================================
async def get_replenishment_analytics(
self,
tenant_id: str,
start_date: Optional[str] = None,
end_date: Optional[str] = None
) -> Optional[Dict[str, Any]]:
"""
Get replenishment planning analytics
Args:
tenant_id: Tenant ID
start_date: Optional start date filter
end_date: Optional end date filter
Returns:
Dict with analytics including:
- total_plans: Total plans created
- total_items_planned: Total items
- urgent_items_percentage: % of urgent items
- stockout_prevention_rate: Effectiveness metric
"""
try:
2025-11-05 13:34:56 +01:00
path = f"/tenants/{tenant_id}/procurement/analytics/replenishment-plans"
2025-10-30 21:08:07 +01:00
params = {}
if start_date:
params["start_date"] = start_date
if end_date:
params["end_date"] = end_date
response = await self._get(path, params=params)
return response
except Exception as e:
logger.error("Error getting replenishment analytics",
tenant_id=tenant_id, error=str(e))
return None
2025-11-05 13:34:56 +01:00
# ================================================================
# ML INSIGHTS: Supplier Analysis and Price Forecasting
# ================================================================
async def trigger_supplier_analysis(
self,
tenant_id: str,
supplier_ids: Optional[List[str]] = None,
lookback_days: int = 180,
min_orders: int = 10
) -> Optional[Dict[str, Any]]:
"""
Trigger supplier performance analysis.
Args:
tenant_id: Tenant UUID
supplier_ids: Specific supplier IDs to analyze. If None, analyzes all suppliers
lookback_days: Days of historical orders to analyze (30-730)
min_orders: Minimum orders required for analysis (5-100)
Returns:
Dict with analysis results including insights posted
"""
try:
data = {
"supplier_ids": supplier_ids,
"lookback_days": lookback_days,
"min_orders": min_orders
}
result = await self.post("procurement/ml/insights/analyze-suppliers", data=data, tenant_id=tenant_id)
if result:
logger.info("Triggered supplier analysis",
suppliers_analyzed=result.get('suppliers_analyzed', 0),
insights_posted=result.get('total_insights_posted', 0),
tenant_id=tenant_id)
return result
except Exception as e:
logger.error("Error triggering supplier analysis",
error=str(e), tenant_id=tenant_id)
return None
async def trigger_price_forecasting(
self,
tenant_id: str,
ingredient_ids: Optional[List[str]] = None,
lookback_days: int = 180,
forecast_horizon_days: int = 30
) -> Optional[Dict[str, Any]]:
"""
Trigger price forecasting for procurement ingredients.
Args:
tenant_id: Tenant UUID
ingredient_ids: Specific ingredient IDs to forecast. If None, forecasts all ingredients
lookback_days: Days of historical price data to analyze (90-730)
forecast_horizon_days: Days to forecast ahead (7-90)
Returns:
Dict with forecasting results including insights posted
"""
try:
data = {
"ingredient_ids": ingredient_ids,
"lookback_days": lookback_days,
"forecast_horizon_days": forecast_horizon_days
}
result = await self.post("procurement/ml/insights/forecast-prices", data=data, tenant_id=tenant_id)
if result:
logger.info("Triggered price forecasting",
ingredients_forecasted=result.get('ingredients_forecasted', 0),
insights_posted=result.get('total_insights_posted', 0),
buy_now_recommendations=result.get('buy_now_recommendations', 0),
tenant_id=tenant_id)
return result
except Exception as e:
logger.error("Error triggering price forecasting",
error=str(e), tenant_id=tenant_id)
return None
# ================================================================
# DASHBOARD METHODS
# ================================================================
async def get_pending_purchase_orders(
self,
tenant_id: str,
limit: int = 20
) -> Optional[Dict[str, Any]]:
"""
Get purchase orders pending approval for dashboard
Args:
tenant_id: Tenant ID
limit: Maximum number of POs to return
Returns:
Dict with {"items": [...], "total": n}
"""
try:
return await self.get(
"/procurement/purchase-orders",
tenant_id=tenant_id,
params={"status": "pending_approval", "limit": limit}
)
except Exception as e:
logger.error("Error fetching pending purchase orders", error=str(e), tenant_id=tenant_id)
return None