Add procurement management logic

This commit is contained in:
Urtzi Alfaro
2025-08-23 19:47:08 +02:00
parent 62ff755f25
commit 5077a45a25
22 changed files with 4011 additions and 79 deletions

View File

@@ -0,0 +1,432 @@
# ================================================================
# services/orders/app/api/procurement.py
# ================================================================
"""
Procurement API Endpoints - RESTful APIs for procurement planning
"""
import uuid
from datetime import date
from typing import List, Optional
from fastapi import APIRouter, Depends, HTTPException, Query, status
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.database import get_db
from app.core.config import settings
from app.services.procurement_service import ProcurementService
from app.schemas.procurement_schemas import (
ProcurementPlanResponse, GeneratePlanRequest, GeneratePlanResponse,
DashboardData, PaginatedProcurementPlans
)
from shared.auth.decorators import require_authentication, get_current_user_dep
from fastapi import Depends, Request
from typing import Dict, Any
import uuid
from shared.clients.inventory_client import InventoryServiceClient
from shared.clients.forecast_client import ForecastServiceClient
from shared.config.base import BaseServiceSettings
from shared.monitoring.decorators import monitor_performance
# Create router
router = APIRouter(prefix="/procurement-plans", tags=["Procurement Planning"])
# Create service settings
service_settings = BaseServiceSettings()
# Simple TenantAccess class for compatibility
class TenantAccess:
def __init__(self, tenant_id: uuid.UUID, user_id: str):
self.tenant_id = tenant_id
self.user_id = user_id
async def get_current_tenant(
request: Request,
current_user: Dict[str, Any] = Depends(get_current_user_dep)
) -> TenantAccess:
"""Get current tenant from user context"""
# For now, create a simple tenant access from user data
# In a real implementation, this would validate tenant access
tenant_id = current_user.get('tenant_id')
if not tenant_id:
# Try to get from headers as fallback
tenant_id = request.headers.get('x-tenant-id')
if not tenant_id:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Tenant access required"
)
try:
tenant_uuid = uuid.UUID(tenant_id)
except (ValueError, TypeError):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Invalid tenant ID format"
)
return TenantAccess(
tenant_id=tenant_uuid,
user_id=current_user['user_id']
)
async def get_procurement_service(db: AsyncSession = Depends(get_db)) -> ProcurementService:
"""Get procurement service instance"""
inventory_client = InventoryServiceClient(service_settings)
forecast_client = ForecastServiceClient(service_settings, "orders-service")
return ProcurementService(db, service_settings, inventory_client, forecast_client)
# ================================================================
# PROCUREMENT PLAN ENDPOINTS
# ================================================================
@router.get("/current", response_model=Optional[ProcurementPlanResponse])
@monitor_performance("get_current_procurement_plan")
async def get_current_procurement_plan(
tenant_access: TenantAccess = Depends(get_current_tenant),
procurement_service: ProcurementService = Depends(get_procurement_service)
):
"""
Get the procurement plan for the current day (forecasting for the next day)
Returns the plan details, including requirements per item.
"""
try:
plan = await procurement_service.get_current_plan(tenant_access.tenant_id)
return plan
except Exception as e:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Error retrieving current procurement plan: {str(e)}"
)
@router.get("/{plan_date}", response_model=Optional[ProcurementPlanResponse])
@monitor_performance("get_procurement_plan_by_date")
async def get_procurement_plan_by_date(
plan_date: date,
tenant_access: TenantAccess = Depends(get_current_tenant),
procurement_service: ProcurementService = Depends(get_procurement_service)
):
"""
Get the procurement plan for a specific date (format: YYYY-MM-DD)
Returns the plan details, including requirements per item for the specified date.
"""
try:
plan = await procurement_service.get_plan_by_date(tenant_access.tenant_id, plan_date)
return plan
except Exception as e:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Error retrieving procurement plan for {plan_date}: {str(e)}"
)
@router.get("/", response_model=PaginatedProcurementPlans)
@monitor_performance("list_procurement_plans")
async def list_procurement_plans(
status: Optional[str] = Query(None, description="Filter by plan status"),
start_date: Optional[date] = Query(None, description="Start date filter (YYYY-MM-DD)"),
end_date: Optional[date] = Query(None, description="End date filter (YYYY-MM-DD)"),
limit: int = Query(50, ge=1, le=100, description="Number of plans to return"),
offset: int = Query(0, ge=0, description="Number of plans to skip"),
tenant_access: TenantAccess = Depends(get_current_tenant),
procurement_service: ProcurementService = Depends(get_procurement_service)
):
"""
List procurement plans with optional filters
Supports filtering by status, date range, and pagination.
"""
try:
# Get plans from repository directly for listing
plans = await procurement_service.plan_repo.list_plans(
tenant_access.tenant_id,
status=status,
start_date=start_date,
end_date=end_date,
limit=limit,
offset=offset
)
# Convert to response models
plan_responses = [ProcurementPlanResponse.model_validate(plan) for plan in plans]
# For simplicity, we'll use the returned count as total
# In a production system, you'd want a separate count query
total = len(plan_responses)
has_more = len(plan_responses) == limit
return PaginatedProcurementPlans(
plans=plan_responses,
total=total,
page=offset // limit + 1 if limit > 0 else 1,
limit=limit,
has_more=has_more
)
except Exception as e:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Error listing procurement plans: {str(e)}"
)
@router.post("/generate", response_model=GeneratePlanResponse)
@monitor_performance("generate_procurement_plan")
async def generate_procurement_plan(
request: GeneratePlanRequest,
tenant_access: TenantAccess = Depends(get_current_tenant),
procurement_service: ProcurementService = Depends(get_procurement_service)
):
"""
Manually trigger the generation of a procurement plan
This can serve as a fallback if the daily scheduler hasn't run,
or for testing purposes. Can be forced to regenerate an existing plan.
"""
try:
if not settings.PROCUREMENT_PLANNING_ENABLED:
raise HTTPException(
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
detail="Procurement planning is currently disabled"
)
result = await procurement_service.generate_procurement_plan(
tenant_access.tenant_id,
request
)
if not result.success:
# Return the result with errors but don't raise an exception
# since the service method handles the error state properly
return result
return result
except HTTPException:
raise
except Exception as e:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Error generating procurement plan: {str(e)}"
)
@router.put("/{plan_id}/status")
@monitor_performance("update_procurement_plan_status")
async def update_procurement_plan_status(
plan_id: uuid.UUID,
status: str = Query(..., description="New status", pattern="^(draft|pending_approval|approved|in_execution|completed|cancelled)$"),
tenant_access: TenantAccess = Depends(get_current_tenant),
procurement_service: ProcurementService = Depends(get_procurement_service)
):
"""
Update the status of a procurement plan
Valid statuses: draft, pending_approval, approved, in_execution, completed, cancelled
"""
try:
updated_plan = await procurement_service.update_plan_status(
tenant_access.tenant_id,
plan_id,
status,
tenant_access.user_id
)
if not updated_plan:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Procurement plan not found"
)
return updated_plan
except HTTPException:
raise
except Exception as e:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Error updating procurement plan status: {str(e)}"
)
@router.get("/id/{plan_id}", response_model=Optional[ProcurementPlanResponse])
@monitor_performance("get_procurement_plan_by_id")
async def get_procurement_plan_by_id(
plan_id: uuid.UUID,
tenant_access: TenantAccess = Depends(get_current_tenant),
procurement_service: ProcurementService = Depends(get_procurement_service)
):
"""
Get a specific procurement plan by its ID
Returns detailed plan information including all requirements.
"""
try:
plan = await procurement_service.get_plan_by_id(tenant_access.tenant_id, plan_id)
if not plan:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Procurement plan not found"
)
return plan
except HTTPException:
raise
except Exception as e:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Error retrieving procurement plan: {str(e)}"
)
# ================================================================
# DASHBOARD ENDPOINTS
# ================================================================
@router.get("/dashboard/data", response_model=Optional[DashboardData])
@monitor_performance("get_procurement_dashboard")
async def get_procurement_dashboard(
tenant_access: TenantAccess = Depends(get_current_tenant),
procurement_service: ProcurementService = Depends(get_procurement_service)
):
"""
Get procurement dashboard data
Returns comprehensive dashboard information including:
- Current plan
- Summary statistics
- Upcoming deliveries
- Overdue requirements
- Low stock alerts
- Performance metrics
"""
try:
dashboard_data = await procurement_service.get_dashboard_data(tenant_access.tenant_id)
return dashboard_data
except Exception as e:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Error retrieving dashboard data: {str(e)}"
)
# ================================================================
# REQUIREMENT MANAGEMENT ENDPOINTS
# ================================================================
@router.get("/{plan_id}/requirements")
@monitor_performance("get_plan_requirements")
async def get_plan_requirements(
plan_id: uuid.UUID,
status: Optional[str] = Query(None, description="Filter by requirement status"),
priority: Optional[str] = Query(None, description="Filter by priority level"),
tenant_access: TenantAccess = Depends(get_current_tenant),
procurement_service: ProcurementService = Depends(get_procurement_service)
):
"""
Get all requirements for a specific procurement plan
Supports filtering by status and priority level.
"""
try:
# Verify plan exists and belongs to tenant
plan = await procurement_service.get_plan_by_id(tenant_access.tenant_id, plan_id)
if not plan:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Procurement plan not found"
)
# Get requirements from repository
requirements = await procurement_service.requirement_repo.get_requirements_by_plan(plan_id)
# Apply filters if provided
if status:
requirements = [r for r in requirements if r.status == status]
if priority:
requirements = [r for r in requirements if r.priority == priority]
return requirements
except HTTPException:
raise
except Exception as e:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Error retrieving plan requirements: {str(e)}"
)
@router.get("/requirements/critical")
@monitor_performance("get_critical_requirements")
async def get_critical_requirements(
tenant_access: TenantAccess = Depends(get_current_tenant),
procurement_service: ProcurementService = Depends(get_procurement_service)
):
"""
Get all critical priority requirements across all active plans
Returns requirements that need immediate attention.
"""
try:
requirements = await procurement_service.requirement_repo.get_critical_requirements(
tenant_access.tenant_id
)
return requirements
except Exception as e:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Error retrieving critical requirements: {str(e)}"
)
# ================================================================
# UTILITY ENDPOINTS
# ================================================================
@router.post("/scheduler/trigger")
@monitor_performance("trigger_daily_scheduler")
async def trigger_daily_scheduler(
tenant_access: TenantAccess = Depends(get_current_tenant),
procurement_service: ProcurementService = Depends(get_procurement_service)
):
"""
Manually trigger the daily scheduler for the current tenant
This endpoint is primarily for testing and maintenance purposes.
"""
try:
# Process daily plan for current tenant only
await procurement_service._process_daily_plan_for_tenant(tenant_access.tenant_id)
return {
"success": True,
"message": "Daily scheduler executed successfully",
"tenant_id": str(tenant_access.tenant_id)
}
except Exception as e:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Error triggering daily scheduler: {str(e)}"
)
@router.get("/health")
async def procurement_health_check():
"""
Health check endpoint for procurement service
"""
return {
"status": "healthy",
"service": "procurement-planning",
"procurement_enabled": settings.PROCUREMENT_PLANNING_ENABLED,
"timestamp": date.today().isoformat()
}

View File

@@ -14,6 +14,7 @@ import structlog
from app.core.config import settings
from app.core.database import init_database, get_db_health
from app.api.orders import router as orders_router
from app.api.procurement import router as procurement_router
# Configure logging
logger = structlog.get_logger()
@@ -55,6 +56,7 @@ app.add_middleware(
# Include routers
app.include_router(orders_router, prefix="/api/v1")
app.include_router(procurement_router, prefix="/api/v1")
@app.get("/health")

View File

@@ -0,0 +1,248 @@
# ================================================================
# services/orders/app/repositories/procurement_repository.py
# ================================================================
"""
Procurement Repository - Database operations for procurement plans and requirements
"""
import uuid
from datetime import datetime, date
from decimal import Decimal
from typing import List, Optional, Dict, Any
from sqlalchemy import select, and_, or_, desc, func
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from app.models.procurement import ProcurementPlan, ProcurementRequirement
from app.repositories.base_repository import BaseRepository
class ProcurementPlanRepository(BaseRepository):
"""Repository for procurement plan operations"""
def __init__(self, db: AsyncSession):
super().__init__(db, ProcurementPlan)
async def create_plan(self, plan_data: Dict[str, Any]) -> ProcurementPlan:
"""Create a new procurement plan"""
plan = ProcurementPlan(**plan_data)
self.db.add(plan)
await self.db.flush()
return plan
async def get_plan_by_id(self, plan_id: uuid.UUID, tenant_id: uuid.UUID) -> Optional[ProcurementPlan]:
"""Get procurement plan by ID"""
stmt = select(ProcurementPlan).where(
and_(
ProcurementPlan.id == plan_id,
ProcurementPlan.tenant_id == tenant_id
)
).options(selectinload(ProcurementPlan.requirements))
result = await self.db.execute(stmt)
return result.scalar_one_or_none()
async def get_plan_by_date(self, plan_date: date, tenant_id: uuid.UUID) -> Optional[ProcurementPlan]:
"""Get procurement plan for a specific date"""
stmt = select(ProcurementPlan).where(
and_(
ProcurementPlan.plan_date == plan_date,
ProcurementPlan.tenant_id == tenant_id
)
).options(selectinload(ProcurementPlan.requirements))
result = await self.db.execute(stmt)
return result.scalar_one_or_none()
async def get_current_plan(self, tenant_id: uuid.UUID) -> Optional[ProcurementPlan]:
"""Get the current day's procurement plan"""
today = date.today()
return await self.get_plan_by_date(today, tenant_id)
async def list_plans(
self,
tenant_id: uuid.UUID,
status: Optional[str] = None,
start_date: Optional[date] = None,
end_date: Optional[date] = None,
limit: int = 50,
offset: int = 0
) -> List[ProcurementPlan]:
"""List procurement plans with filters"""
conditions = [ProcurementPlan.tenant_id == tenant_id]
if status:
conditions.append(ProcurementPlan.status == status)
if start_date:
conditions.append(ProcurementPlan.plan_date >= start_date)
if end_date:
conditions.append(ProcurementPlan.plan_date <= end_date)
stmt = (
select(ProcurementPlan)
.where(and_(*conditions))
.order_by(desc(ProcurementPlan.plan_date))
.limit(limit)
.offset(offset)
.options(selectinload(ProcurementPlan.requirements))
)
result = await self.db.execute(stmt)
return result.scalars().all()
async def update_plan(self, plan_id: uuid.UUID, tenant_id: uuid.UUID, updates: Dict[str, Any]) -> Optional[ProcurementPlan]:
"""Update procurement plan"""
plan = await self.get_plan_by_id(plan_id, tenant_id)
if not plan:
return None
for key, value in updates.items():
if hasattr(plan, key):
setattr(plan, key, value)
plan.updated_at = datetime.utcnow()
await self.db.flush()
return plan
async def delete_plan(self, plan_id: uuid.UUID, tenant_id: uuid.UUID) -> bool:
"""Delete procurement plan"""
plan = await self.get_plan_by_id(plan_id, tenant_id)
if not plan:
return False
await self.db.delete(plan)
return True
async def generate_plan_number(self, tenant_id: uuid.UUID, plan_date: date) -> str:
"""Generate unique plan number"""
date_str = plan_date.strftime("%Y%m%d")
# Count existing plans for the same date
stmt = select(func.count(ProcurementPlan.id)).where(
and_(
ProcurementPlan.tenant_id == tenant_id,
ProcurementPlan.plan_date == plan_date
)
)
result = await self.db.execute(stmt)
count = result.scalar() or 0
return f"PP-{date_str}-{count + 1:03d}"
class ProcurementRequirementRepository(BaseRepository):
"""Repository for procurement requirement operations"""
def __init__(self, db: AsyncSession):
super().__init__(db, ProcurementRequirement)
async def create_requirement(self, requirement_data: Dict[str, Any]) -> ProcurementRequirement:
"""Create a new procurement requirement"""
requirement = ProcurementRequirement(**requirement_data)
self.db.add(requirement)
await self.db.flush()
return requirement
async def create_requirements_batch(self, requirements_data: List[Dict[str, Any]]) -> List[ProcurementRequirement]:
"""Create multiple procurement requirements"""
requirements = [ProcurementRequirement(**data) for data in requirements_data]
self.db.add_all(requirements)
await self.db.flush()
return requirements
async def get_requirement_by_id(self, requirement_id: uuid.UUID, tenant_id: uuid.UUID) -> Optional[ProcurementRequirement]:
"""Get procurement requirement by ID"""
stmt = select(ProcurementRequirement).join(ProcurementPlan).where(
and_(
ProcurementRequirement.id == requirement_id,
ProcurementPlan.tenant_id == tenant_id
)
)
result = await self.db.execute(stmt)
return result.scalar_one_or_none()
async def get_requirements_by_plan(self, plan_id: uuid.UUID) -> List[ProcurementRequirement]:
"""Get all requirements for a specific plan"""
stmt = select(ProcurementRequirement).where(
ProcurementRequirement.plan_id == plan_id
).order_by(ProcurementRequirement.priority.desc(), ProcurementRequirement.required_by_date)
result = await self.db.execute(stmt)
return result.scalars().all()
async def get_requirements_by_product(
self,
tenant_id: uuid.UUID,
product_id: uuid.UUID,
status: Optional[str] = None
) -> List[ProcurementRequirement]:
"""Get requirements for a specific product"""
conditions = [
ProcurementPlan.tenant_id == tenant_id,
ProcurementRequirement.product_id == product_id
]
if status:
conditions.append(ProcurementRequirement.status == status)
stmt = select(ProcurementRequirement).join(ProcurementPlan).where(
and_(*conditions)
).order_by(desc(ProcurementRequirement.required_by_date))
result = await self.db.execute(stmt)
return result.scalars().all()
async def update_requirement(
self,
requirement_id: uuid.UUID,
tenant_id: uuid.UUID,
updates: Dict[str, Any]
) -> Optional[ProcurementRequirement]:
"""Update procurement requirement"""
requirement = await self.get_requirement_by_id(requirement_id, tenant_id)
if not requirement:
return None
for key, value in updates.items():
if hasattr(requirement, key):
setattr(requirement, key, value)
await self.db.flush()
return requirement
async def get_pending_requirements(self, tenant_id: uuid.UUID) -> List[ProcurementRequirement]:
"""Get all pending requirements across plans"""
stmt = select(ProcurementRequirement).join(ProcurementPlan).where(
and_(
ProcurementPlan.tenant_id == tenant_id,
ProcurementRequirement.status == 'pending'
)
).order_by(ProcurementRequirement.priority.desc(), ProcurementRequirement.required_by_date)
result = await self.db.execute(stmt)
return result.scalars().all()
async def get_critical_requirements(self, tenant_id: uuid.UUID) -> List[ProcurementRequirement]:
"""Get critical priority requirements"""
stmt = select(ProcurementRequirement).join(ProcurementPlan).where(
and_(
ProcurementPlan.tenant_id == tenant_id,
ProcurementRequirement.priority == 'critical',
ProcurementRequirement.status.in_(['pending', 'approved'])
)
).order_by(ProcurementRequirement.required_by_date)
result = await self.db.execute(stmt)
return result.scalars().all()
async def generate_requirement_number(self, plan_id: uuid.UUID) -> str:
"""Generate unique requirement number within a plan"""
# Count existing requirements in the plan
stmt = select(func.count(ProcurementRequirement.id)).where(
ProcurementRequirement.plan_id == plan_id
)
result = await self.db.execute(stmt)
count = result.scalar() or 0
return f"REQ-{count + 1:05d}"

View File

@@ -0,0 +1,293 @@
# ================================================================
# services/orders/app/schemas/procurement_schemas.py
# ================================================================
"""
Procurement Schemas - Request/response models for procurement plans
"""
import uuid
from datetime import datetime, date
from decimal import Decimal
from typing import Optional, List, Dict, Any
from pydantic import BaseModel, Field, ConfigDict
# ================================================================
# BASE SCHEMAS
# ================================================================
class ProcurementBase(BaseModel):
"""Base schema for procurement entities"""
model_config = ConfigDict(from_attributes=True, str_strip_whitespace=True)
# ================================================================
# PROCUREMENT REQUIREMENT SCHEMAS
# ================================================================
class ProcurementRequirementBase(ProcurementBase):
"""Base procurement requirement schema"""
product_id: uuid.UUID
product_name: str = Field(..., min_length=1, max_length=200)
product_sku: Optional[str] = Field(None, max_length=100)
product_category: Optional[str] = Field(None, max_length=100)
product_type: str = Field(default="ingredient", max_length=50)
required_quantity: Decimal = Field(..., gt=0)
unit_of_measure: str = Field(..., min_length=1, max_length=50)
safety_stock_quantity: Decimal = Field(default=Decimal("0.000"), ge=0)
total_quantity_needed: Decimal = Field(..., gt=0)
current_stock_level: Decimal = Field(default=Decimal("0.000"), ge=0)
reserved_stock: Decimal = Field(default=Decimal("0.000"), ge=0)
available_stock: Decimal = Field(default=Decimal("0.000"), ge=0)
net_requirement: Decimal = Field(..., ge=0)
order_demand: Decimal = Field(default=Decimal("0.000"), ge=0)
production_demand: Decimal = Field(default=Decimal("0.000"), ge=0)
forecast_demand: Decimal = Field(default=Decimal("0.000"), ge=0)
buffer_demand: Decimal = Field(default=Decimal("0.000"), ge=0)
required_by_date: date
lead_time_buffer_days: int = Field(default=1, ge=0)
suggested_order_date: date
latest_order_date: date
priority: str = Field(default="normal", pattern="^(critical|high|normal|low)$")
risk_level: str = Field(default="low", pattern="^(low|medium|high|critical)$")
preferred_supplier_id: Optional[uuid.UUID] = None
backup_supplier_id: Optional[uuid.UUID] = None
supplier_name: Optional[str] = Field(None, max_length=200)
supplier_lead_time_days: Optional[int] = Field(None, ge=0)
minimum_order_quantity: Optional[Decimal] = Field(None, ge=0)
estimated_unit_cost: Optional[Decimal] = Field(None, ge=0)
estimated_total_cost: Optional[Decimal] = Field(None, ge=0)
last_purchase_cost: Optional[Decimal] = Field(None, ge=0)
class ProcurementRequirementCreate(ProcurementRequirementBase):
"""Schema for creating procurement requirements"""
special_requirements: Optional[str] = None
storage_requirements: Optional[str] = Field(None, max_length=200)
shelf_life_days: Optional[int] = Field(None, gt=0)
quality_specifications: Optional[Dict[str, Any]] = None
procurement_notes: Optional[str] = None
class ProcurementRequirementUpdate(ProcurementBase):
"""Schema for updating procurement requirements"""
status: Optional[str] = Field(None, pattern="^(pending|approved|ordered|partially_received|received|cancelled)$")
priority: Optional[str] = Field(None, pattern="^(critical|high|normal|low)$")
approved_quantity: Optional[Decimal] = Field(None, ge=0)
approved_cost: Optional[Decimal] = Field(None, ge=0)
purchase_order_id: Optional[uuid.UUID] = None
purchase_order_number: Optional[str] = Field(None, max_length=50)
ordered_quantity: Optional[Decimal] = Field(None, ge=0)
expected_delivery_date: Optional[date] = None
actual_delivery_date: Optional[date] = None
received_quantity: Optional[Decimal] = Field(None, ge=0)
delivery_status: Optional[str] = Field(None, pattern="^(pending|in_transit|delivered|delayed|cancelled)$")
procurement_notes: Optional[str] = None
class ProcurementRequirementResponse(ProcurementRequirementBase):
"""Schema for procurement requirement responses"""
id: uuid.UUID
plan_id: uuid.UUID
requirement_number: str
status: str
created_at: datetime
updated_at: datetime
purchase_order_id: Optional[uuid.UUID] = None
purchase_order_number: Optional[str] = None
ordered_quantity: Decimal
ordered_at: Optional[datetime] = None
expected_delivery_date: Optional[date] = None
actual_delivery_date: Optional[date] = None
received_quantity: Decimal
delivery_status: str
fulfillment_rate: Optional[Decimal] = None
on_time_delivery: Optional[bool] = None
quality_rating: Optional[Decimal] = None
approved_quantity: Optional[Decimal] = None
approved_cost: Optional[Decimal] = None
approved_at: Optional[datetime] = None
approved_by: Optional[uuid.UUID] = None
special_requirements: Optional[str] = None
storage_requirements: Optional[str] = None
shelf_life_days: Optional[int] = None
quality_specifications: Optional[Dict[str, Any]] = None
procurement_notes: Optional[str] = None
# ================================================================
# PROCUREMENT PLAN SCHEMAS
# ================================================================
class ProcurementPlanBase(ProcurementBase):
"""Base procurement plan schema"""
plan_date: date
plan_period_start: date
plan_period_end: date
planning_horizon_days: int = Field(default=14, gt=0)
plan_type: str = Field(default="regular", pattern="^(regular|emergency|seasonal)$")
priority: str = Field(default="normal", pattern="^(high|normal|low)$")
business_model: Optional[str] = Field(None, pattern="^(individual_bakery|central_bakery)$")
procurement_strategy: str = Field(default="just_in_time", pattern="^(just_in_time|bulk|mixed)$")
safety_stock_buffer: Decimal = Field(default=Decimal("20.00"), ge=0, le=100)
supply_risk_level: str = Field(default="low", pattern="^(low|medium|high|critical)$")
demand_forecast_confidence: Optional[Decimal] = Field(None, ge=1, le=10)
seasonality_adjustment: Decimal = Field(default=Decimal("0.00"))
special_requirements: Optional[str] = None
class ProcurementPlanCreate(ProcurementPlanBase):
"""Schema for creating procurement plans"""
tenant_id: uuid.UUID
requirements: Optional[List[ProcurementRequirementCreate]] = []
class ProcurementPlanUpdate(ProcurementBase):
"""Schema for updating procurement plans"""
status: Optional[str] = Field(None, pattern="^(draft|pending_approval|approved|in_execution|completed|cancelled)$")
priority: Optional[str] = Field(None, pattern="^(high|normal|low)$")
approved_at: Optional[datetime] = None
approved_by: Optional[uuid.UUID] = None
execution_started_at: Optional[datetime] = None
execution_completed_at: Optional[datetime] = None
special_requirements: Optional[str] = None
seasonal_adjustments: Optional[Dict[str, Any]] = None
class ProcurementPlanResponse(ProcurementPlanBase):
"""Schema for procurement plan responses"""
id: uuid.UUID
tenant_id: uuid.UUID
plan_number: str
status: str
total_requirements: int
total_estimated_cost: Decimal
total_approved_cost: Decimal
cost_variance: Decimal
total_demand_orders: int
total_demand_quantity: Decimal
total_production_requirements: Decimal
primary_suppliers_count: int
backup_suppliers_count: int
supplier_diversification_score: Optional[Decimal] = None
approved_at: Optional[datetime] = None
approved_by: Optional[uuid.UUID] = None
execution_started_at: Optional[datetime] = None
execution_completed_at: Optional[datetime] = None
fulfillment_rate: Optional[Decimal] = None
on_time_delivery_rate: Optional[Decimal] = None
cost_accuracy: Optional[Decimal] = None
quality_score: Optional[Decimal] = None
created_at: datetime
updated_at: datetime
created_by: Optional[uuid.UUID] = None
updated_by: Optional[uuid.UUID] = None
requirements: List[ProcurementRequirementResponse] = []
# ================================================================
# SUMMARY SCHEMAS
# ================================================================
class ProcurementSummary(ProcurementBase):
"""Summary of procurement plans"""
total_plans: int
active_plans: int
total_requirements: int
pending_requirements: int
critical_requirements: int
total_estimated_cost: Decimal
total_approved_cost: Decimal
cost_variance: Decimal
average_fulfillment_rate: Optional[Decimal] = None
average_on_time_delivery: Optional[Decimal] = None
top_suppliers: List[Dict[str, Any]] = []
critical_items: List[Dict[str, Any]] = []
class DashboardData(ProcurementBase):
"""Dashboard data for procurement overview"""
current_plan: Optional[ProcurementPlanResponse] = None
summary: ProcurementSummary
upcoming_deliveries: List[Dict[str, Any]] = []
overdue_requirements: List[Dict[str, Any]] = []
low_stock_alerts: List[Dict[str, Any]] = []
performance_metrics: Dict[str, Any] = {}
# ================================================================
# REQUEST SCHEMAS
# ================================================================
class GeneratePlanRequest(ProcurementBase):
"""Request to generate procurement plan"""
plan_date: Optional[date] = None
force_regenerate: bool = False
planning_horizon_days: int = Field(default=14, gt=0, le=30)
include_safety_stock: bool = True
safety_stock_percentage: Decimal = Field(default=Decimal("20.00"), ge=0, le=100)
class ForecastRequest(ProcurementBase):
"""Request parameters for demand forecasting"""
target_date: date
horizon_days: int = Field(default=1, gt=0, le=7)
include_confidence_intervals: bool = True
product_ids: Optional[List[uuid.UUID]] = None
# ================================================================
# RESPONSE SCHEMAS
# ================================================================
class GeneratePlanResponse(ProcurementBase):
"""Response from plan generation"""
success: bool
message: str
plan: Optional[ProcurementPlanResponse] = None
warnings: List[str] = []
errors: List[str] = []
class PaginatedProcurementPlans(ProcurementBase):
"""Paginated list of procurement plans"""
plans: List[ProcurementPlanResponse]
total: int
page: int
limit: int
has_more: bool

View File

@@ -0,0 +1,466 @@
# ================================================================
# services/orders/app/services/cache_service.py
# ================================================================
"""
Cache Service - Redis caching for procurement plans and related data
"""
import json
import uuid
from datetime import datetime, date, timedelta
from typing import Optional, Dict, Any, List
import redis
import structlog
from pydantic import BaseModel
from app.core.config import settings
from app.models.procurement import ProcurementPlan
from app.schemas.procurement_schemas import ProcurementPlanResponse
logger = structlog.get_logger()
class CacheService:
"""Service for managing Redis cache operations"""
def __init__(self, redis_url: Optional[str] = None):
"""Initialize Redis connection"""
self.redis_url = redis_url or settings.REDIS_URL
self._redis_client = None
self._connect()
def _connect(self):
"""Connect to Redis"""
try:
self._redis_client = redis.from_url(
self.redis_url,
decode_responses=True,
socket_keepalive=True,
socket_keepalive_options={"TCP_KEEPIDLE": 1, "TCP_KEEPINTVL": 3, "TCP_KEEPCNT": 5},
retry_on_timeout=True,
max_connections=50
)
# Test connection
self._redis_client.ping()
logger.info("Redis connection established")
except Exception as e:
logger.error("Failed to connect to Redis", error=str(e))
self._redis_client = None
@property
def redis(self):
"""Get Redis client with connection check"""
if self._redis_client is None:
self._connect()
return self._redis_client
def is_available(self) -> bool:
"""Check if Redis is available"""
try:
return self.redis is not None and self.redis.ping()
except Exception:
return False
# ================================================================
# PROCUREMENT PLAN CACHING
# ================================================================
def _get_plan_key(self, tenant_id: uuid.UUID, plan_date: Optional[date] = None, plan_id: Optional[uuid.UUID] = None) -> str:
"""Generate cache key for procurement plan"""
if plan_id:
return f"procurement:plan:id:{tenant_id}:{plan_id}"
elif plan_date:
return f"procurement:plan:date:{tenant_id}:{plan_date.isoformat()}"
else:
return f"procurement:plan:current:{tenant_id}"
def _get_dashboard_key(self, tenant_id: uuid.UUID) -> str:
"""Generate cache key for dashboard data"""
return f"procurement:dashboard:{tenant_id}"
def _get_requirements_key(self, tenant_id: uuid.UUID, plan_id: uuid.UUID) -> str:
"""Generate cache key for plan requirements"""
return f"procurement:requirements:{tenant_id}:{plan_id}"
async def cache_procurement_plan(
self,
plan: ProcurementPlan,
ttl_hours: int = 6
) -> bool:
"""Cache a procurement plan with multiple keys for different access patterns"""
if not self.is_available():
logger.warning("Redis not available, skipping cache")
return False
try:
# Convert plan to cacheable format
plan_data = self._serialize_plan(plan)
ttl_seconds = ttl_hours * 3600
# Cache by plan ID
id_key = self._get_plan_key(plan.tenant_id, plan_id=plan.id)
self.redis.setex(id_key, ttl_seconds, plan_data)
# Cache by plan date
date_key = self._get_plan_key(plan.tenant_id, plan_date=plan.plan_date)
self.redis.setex(date_key, ttl_seconds, plan_data)
# If this is today's plan, cache as current
if plan.plan_date == date.today():
current_key = self._get_plan_key(plan.tenant_id)
self.redis.setex(current_key, ttl_seconds, plan_data)
# Cache requirements separately for faster access
if plan.requirements:
requirements_data = self._serialize_requirements(plan.requirements)
req_key = self._get_requirements_key(plan.tenant_id, plan.id)
self.redis.setex(req_key, ttl_seconds, requirements_data)
# Update plan list cache
await self._update_plan_list_cache(plan.tenant_id, plan)
logger.info("Procurement plan cached", plan_id=plan.id, tenant_id=plan.tenant_id)
return True
except Exception as e:
logger.error("Error caching procurement plan", error=str(e), plan_id=plan.id)
return False
async def get_cached_plan(
self,
tenant_id: uuid.UUID,
plan_date: Optional[date] = None,
plan_id: Optional[uuid.UUID] = None
) -> Optional[Dict[str, Any]]:
"""Get cached procurement plan"""
if not self.is_available():
return None
try:
key = self._get_plan_key(tenant_id, plan_date, plan_id)
cached_data = self.redis.get(key)
if cached_data:
plan_data = json.loads(cached_data)
logger.debug("Procurement plan retrieved from cache",
tenant_id=tenant_id, key=key)
return plan_data
return None
except Exception as e:
logger.error("Error retrieving cached plan", error=str(e))
return None
async def get_cached_requirements(
self,
tenant_id: uuid.UUID,
plan_id: uuid.UUID
) -> Optional[List[Dict[str, Any]]]:
"""Get cached plan requirements"""
if not self.is_available():
return None
try:
key = self._get_requirements_key(tenant_id, plan_id)
cached_data = self.redis.get(key)
if cached_data:
requirements_data = json.loads(cached_data)
logger.debug("Requirements retrieved from cache",
tenant_id=tenant_id, plan_id=plan_id)
return requirements_data
return None
except Exception as e:
logger.error("Error retrieving cached requirements", error=str(e))
return None
async def cache_dashboard_data(
self,
tenant_id: uuid.UUID,
dashboard_data: Dict[str, Any],
ttl_hours: int = 1
) -> bool:
"""Cache dashboard data with shorter TTL"""
if not self.is_available():
return False
try:
key = self._get_dashboard_key(tenant_id)
data_json = json.dumps(dashboard_data, cls=DateTimeEncoder)
ttl_seconds = ttl_hours * 3600
self.redis.setex(key, ttl_seconds, data_json)
logger.debug("Dashboard data cached", tenant_id=tenant_id)
return True
except Exception as e:
logger.error("Error caching dashboard data", error=str(e))
return False
async def get_cached_dashboard_data(self, tenant_id: uuid.UUID) -> Optional[Dict[str, Any]]:
"""Get cached dashboard data"""
if not self.is_available():
return None
try:
key = self._get_dashboard_key(tenant_id)
cached_data = self.redis.get(key)
if cached_data:
return json.loads(cached_data)
return None
except Exception as e:
logger.error("Error retrieving cached dashboard data", error=str(e))
return None
async def invalidate_plan_cache(
self,
tenant_id: uuid.UUID,
plan_id: Optional[uuid.UUID] = None,
plan_date: Optional[date] = None
) -> bool:
"""Invalidate cached procurement plan data"""
if not self.is_available():
return False
try:
keys_to_delete = []
if plan_id:
# Delete specific plan cache
keys_to_delete.append(self._get_plan_key(tenant_id, plan_id=plan_id))
keys_to_delete.append(self._get_requirements_key(tenant_id, plan_id))
if plan_date:
keys_to_delete.append(self._get_plan_key(tenant_id, plan_date=plan_date))
# Always invalidate current plan cache and dashboard
keys_to_delete.extend([
self._get_plan_key(tenant_id),
self._get_dashboard_key(tenant_id)
])
# Delete plan list cache
list_key = f"procurement:plans:list:{tenant_id}:*"
list_keys = self.redis.keys(list_key)
keys_to_delete.extend(list_keys)
if keys_to_delete:
self.redis.delete(*keys_to_delete)
logger.info("Plan cache invalidated",
tenant_id=tenant_id, keys_count=len(keys_to_delete))
return True
except Exception as e:
logger.error("Error invalidating plan cache", error=str(e))
return False
# ================================================================
# LIST CACHING
# ================================================================
async def _update_plan_list_cache(self, tenant_id: uuid.UUID, plan: ProcurementPlan) -> None:
"""Update cached plan lists"""
try:
# Add plan to various lists
list_keys = [
f"procurement:plans:list:{tenant_id}:all",
f"procurement:plans:list:{tenant_id}:status:{plan.status}",
f"procurement:plans:list:{tenant_id}:month:{plan.plan_date.strftime('%Y-%m')}"
]
plan_summary = {
"id": str(plan.id),
"plan_number": plan.plan_number,
"plan_date": plan.plan_date.isoformat(),
"status": plan.status,
"total_requirements": plan.total_requirements,
"total_estimated_cost": float(plan.total_estimated_cost),
"created_at": plan.created_at.isoformat()
}
for key in list_keys:
# Use sorted sets for automatic ordering by date
score = plan.plan_date.toordinal() # Use ordinal date as score
self.redis.zadd(key, {json.dumps(plan_summary): score})
self.redis.expire(key, 3600) # 1 hour TTL
except Exception as e:
logger.warning("Error updating plan list cache", error=str(e))
# ================================================================
# PERFORMANCE METRICS CACHING
# ================================================================
async def cache_performance_metrics(
self,
tenant_id: uuid.UUID,
metrics: Dict[str, Any],
ttl_hours: int = 24
) -> bool:
"""Cache performance metrics"""
if not self.is_available():
return False
try:
key = f"procurement:metrics:{tenant_id}"
data_json = json.dumps(metrics, cls=DateTimeEncoder)
ttl_seconds = ttl_hours * 3600
self.redis.setex(key, ttl_seconds, data_json)
return True
except Exception as e:
logger.error("Error caching performance metrics", error=str(e))
return False
async def get_cached_metrics(self, tenant_id: uuid.UUID) -> Optional[Dict[str, Any]]:
"""Get cached performance metrics"""
if not self.is_available():
return None
try:
key = f"procurement:metrics:{tenant_id}"
cached_data = self.redis.get(key)
if cached_data:
return json.loads(cached_data)
return None
except Exception as e:
logger.error("Error retrieving cached metrics", error=str(e))
return None
# ================================================================
# UTILITY METHODS
# ================================================================
def _serialize_plan(self, plan: ProcurementPlan) -> str:
"""Serialize procurement plan for caching"""
try:
# Convert to dict, handling special types
plan_dict = {
"id": str(plan.id),
"tenant_id": str(plan.tenant_id),
"plan_number": plan.plan_number,
"plan_date": plan.plan_date.isoformat(),
"plan_period_start": plan.plan_period_start.isoformat(),
"plan_period_end": plan.plan_period_end.isoformat(),
"status": plan.status,
"plan_type": plan.plan_type,
"priority": plan.priority,
"total_requirements": plan.total_requirements,
"total_estimated_cost": float(plan.total_estimated_cost),
"total_approved_cost": float(plan.total_approved_cost),
"safety_stock_buffer": float(plan.safety_stock_buffer),
"supply_risk_level": plan.supply_risk_level,
"created_at": plan.created_at.isoformat(),
"updated_at": plan.updated_at.isoformat(),
# Add requirements count for quick reference
"requirements_count": len(plan.requirements) if plan.requirements else 0
}
return json.dumps(plan_dict)
except Exception as e:
logger.error("Error serializing plan", error=str(e))
raise
def _serialize_requirements(self, requirements: List) -> str:
"""Serialize requirements for caching"""
try:
requirements_data = []
for req in requirements:
req_dict = {
"id": str(req.id),
"requirement_number": req.requirement_number,
"product_id": str(req.product_id),
"product_name": req.product_name,
"status": req.status,
"priority": req.priority,
"required_quantity": float(req.required_quantity),
"net_requirement": float(req.net_requirement),
"estimated_total_cost": float(req.estimated_total_cost or 0),
"required_by_date": req.required_by_date.isoformat(),
"suggested_order_date": req.suggested_order_date.isoformat()
}
requirements_data.append(req_dict)
return json.dumps(requirements_data)
except Exception as e:
logger.error("Error serializing requirements", error=str(e))
raise
async def clear_tenant_cache(self, tenant_id: uuid.UUID) -> bool:
"""Clear all cached data for a tenant"""
if not self.is_available():
return False
try:
pattern = f"*:{tenant_id}*"
keys = self.redis.keys(pattern)
if keys:
self.redis.delete(*keys)
logger.info("Tenant cache cleared", tenant_id=tenant_id, keys_count=len(keys))
return True
except Exception as e:
logger.error("Error clearing tenant cache", error=str(e))
return False
def get_cache_stats(self) -> Dict[str, Any]:
"""Get Redis cache statistics"""
if not self.is_available():
return {"available": False}
try:
info = self.redis.info()
return {
"available": True,
"used_memory": info.get("used_memory_human"),
"connected_clients": info.get("connected_clients"),
"total_connections_received": info.get("total_connections_received"),
"keyspace_hits": info.get("keyspace_hits", 0),
"keyspace_misses": info.get("keyspace_misses", 0),
"hit_rate": self._calculate_hit_rate(
info.get("keyspace_hits", 0),
info.get("keyspace_misses", 0)
)
}
except Exception as e:
logger.error("Error getting cache stats", error=str(e))
return {"available": False, "error": str(e)}
def _calculate_hit_rate(self, hits: int, misses: int) -> float:
"""Calculate cache hit rate percentage"""
total = hits + misses
return (hits / total * 100) if total > 0 else 0.0
class DateTimeEncoder(json.JSONEncoder):
"""JSON encoder that handles datetime objects"""
def default(self, obj):
if isinstance(obj, (datetime, date)):
return obj.isoformat()
return super().default(obj)
# Global cache service instance
_cache_service = None
def get_cache_service() -> CacheService:
"""Get the global cache service instance"""
global _cache_service
if _cache_service is None:
_cache_service = CacheService()
return _cache_service

View File

@@ -0,0 +1,580 @@
# ================================================================
# services/orders/app/services/procurement_service.py
# ================================================================
"""
Procurement Service - Business logic for procurement planning and scheduling
"""
import asyncio
import uuid
from datetime import datetime, date, timedelta
from decimal import Decimal
from typing import List, Optional, Dict, Any, Tuple
import structlog
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.procurement import ProcurementPlan, ProcurementRequirement
from app.repositories.procurement_repository import ProcurementPlanRepository, ProcurementRequirementRepository
from app.schemas.procurement_schemas import (
ProcurementPlanCreate, ProcurementPlanResponse, ProcurementRequirementCreate,
GeneratePlanRequest, GeneratePlanResponse, DashboardData, ProcurementSummary
)
from app.core.config import settings
from shared.clients.inventory_client import InventoryServiceClient
from shared.clients.forecast_client import ForecastServiceClient
from shared.config.base import BaseServiceSettings
from shared.messaging.rabbitmq import RabbitMQClient
from shared.monitoring.decorators import monitor_performance
from app.services.cache_service import get_cache_service, CacheService
logger = structlog.get_logger()
class ProcurementService:
"""Service for managing procurement plans and scheduling"""
def __init__(
self,
db: AsyncSession,
config: BaseServiceSettings,
inventory_client: Optional[InventoryServiceClient] = None,
forecast_client: Optional[ForecastServiceClient] = None,
cache_service: Optional[CacheService] = None
):
self.db = db
self.config = config
self.plan_repo = ProcurementPlanRepository(db)
self.requirement_repo = ProcurementRequirementRepository(db)
# Initialize service clients
self.inventory_client = inventory_client or InventoryServiceClient(config)
self.forecast_client = forecast_client or ForecastServiceClient(config, "orders-service")
self.cache_service = cache_service or get_cache_service()
# Initialize RabbitMQ client
rabbitmq_url = getattr(config, 'RABBITMQ_URL', 'amqp://guest:guest@localhost:5672/')
self.rabbitmq_client = RabbitMQClient(rabbitmq_url, "orders-service")
# ================================================================
# PROCUREMENT PLAN OPERATIONS
# ================================================================
async def get_current_plan(self, tenant_id: uuid.UUID) -> Optional[ProcurementPlanResponse]:
"""Get the current day's procurement plan"""
try:
# Try cache first
cached_plan = await self.cache_service.get_cached_plan(tenant_id)
if cached_plan:
return ProcurementPlanResponse.model_validate(cached_plan)
# Get from database
plan = await self.plan_repo.get_current_plan(tenant_id)
if plan:
# Cache the result
await self.cache_service.cache_procurement_plan(plan)
return ProcurementPlanResponse.model_validate(plan)
return None
except Exception as e:
logger.error("Error getting current plan", error=str(e), tenant_id=tenant_id)
return None
async def get_plan_by_date(self, tenant_id: uuid.UUID, plan_date: date) -> Optional[ProcurementPlanResponse]:
"""Get procurement plan for a specific date"""
try:
plan = await self.plan_repo.get_plan_by_date(plan_date, tenant_id)
return ProcurementPlanResponse.model_validate(plan) if plan else None
except Exception as e:
logger.error("Error getting plan by date", error=str(e), tenant_id=tenant_id, date=plan_date)
return None
async def get_plan_by_id(self, tenant_id: uuid.UUID, plan_id: uuid.UUID) -> Optional[ProcurementPlanResponse]:
"""Get procurement plan by ID"""
try:
plan = await self.plan_repo.get_plan_by_id(plan_id, tenant_id)
return ProcurementPlanResponse.model_validate(plan) if plan else None
except Exception as e:
logger.error("Error getting plan by ID", error=str(e), tenant_id=tenant_id, plan_id=plan_id)
return None
@monitor_performance("generate_procurement_plan")
async def generate_procurement_plan(
self,
tenant_id: uuid.UUID,
request: GeneratePlanRequest
) -> GeneratePlanResponse:
"""Generate a new procurement plan based on forecasts and inventory"""
try:
plan_date = request.plan_date or date.today()
# Check if plan already exists
existing_plan = await self.plan_repo.get_plan_by_date(plan_date, tenant_id)
if existing_plan and not request.force_regenerate:
return GeneratePlanResponse(
success=True,
message="Plan already exists for this date",
plan=ProcurementPlanResponse.model_validate(existing_plan),
warnings=["Plan already exists. Use force_regenerate=true to recreate."]
)
logger.info("Starting procurement plan generation", tenant_id=tenant_id, plan_date=plan_date)
# Step 1: Get current inventory
inventory_items = await self._get_inventory_list(tenant_id)
if not inventory_items:
return GeneratePlanResponse(
success=False,
message="No inventory items found",
errors=["Unable to retrieve inventory data"]
)
# Step 2: Generate forecasts for each inventory item
forecasts = await self._generate_demand_forecasts(
tenant_id,
inventory_items,
plan_date,
request.planning_horizon_days
)
# Step 3: Create procurement plan
plan_data = await self._create_plan_data(
tenant_id,
plan_date,
request,
inventory_items,
forecasts
)
# Delete existing plan if force regenerate
if existing_plan and request.force_regenerate:
await self.plan_repo.delete_plan(existing_plan.id, tenant_id)
await self.db.flush()
# Step 4: Save plan to database
plan = await self.plan_repo.create_plan(plan_data)
# Step 5: Create requirements
requirements_data = await self._create_requirements_data(
plan.id,
inventory_items,
forecasts,
request
)
if requirements_data:
await self.requirement_repo.create_requirements_batch(requirements_data)
await self.db.commit()
# Step 6: Cache the plan and publish event
await self._cache_procurement_plan(plan)
await self._publish_plan_generated_event(tenant_id, plan.id)
logger.info("Procurement plan generated successfully",
tenant_id=tenant_id, plan_id=plan.id, requirements_count=len(requirements_data))
# Refresh plan with requirements
saved_plan = await self.plan_repo.get_plan_by_id(plan.id, tenant_id)
return GeneratePlanResponse(
success=True,
message="Procurement plan generated successfully",
plan=ProcurementPlanResponse.model_validate(saved_plan)
)
except Exception as e:
await self.db.rollback()
logger.error("Error generating procurement plan", error=str(e), tenant_id=tenant_id)
return GeneratePlanResponse(
success=False,
message="Failed to generate procurement plan",
errors=[str(e)]
)
async def update_plan_status(
self,
tenant_id: uuid.UUID,
plan_id: uuid.UUID,
status: str,
updated_by: Optional[uuid.UUID] = None
) -> Optional[ProcurementPlanResponse]:
"""Update procurement plan status"""
try:
updates = {"status": status, "updated_by": updated_by}
if status == "approved":
updates["approved_at"] = datetime.utcnow()
updates["approved_by"] = updated_by
elif status == "in_execution":
updates["execution_started_at"] = datetime.utcnow()
elif status in ["completed", "cancelled"]:
updates["execution_completed_at"] = datetime.utcnow()
plan = await self.plan_repo.update_plan(plan_id, tenant_id, updates)
if plan:
await self.db.commit()
await self._cache_procurement_plan(plan)
return ProcurementPlanResponse.model_validate(plan)
return None
except Exception as e:
await self.db.rollback()
logger.error("Error updating plan status", error=str(e), plan_id=plan_id)
return None
# ================================================================
# DASHBOARD AND ANALYTICS
# ================================================================
async def get_dashboard_data(self, tenant_id: uuid.UUID) -> Optional[DashboardData]:
"""Get procurement dashboard data"""
try:
current_plan = await self.get_current_plan(tenant_id)
summary = await self._get_procurement_summary(tenant_id)
# Get additional dashboard data
upcoming_deliveries = await self._get_upcoming_deliveries(tenant_id)
overdue_requirements = await self._get_overdue_requirements(tenant_id)
low_stock_alerts = await self._get_low_stock_alerts(tenant_id)
performance_metrics = await self._get_performance_metrics(tenant_id)
return DashboardData(
current_plan=current_plan,
summary=summary,
upcoming_deliveries=upcoming_deliveries,
overdue_requirements=overdue_requirements,
low_stock_alerts=low_stock_alerts,
performance_metrics=performance_metrics
)
except Exception as e:
logger.error("Error getting dashboard data", error=str(e), tenant_id=tenant_id)
return None
# ================================================================
# DAILY SCHEDULER
# ================================================================
async def run_daily_scheduler(self) -> None:
"""Run the daily procurement planning scheduler"""
logger.info("Starting daily procurement scheduler")
try:
# This would typically be called by a cron job or scheduler service
# Get all active tenants (this would come from tenant service)
active_tenants = await self._get_active_tenants()
for tenant_id in active_tenants:
try:
await self._process_daily_plan_for_tenant(tenant_id)
except Exception as e:
logger.error("Error processing daily plan for tenant",
error=str(e), tenant_id=tenant_id)
continue
logger.info("Daily procurement scheduler completed", processed_tenants=len(active_tenants))
except Exception as e:
logger.error("Error in daily scheduler", error=str(e))
async def _process_daily_plan_for_tenant(self, tenant_id: uuid.UUID) -> None:
"""Process daily procurement plan for a specific tenant"""
try:
today = date.today()
# Check if plan already exists for today
existing_plan = await self.plan_repo.get_plan_by_date(today, tenant_id)
if existing_plan:
logger.info("Daily plan already exists", tenant_id=tenant_id, date=today)
return
# Generate plan for today
request = GeneratePlanRequest(
plan_date=today,
planning_horizon_days=settings.DEMAND_FORECAST_DAYS,
include_safety_stock=True,
safety_stock_percentage=Decimal(str(settings.SAFETY_STOCK_PERCENTAGE))
)
result = await self.generate_procurement_plan(tenant_id, request)
if result.success:
logger.info("Daily plan generated successfully", tenant_id=tenant_id)
else:
logger.error("Failed to generate daily plan",
tenant_id=tenant_id, errors=result.errors)
except Exception as e:
logger.error("Error processing daily plan", error=str(e), tenant_id=tenant_id)
# ================================================================
# PRIVATE HELPER METHODS
# ================================================================
async def _get_inventory_list(self, tenant_id: uuid.UUID) -> List[Dict[str, Any]]:
"""Get current inventory list from inventory service"""
try:
return await self.inventory_client.get_all_ingredients(str(tenant_id))
except Exception as e:
logger.error("Error fetching inventory", error=str(e), tenant_id=tenant_id)
return []
async def _generate_demand_forecasts(
self,
tenant_id: uuid.UUID,
inventory_items: List[Dict[str, Any]],
target_date: date,
horizon_days: int
) -> Dict[str, Dict[str, Any]]:
"""Generate demand forecasts for inventory items"""
forecasts = {}
try:
# For each inventory item, request forecast
for item in inventory_items:
item_id = item.get('id')
if not item_id:
continue
try:
# Call forecast service for next day demand
forecast_data = await self.forecast_client.create_realtime_prediction(
tenant_id=str(tenant_id),
model_id="default", # Use default model or tenant-specific model
target_date=target_date.isoformat(),
features={
"product_id": item_id,
"current_stock": item.get('current_stock', 0),
"historical_usage": item.get('avg_daily_usage', 0),
"seasonality": self._calculate_seasonality_factor(target_date),
"day_of_week": target_date.weekday(),
"is_weekend": target_date.weekday() >= 5
}
)
if forecast_data:
forecasts[item_id] = forecast_data
except Exception as e:
logger.warning("Error forecasting for item",
item_id=item_id, error=str(e))
# Use fallback prediction
forecasts[item_id] = self._create_fallback_forecast(item)
return forecasts
except Exception as e:
logger.error("Error generating forecasts", error=str(e), tenant_id=tenant_id)
return {}
async def _create_plan_data(
self,
tenant_id: uuid.UUID,
plan_date: date,
request: GeneratePlanRequest,
inventory_items: List[Dict[str, Any]],
forecasts: Dict[str, Dict[str, Any]]
) -> Dict[str, Any]:
"""Create procurement plan data"""
plan_number = await self.plan_repo.generate_plan_number(tenant_id, plan_date)
total_items = len(inventory_items)
total_forecast_demand = sum(
f.get('predicted_value', 0) for f in forecasts.values()
)
return {
'tenant_id': tenant_id,
'plan_number': plan_number,
'plan_date': plan_date,
'plan_period_start': plan_date,
'plan_period_end': plan_date + timedelta(days=request.planning_horizon_days),
'planning_horizon_days': request.planning_horizon_days,
'status': 'draft',
'plan_type': 'regular',
'priority': 'normal',
'procurement_strategy': 'just_in_time',
'safety_stock_buffer': request.safety_stock_percentage,
'total_demand_quantity': Decimal(str(total_forecast_demand)),
'supply_risk_level': 'low',
'created_at': datetime.utcnow(),
'updated_at': datetime.utcnow(),
}
async def _create_requirements_data(
self,
plan_id: uuid.UUID,
inventory_items: List[Dict[str, Any]],
forecasts: Dict[str, Dict[str, Any]],
request: GeneratePlanRequest
) -> List[Dict[str, Any]]:
"""Create procurement requirements data"""
requirements = []
for item in inventory_items:
item_id = item.get('id')
if not item_id or item_id not in forecasts:
continue
forecast = forecasts[item_id]
current_stock = Decimal(str(item.get('current_stock', 0)))
predicted_demand = Decimal(str(forecast.get('predicted_value', 0)))
safety_stock = predicted_demand * (request.safety_stock_percentage / 100)
total_needed = predicted_demand + safety_stock
net_requirement = max(Decimal('0'), total_needed - current_stock)
if net_requirement > 0: # Only create requirement if needed
requirement_number = await self.requirement_repo.generate_requirement_number(plan_id)
required_by_date = request.plan_date or date.today()
suggested_order_date = required_by_date - timedelta(days=settings.PROCUREMENT_LEAD_TIME_DAYS)
latest_order_date = required_by_date - timedelta(days=1)
requirements.append({
'plan_id': plan_id,
'requirement_number': requirement_number,
'product_id': uuid.UUID(item_id),
'product_name': item.get('name', ''),
'product_sku': item.get('sku', ''),
'product_category': item.get('category', ''),
'product_type': 'ingredient',
'required_quantity': predicted_demand,
'unit_of_measure': item.get('unit', 'kg'),
'safety_stock_quantity': safety_stock,
'total_quantity_needed': total_needed,
'current_stock_level': current_stock,
'available_stock': current_stock,
'net_requirement': net_requirement,
'forecast_demand': predicted_demand,
'buffer_demand': safety_stock,
'required_by_date': required_by_date,
'suggested_order_date': suggested_order_date,
'latest_order_date': latest_order_date,
'priority': self._calculate_priority(net_requirement, current_stock),
'risk_level': self._calculate_risk_level(item, forecast),
'status': 'pending',
'delivery_status': 'pending',
'ordered_quantity': Decimal('0'),
'received_quantity': Decimal('0'),
'estimated_unit_cost': Decimal(str(item.get('avg_cost', 0))),
'estimated_total_cost': net_requirement * Decimal(str(item.get('avg_cost', 0)))
})
return requirements
def _calculate_priority(self, net_requirement: Decimal, current_stock: Decimal) -> str:
"""Calculate requirement priority based on stock levels"""
if current_stock <= 0:
return 'critical'
stock_ratio = net_requirement / current_stock if current_stock > 0 else float('inf')
if stock_ratio >= 2:
return 'critical'
elif stock_ratio >= 1:
return 'high'
elif stock_ratio >= 0.5:
return 'normal'
else:
return 'low'
def _calculate_risk_level(self, item: Dict[str, Any], forecast: Dict[str, Any]) -> str:
"""Calculate risk level for procurement requirement"""
confidence = forecast.get('confidence_score', 0.8)
lead_time = item.get('supplier_lead_time', 3)
if confidence < 0.6 or lead_time > 7:
return 'high'
elif confidence < 0.8 or lead_time > 3:
return 'medium'
else:
return 'low'
def _calculate_seasonality_factor(self, target_date: date) -> float:
"""Calculate seasonality adjustment factor"""
# Simple seasonality based on month
seasonal_factors = {
12: 1.3, 1: 1.2, 2: 0.9, # Winter
3: 1.1, 4: 1.2, 5: 1.3, # Spring
6: 1.4, 7: 1.5, 8: 1.4, # Summer
9: 1.2, 10: 1.1, 11: 1.2 # Fall
}
return seasonal_factors.get(target_date.month, 1.0)
def _create_fallback_forecast(self, item: Dict[str, Any]) -> Dict[str, Any]:
"""Create fallback forecast when service is unavailable"""
avg_usage = item.get('avg_daily_usage', 0)
return {
'predicted_value': avg_usage * 1.1, # 10% buffer
'confidence_score': 0.5,
'lower_bound': avg_usage * 0.8,
'upper_bound': avg_usage * 1.3,
'fallback': True
}
async def _cache_procurement_plan(self, plan: ProcurementPlan) -> None:
"""Cache procurement plan in Redis"""
try:
await self.cache_service.cache_procurement_plan(plan)
except Exception as e:
logger.warning("Failed to cache plan", error=str(e), plan_id=plan.id)
async def _publish_plan_generated_event(self, tenant_id: uuid.UUID, plan_id: uuid.UUID) -> None:
"""Publish plan generated event"""
try:
event_data = {
"tenant_id": str(tenant_id),
"plan_id": str(plan_id),
"timestamp": datetime.utcnow().isoformat(),
"event_type": "procurement.plan.generated"
}
await self.rabbitmq_client.publish_event(
exchange_name="procurement.events",
routing_key="procurement.plan.generated",
event_data=event_data
)
except Exception as e:
logger.warning("Failed to publish event", error=str(e))
async def _get_active_tenants(self) -> List[uuid.UUID]:
"""Get list of active tenant IDs"""
# This would typically call the tenant service
# For now, return empty list - would be implemented with actual tenant service
return []
async def _get_procurement_summary(self, tenant_id: uuid.UUID) -> ProcurementSummary:
"""Get procurement summary for dashboard"""
# Implement summary calculation
return ProcurementSummary(
total_plans=0,
active_plans=0,
total_requirements=0,
pending_requirements=0,
critical_requirements=0,
total_estimated_cost=Decimal('0'),
total_approved_cost=Decimal('0'),
cost_variance=Decimal('0')
)
async def _get_upcoming_deliveries(self, tenant_id: uuid.UUID) -> List[Dict[str, Any]]:
"""Get upcoming deliveries"""
return []
async def _get_overdue_requirements(self, tenant_id: uuid.UUID) -> List[Dict[str, Any]]:
"""Get overdue requirements"""
return []
async def _get_low_stock_alerts(self, tenant_id: uuid.UUID) -> List[Dict[str, Any]]:
"""Get low stock alerts from inventory service"""
try:
return await self.inventory_client.get_low_stock_alerts(str(tenant_id))
except Exception as e:
logger.error("Error getting low stock alerts", error=str(e))
return []
async def _get_performance_metrics(self, tenant_id: uuid.UUID) -> Dict[str, Any]:
"""Get performance metrics"""
return {}