Files
bakery-ia/services/orders/app/services/procurement_service.py
2025-08-23 19:47:08 +02:00

580 lines
25 KiB
Python

# ================================================================
# services/orders/app/services/procurement_service.py
# ================================================================
"""
Procurement Service - Business logic for procurement planning and scheduling
"""
import asyncio
import uuid
from datetime import datetime, date, timedelta
from decimal import Decimal
from typing import List, Optional, Dict, Any, Tuple
import structlog
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.procurement import ProcurementPlan, ProcurementRequirement
from app.repositories.procurement_repository import ProcurementPlanRepository, ProcurementRequirementRepository
from app.schemas.procurement_schemas import (
ProcurementPlanCreate, ProcurementPlanResponse, ProcurementRequirementCreate,
GeneratePlanRequest, GeneratePlanResponse, DashboardData, ProcurementSummary
)
from app.core.config import settings
from shared.clients.inventory_client import InventoryServiceClient
from shared.clients.forecast_client import ForecastServiceClient
from shared.config.base import BaseServiceSettings
from shared.messaging.rabbitmq import RabbitMQClient
from shared.monitoring.decorators import monitor_performance
from app.services.cache_service import get_cache_service, CacheService
logger = structlog.get_logger()
class ProcurementService:
"""Service for managing procurement plans and scheduling"""
def __init__(
self,
db: AsyncSession,
config: BaseServiceSettings,
inventory_client: Optional[InventoryServiceClient] = None,
forecast_client: Optional[ForecastServiceClient] = None,
cache_service: Optional[CacheService] = None
):
self.db = db
self.config = config
self.plan_repo = ProcurementPlanRepository(db)
self.requirement_repo = ProcurementRequirementRepository(db)
# Initialize service clients
self.inventory_client = inventory_client or InventoryServiceClient(config)
self.forecast_client = forecast_client or ForecastServiceClient(config, "orders-service")
self.cache_service = cache_service or get_cache_service()
# Initialize RabbitMQ client
rabbitmq_url = getattr(config, 'RABBITMQ_URL', 'amqp://guest:guest@localhost:5672/')
self.rabbitmq_client = RabbitMQClient(rabbitmq_url, "orders-service")
# ================================================================
# PROCUREMENT PLAN OPERATIONS
# ================================================================
async def get_current_plan(self, tenant_id: uuid.UUID) -> Optional[ProcurementPlanResponse]:
"""Get the current day's procurement plan"""
try:
# Try cache first
cached_plan = await self.cache_service.get_cached_plan(tenant_id)
if cached_plan:
return ProcurementPlanResponse.model_validate(cached_plan)
# Get from database
plan = await self.plan_repo.get_current_plan(tenant_id)
if plan:
# Cache the result
await self.cache_service.cache_procurement_plan(plan)
return ProcurementPlanResponse.model_validate(plan)
return None
except Exception as e:
logger.error("Error getting current plan", error=str(e), tenant_id=tenant_id)
return None
async def get_plan_by_date(self, tenant_id: uuid.UUID, plan_date: date) -> Optional[ProcurementPlanResponse]:
"""Get procurement plan for a specific date"""
try:
plan = await self.plan_repo.get_plan_by_date(plan_date, tenant_id)
return ProcurementPlanResponse.model_validate(plan) if plan else None
except Exception as e:
logger.error("Error getting plan by date", error=str(e), tenant_id=tenant_id, date=plan_date)
return None
async def get_plan_by_id(self, tenant_id: uuid.UUID, plan_id: uuid.UUID) -> Optional[ProcurementPlanResponse]:
"""Get procurement plan by ID"""
try:
plan = await self.plan_repo.get_plan_by_id(plan_id, tenant_id)
return ProcurementPlanResponse.model_validate(plan) if plan else None
except Exception as e:
logger.error("Error getting plan by ID", error=str(e), tenant_id=tenant_id, plan_id=plan_id)
return None
@monitor_performance("generate_procurement_plan")
async def generate_procurement_plan(
self,
tenant_id: uuid.UUID,
request: GeneratePlanRequest
) -> GeneratePlanResponse:
"""Generate a new procurement plan based on forecasts and inventory"""
try:
plan_date = request.plan_date or date.today()
# Check if plan already exists
existing_plan = await self.plan_repo.get_plan_by_date(plan_date, tenant_id)
if existing_plan and not request.force_regenerate:
return GeneratePlanResponse(
success=True,
message="Plan already exists for this date",
plan=ProcurementPlanResponse.model_validate(existing_plan),
warnings=["Plan already exists. Use force_regenerate=true to recreate."]
)
logger.info("Starting procurement plan generation", tenant_id=tenant_id, plan_date=plan_date)
# Step 1: Get current inventory
inventory_items = await self._get_inventory_list(tenant_id)
if not inventory_items:
return GeneratePlanResponse(
success=False,
message="No inventory items found",
errors=["Unable to retrieve inventory data"]
)
# Step 2: Generate forecasts for each inventory item
forecasts = await self._generate_demand_forecasts(
tenant_id,
inventory_items,
plan_date,
request.planning_horizon_days
)
# Step 3: Create procurement plan
plan_data = await self._create_plan_data(
tenant_id,
plan_date,
request,
inventory_items,
forecasts
)
# Delete existing plan if force regenerate
if existing_plan and request.force_regenerate:
await self.plan_repo.delete_plan(existing_plan.id, tenant_id)
await self.db.flush()
# Step 4: Save plan to database
plan = await self.plan_repo.create_plan(plan_data)
# Step 5: Create requirements
requirements_data = await self._create_requirements_data(
plan.id,
inventory_items,
forecasts,
request
)
if requirements_data:
await self.requirement_repo.create_requirements_batch(requirements_data)
await self.db.commit()
# Step 6: Cache the plan and publish event
await self._cache_procurement_plan(plan)
await self._publish_plan_generated_event(tenant_id, plan.id)
logger.info("Procurement plan generated successfully",
tenant_id=tenant_id, plan_id=plan.id, requirements_count=len(requirements_data))
# Refresh plan with requirements
saved_plan = await self.plan_repo.get_plan_by_id(plan.id, tenant_id)
return GeneratePlanResponse(
success=True,
message="Procurement plan generated successfully",
plan=ProcurementPlanResponse.model_validate(saved_plan)
)
except Exception as e:
await self.db.rollback()
logger.error("Error generating procurement plan", error=str(e), tenant_id=tenant_id)
return GeneratePlanResponse(
success=False,
message="Failed to generate procurement plan",
errors=[str(e)]
)
async def update_plan_status(
self,
tenant_id: uuid.UUID,
plan_id: uuid.UUID,
status: str,
updated_by: Optional[uuid.UUID] = None
) -> Optional[ProcurementPlanResponse]:
"""Update procurement plan status"""
try:
updates = {"status": status, "updated_by": updated_by}
if status == "approved":
updates["approved_at"] = datetime.utcnow()
updates["approved_by"] = updated_by
elif status == "in_execution":
updates["execution_started_at"] = datetime.utcnow()
elif status in ["completed", "cancelled"]:
updates["execution_completed_at"] = datetime.utcnow()
plan = await self.plan_repo.update_plan(plan_id, tenant_id, updates)
if plan:
await self.db.commit()
await self._cache_procurement_plan(plan)
return ProcurementPlanResponse.model_validate(plan)
return None
except Exception as e:
await self.db.rollback()
logger.error("Error updating plan status", error=str(e), plan_id=plan_id)
return None
# ================================================================
# DASHBOARD AND ANALYTICS
# ================================================================
async def get_dashboard_data(self, tenant_id: uuid.UUID) -> Optional[DashboardData]:
"""Get procurement dashboard data"""
try:
current_plan = await self.get_current_plan(tenant_id)
summary = await self._get_procurement_summary(tenant_id)
# Get additional dashboard data
upcoming_deliveries = await self._get_upcoming_deliveries(tenant_id)
overdue_requirements = await self._get_overdue_requirements(tenant_id)
low_stock_alerts = await self._get_low_stock_alerts(tenant_id)
performance_metrics = await self._get_performance_metrics(tenant_id)
return DashboardData(
current_plan=current_plan,
summary=summary,
upcoming_deliveries=upcoming_deliveries,
overdue_requirements=overdue_requirements,
low_stock_alerts=low_stock_alerts,
performance_metrics=performance_metrics
)
except Exception as e:
logger.error("Error getting dashboard data", error=str(e), tenant_id=tenant_id)
return None
# ================================================================
# DAILY SCHEDULER
# ================================================================
async def run_daily_scheduler(self) -> None:
"""Run the daily procurement planning scheduler"""
logger.info("Starting daily procurement scheduler")
try:
# This would typically be called by a cron job or scheduler service
# Get all active tenants (this would come from tenant service)
active_tenants = await self._get_active_tenants()
for tenant_id in active_tenants:
try:
await self._process_daily_plan_for_tenant(tenant_id)
except Exception as e:
logger.error("Error processing daily plan for tenant",
error=str(e), tenant_id=tenant_id)
continue
logger.info("Daily procurement scheduler completed", processed_tenants=len(active_tenants))
except Exception as e:
logger.error("Error in daily scheduler", error=str(e))
async def _process_daily_plan_for_tenant(self, tenant_id: uuid.UUID) -> None:
"""Process daily procurement plan for a specific tenant"""
try:
today = date.today()
# Check if plan already exists for today
existing_plan = await self.plan_repo.get_plan_by_date(today, tenant_id)
if existing_plan:
logger.info("Daily plan already exists", tenant_id=tenant_id, date=today)
return
# Generate plan for today
request = GeneratePlanRequest(
plan_date=today,
planning_horizon_days=settings.DEMAND_FORECAST_DAYS,
include_safety_stock=True,
safety_stock_percentage=Decimal(str(settings.SAFETY_STOCK_PERCENTAGE))
)
result = await self.generate_procurement_plan(tenant_id, request)
if result.success:
logger.info("Daily plan generated successfully", tenant_id=tenant_id)
else:
logger.error("Failed to generate daily plan",
tenant_id=tenant_id, errors=result.errors)
except Exception as e:
logger.error("Error processing daily plan", error=str(e), tenant_id=tenant_id)
# ================================================================
# PRIVATE HELPER METHODS
# ================================================================
async def _get_inventory_list(self, tenant_id: uuid.UUID) -> List[Dict[str, Any]]:
"""Get current inventory list from inventory service"""
try:
return await self.inventory_client.get_all_ingredients(str(tenant_id))
except Exception as e:
logger.error("Error fetching inventory", error=str(e), tenant_id=tenant_id)
return []
async def _generate_demand_forecasts(
self,
tenant_id: uuid.UUID,
inventory_items: List[Dict[str, Any]],
target_date: date,
horizon_days: int
) -> Dict[str, Dict[str, Any]]:
"""Generate demand forecasts for inventory items"""
forecasts = {}
try:
# For each inventory item, request forecast
for item in inventory_items:
item_id = item.get('id')
if not item_id:
continue
try:
# Call forecast service for next day demand
forecast_data = await self.forecast_client.create_realtime_prediction(
tenant_id=str(tenant_id),
model_id="default", # Use default model or tenant-specific model
target_date=target_date.isoformat(),
features={
"product_id": item_id,
"current_stock": item.get('current_stock', 0),
"historical_usage": item.get('avg_daily_usage', 0),
"seasonality": self._calculate_seasonality_factor(target_date),
"day_of_week": target_date.weekday(),
"is_weekend": target_date.weekday() >= 5
}
)
if forecast_data:
forecasts[item_id] = forecast_data
except Exception as e:
logger.warning("Error forecasting for item",
item_id=item_id, error=str(e))
# Use fallback prediction
forecasts[item_id] = self._create_fallback_forecast(item)
return forecasts
except Exception as e:
logger.error("Error generating forecasts", error=str(e), tenant_id=tenant_id)
return {}
async def _create_plan_data(
self,
tenant_id: uuid.UUID,
plan_date: date,
request: GeneratePlanRequest,
inventory_items: List[Dict[str, Any]],
forecasts: Dict[str, Dict[str, Any]]
) -> Dict[str, Any]:
"""Create procurement plan data"""
plan_number = await self.plan_repo.generate_plan_number(tenant_id, plan_date)
total_items = len(inventory_items)
total_forecast_demand = sum(
f.get('predicted_value', 0) for f in forecasts.values()
)
return {
'tenant_id': tenant_id,
'plan_number': plan_number,
'plan_date': plan_date,
'plan_period_start': plan_date,
'plan_period_end': plan_date + timedelta(days=request.planning_horizon_days),
'planning_horizon_days': request.planning_horizon_days,
'status': 'draft',
'plan_type': 'regular',
'priority': 'normal',
'procurement_strategy': 'just_in_time',
'safety_stock_buffer': request.safety_stock_percentage,
'total_demand_quantity': Decimal(str(total_forecast_demand)),
'supply_risk_level': 'low',
'created_at': datetime.utcnow(),
'updated_at': datetime.utcnow(),
}
async def _create_requirements_data(
self,
plan_id: uuid.UUID,
inventory_items: List[Dict[str, Any]],
forecasts: Dict[str, Dict[str, Any]],
request: GeneratePlanRequest
) -> List[Dict[str, Any]]:
"""Create procurement requirements data"""
requirements = []
for item in inventory_items:
item_id = item.get('id')
if not item_id or item_id not in forecasts:
continue
forecast = forecasts[item_id]
current_stock = Decimal(str(item.get('current_stock', 0)))
predicted_demand = Decimal(str(forecast.get('predicted_value', 0)))
safety_stock = predicted_demand * (request.safety_stock_percentage / 100)
total_needed = predicted_demand + safety_stock
net_requirement = max(Decimal('0'), total_needed - current_stock)
if net_requirement > 0: # Only create requirement if needed
requirement_number = await self.requirement_repo.generate_requirement_number(plan_id)
required_by_date = request.plan_date or date.today()
suggested_order_date = required_by_date - timedelta(days=settings.PROCUREMENT_LEAD_TIME_DAYS)
latest_order_date = required_by_date - timedelta(days=1)
requirements.append({
'plan_id': plan_id,
'requirement_number': requirement_number,
'product_id': uuid.UUID(item_id),
'product_name': item.get('name', ''),
'product_sku': item.get('sku', ''),
'product_category': item.get('category', ''),
'product_type': 'ingredient',
'required_quantity': predicted_demand,
'unit_of_measure': item.get('unit', 'kg'),
'safety_stock_quantity': safety_stock,
'total_quantity_needed': total_needed,
'current_stock_level': current_stock,
'available_stock': current_stock,
'net_requirement': net_requirement,
'forecast_demand': predicted_demand,
'buffer_demand': safety_stock,
'required_by_date': required_by_date,
'suggested_order_date': suggested_order_date,
'latest_order_date': latest_order_date,
'priority': self._calculate_priority(net_requirement, current_stock),
'risk_level': self._calculate_risk_level(item, forecast),
'status': 'pending',
'delivery_status': 'pending',
'ordered_quantity': Decimal('0'),
'received_quantity': Decimal('0'),
'estimated_unit_cost': Decimal(str(item.get('avg_cost', 0))),
'estimated_total_cost': net_requirement * Decimal(str(item.get('avg_cost', 0)))
})
return requirements
def _calculate_priority(self, net_requirement: Decimal, current_stock: Decimal) -> str:
"""Calculate requirement priority based on stock levels"""
if current_stock <= 0:
return 'critical'
stock_ratio = net_requirement / current_stock if current_stock > 0 else float('inf')
if stock_ratio >= 2:
return 'critical'
elif stock_ratio >= 1:
return 'high'
elif stock_ratio >= 0.5:
return 'normal'
else:
return 'low'
def _calculate_risk_level(self, item: Dict[str, Any], forecast: Dict[str, Any]) -> str:
"""Calculate risk level for procurement requirement"""
confidence = forecast.get('confidence_score', 0.8)
lead_time = item.get('supplier_lead_time', 3)
if confidence < 0.6 or lead_time > 7:
return 'high'
elif confidence < 0.8 or lead_time > 3:
return 'medium'
else:
return 'low'
def _calculate_seasonality_factor(self, target_date: date) -> float:
"""Calculate seasonality adjustment factor"""
# Simple seasonality based on month
seasonal_factors = {
12: 1.3, 1: 1.2, 2: 0.9, # Winter
3: 1.1, 4: 1.2, 5: 1.3, # Spring
6: 1.4, 7: 1.5, 8: 1.4, # Summer
9: 1.2, 10: 1.1, 11: 1.2 # Fall
}
return seasonal_factors.get(target_date.month, 1.0)
def _create_fallback_forecast(self, item: Dict[str, Any]) -> Dict[str, Any]:
"""Create fallback forecast when service is unavailable"""
avg_usage = item.get('avg_daily_usage', 0)
return {
'predicted_value': avg_usage * 1.1, # 10% buffer
'confidence_score': 0.5,
'lower_bound': avg_usage * 0.8,
'upper_bound': avg_usage * 1.3,
'fallback': True
}
async def _cache_procurement_plan(self, plan: ProcurementPlan) -> None:
"""Cache procurement plan in Redis"""
try:
await self.cache_service.cache_procurement_plan(plan)
except Exception as e:
logger.warning("Failed to cache plan", error=str(e), plan_id=plan.id)
async def _publish_plan_generated_event(self, tenant_id: uuid.UUID, plan_id: uuid.UUID) -> None:
"""Publish plan generated event"""
try:
event_data = {
"tenant_id": str(tenant_id),
"plan_id": str(plan_id),
"timestamp": datetime.utcnow().isoformat(),
"event_type": "procurement.plan.generated"
}
await self.rabbitmq_client.publish_event(
exchange_name="procurement.events",
routing_key="procurement.plan.generated",
event_data=event_data
)
except Exception as e:
logger.warning("Failed to publish event", error=str(e))
async def _get_active_tenants(self) -> List[uuid.UUID]:
"""Get list of active tenant IDs"""
# This would typically call the tenant service
# For now, return empty list - would be implemented with actual tenant service
return []
async def _get_procurement_summary(self, tenant_id: uuid.UUID) -> ProcurementSummary:
"""Get procurement summary for dashboard"""
# Implement summary calculation
return ProcurementSummary(
total_plans=0,
active_plans=0,
total_requirements=0,
pending_requirements=0,
critical_requirements=0,
total_estimated_cost=Decimal('0'),
total_approved_cost=Decimal('0'),
cost_variance=Decimal('0')
)
async def _get_upcoming_deliveries(self, tenant_id: uuid.UUID) -> List[Dict[str, Any]]:
"""Get upcoming deliveries"""
return []
async def _get_overdue_requirements(self, tenant_id: uuid.UUID) -> List[Dict[str, Any]]:
"""Get overdue requirements"""
return []
async def _get_low_stock_alerts(self, tenant_id: uuid.UUID) -> List[Dict[str, Any]]:
"""Get low stock alerts from inventory service"""
try:
return await self.inventory_client.get_low_stock_alerts(str(tenant_id))
except Exception as e:
logger.error("Error getting low stock alerts", error=str(e))
return []
async def _get_performance_metrics(self, tenant_id: uuid.UUID) -> Dict[str, Any]:
"""Get performance metrics"""
return {}