Files
bakery-ia/services/procurement/app/services/purchase_order_service.py
Urtzi Alfaro c68d82ca7f Fix critical bugs and standardize service integrations
Critical Fixes:
- Orchestrator: Add missing OrchestrationStatus import (fixes HTTP 500 during demo clone)
- Procurement: Migrate from custom cache utils to shared Redis utils
- Suppliers: Use proper Settings for Redis configuration with TLS/auth
- Recipes/Suppliers clients: Fix endpoint paths (remove duplicate path segments)
- Procurement client: Use suppliers service directly for supplier details

Details:
1. services/orchestrator/app/api/internal_demo.py:
   - Added OrchestrationStatus import to fix cloning error
   - This was causing HTTP 500 errors during demo session cloning

2. services/procurement/app/api/purchase_orders.py + service:
   - Replaced app.utils.cache with shared.redis_utils
   - Standardizes caching across all services
   - Removed custom cache utilities (deleted app/utils/cache.py)

3. services/suppliers/app/consumers/alert_event_consumer.py:
   - Use Settings().REDIS_URL instead of os.getenv
   - Ensures proper Redis connection with TLS and authentication

4. shared/clients/recipes_client.py:
   - Fixed endpoint paths: recipes/recipes/{id} → recipes/{id}
   - Applied to all recipe methods (by_id, by_products, instructions, yield)

5. shared/clients/suppliers_client.py:
   - Fixed endpoint path: suppliers/suppliers/{id} → suppliers/{id}

6. shared/clients/procurement_client.py:
   - get_supplier_by_id now uses SuppliersServiceClient directly
   - Removes incorrect call to procurement service for supplier details

Impact:
- Demo session cloning now works without orchestrator errors 
- Consistent Redis usage across all services
- Correct service boundaries (suppliers data from suppliers service)
- Clean client endpoint paths

🤖 Generated with Claude Code (https://claude.com/claude-code)

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2025-12-16 11:33:22 +01:00

1141 lines
48 KiB
Python

# ================================================================
# services/procurement/app/services/purchase_order_service.py
# ================================================================
"""
Purchase Order Service - Business logic for purchase order management
Migrated from Suppliers Service to Procurement Service ownership
"""
import uuid
from datetime import datetime, date, timedelta
from decimal import Decimal
from typing import List, Optional, Dict, Any
import structlog
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.purchase_order import PurchaseOrder, PurchaseOrderItem, Delivery, DeliveryItem, SupplierInvoice
from app.repositories.purchase_order_repository import (
PurchaseOrderRepository,
PurchaseOrderItemRepository,
DeliveryRepository,
SupplierInvoiceRepository
)
from app.schemas.purchase_order_schemas import (
PurchaseOrderCreate,
PurchaseOrderUpdate,
PurchaseOrderResponse,
DeliveryCreate,
DeliveryUpdate,
SupplierInvoiceCreate,
)
from app.core.config import settings
from shared.clients.suppliers_client import SuppliersServiceClient
from shared.clients.inventory_client import InventoryServiceClient
from shared.config.base import BaseServiceSettings
from shared.messaging import RabbitMQClient, UnifiedEventPublisher, EVENT_TYPES
from shared.redis_utils import get_keys_pattern, get_redis_client
logger = structlog.get_logger()
class PurchaseOrderService:
"""Service for purchase order management operations"""
def __init__(
    self,
    db: AsyncSession,
    config: BaseServiceSettings,
    suppliers_client: Optional[SuppliersServiceClient] = None,
    rabbitmq_client: Optional[RabbitMQClient] = None,
    event_publisher: Optional[UnifiedEventPublisher] = None,
    inventory_client: Optional[InventoryServiceClient] = None
):
    """
    Build the service around a database session and its collaborators.

    Args:
        db: Async session handed to every repository created here
        config: Service settings, passed to any client built as a default
        suppliers_client: Suppliers client; built from ``config`` when omitted
        rabbitmq_client: RabbitMQ client, stored as given (may be None)
        event_publisher: Event publisher; a ``UnifiedEventPublisher`` for
            "procurement" wrapping ``rabbitmq_client`` is built when omitted
        inventory_client: Inventory client; built from ``config`` when omitted
    """
    self.db = db
    self.config = config

    # One repository per aggregate, all bound to the same session
    self.po_repo = PurchaseOrderRepository(db)
    self.item_repo = PurchaseOrderItemRepository(db)
    self.delivery_repo = DeliveryRepository(db)
    self.invoice_repo = SupplierInvoiceRepository(db)

    # Supplier lookups/validation go through the suppliers service
    if not suppliers_client:
        suppliers_client = SuppliersServiceClient(config)
    self.suppliers_client = suppliers_client

    # Stock information comes from the inventory service
    if not inventory_client:
        inventory_client = InventoryServiceClient(config)
    self.inventory_client = inventory_client

    # Messaging: keep the raw client and a publisher for business events
    self.rabbitmq_client = rabbitmq_client
    if not event_publisher:
        event_publisher = UnifiedEventPublisher(rabbitmq_client, "procurement")
    self.event_publisher = event_publisher

    # Per-instance supplier cache keyed by "<tenant_id>:<supplier_id>", so
    # enriching several POs of the same supplier can reuse one lookup
    self._supplier_cache: Dict[str, Dict[str, Any]] = {}
# ================================================================
# PURCHASE ORDER CRUD
# ================================================================
async def create_purchase_order(
    self,
    tenant_id: uuid.UUID,
    po_data: PurchaseOrderCreate,
    created_by: Optional[uuid.UUID] = None
) -> PurchaseOrder:
    """
    Create a purchase order together with its line items.

    Steps, in order:
    1. Fetch the supplier and validate it
    2. Obtain a PO number from the repository
    3. Compute the total (subtotal + tax + shipping - discount)
    4. Decide whether approval is needed and pick the initial status
    5. Persist the PO (including any procurement_plan_id) and one row per item, then commit
    6. Emit an approval alert when the PO starts in 'pending_approval'

    Args:
        tenant_id: Tenant that owns the order
        po_data: Order payload, including its items
        created_by: User recorded as creator and last updater

    Returns:
        The purchase order returned by the repository

    Raises:
        Exception: Whatever validation or persistence raised; the session is
            rolled back and the error logged before re-raising
    """
    try:
        logger.info("Creating purchase order",
                    tenant_id=tenant_id,
                    supplier_id=po_data.supplier_id)

        vendor = await self._get_and_validate_supplier(tenant_id, po_data.supplier_id)
        order_number = await self.po_repo.generate_po_number(tenant_id)

        # Totals
        subtotal = po_data.subtotal
        grand_total = (
            subtotal
            + po_data.tax_amount
            + po_data.shipping_cost
            - po_data.discount_amount
        )

        # Approval workflow
        needs_approval = self._requires_approval(grand_total, po_data.priority)
        starting_status = self._determine_initial_status(grand_total, needs_approval)

        # The required date is taken from the request unchanged; the estimate
        # is today plus the supplier's lead time (7 days when not reported)
        required_by = po_data.required_delivery_date
        lead_days = vendor.get('standard_lead_time', 7)
        estimated_arrival = date.today() + timedelta(days=lead_days)

        order_fields = {
            'tenant_id': tenant_id,
            'supplier_id': po_data.supplier_id,
            'po_number': order_number,
            'status': starting_status,
            'priority': po_data.priority,
            'order_date': datetime.utcnow(),
            'required_delivery_date': required_by,
            'estimated_delivery_date': estimated_arrival,
            'subtotal': subtotal,
            'tax_amount': po_data.tax_amount,
            'shipping_cost': po_data.shipping_cost,
            'discount_amount': po_data.discount_amount,
            'total_amount': grand_total,
            'currency': vendor.get('currency', 'EUR'),
            'requires_approval': needs_approval,
            'notes': po_data.notes,
            'procurement_plan_id': po_data.procurement_plan_id,
            'created_by': created_by,
            'updated_by': created_by,
            'created_at': datetime.utcnow(),
            'updated_at': datetime.utcnow(),
        }
        purchase_order = await self.po_repo.create_po(order_fields)

        # One row per requested line; nothing has been received yet
        for line in po_data.items:
            line_fields = {
                'tenant_id': tenant_id,
                'purchase_order_id': purchase_order.id,
                'inventory_product_id': line.inventory_product_id,
                'ordered_quantity': line.ordered_quantity,
                'unit_price': line.unit_price,
                'unit_of_measure': line.unit_of_measure,
                'line_total': line.ordered_quantity * line.unit_price,
                'received_quantity': Decimal('0'),
                'quality_requirements': line.quality_requirements,
                'item_notes': line.item_notes,
                'created_at': datetime.utcnow(),
                'updated_at': datetime.utcnow(),
            }
            await self.item_repo.create_item(line_fields)

        await self.db.commit()
        logger.info("Purchase order created successfully",
                    tenant_id=tenant_id,
                    po_id=purchase_order.id,
                    po_number=order_number,
                    total_amount=float(grand_total))

        if initial_needs_alert := (starting_status == 'pending_approval'):
            # Alerting is best-effort: a failure here must not undo the PO
            try:
                await self._emit_po_approval_alert(
                    tenant_id=tenant_id,
                    purchase_order=purchase_order,
                    supplier=vendor
                )
            except Exception as alert_error:
                logger.warning(
                    "Failed to emit PO approval alert",
                    po_id=str(purchase_order.id),
                    error=str(alert_error)
                )
        return purchase_order
    except Exception as exc:
        await self.db.rollback()
        logger.error("Error creating purchase order", error=str(exc), tenant_id=tenant_id)
        raise
async def get_purchase_order(
    self,
    tenant_id: uuid.UUID,
    po_id: uuid.UUID
) -> Optional[PurchaseOrder]:
    """
    Fetch one purchase order and attach supplier information to it.

    Args:
        tenant_id: Tenant that owns the order
        po_id: Purchase order identifier

    Returns:
        The order when the repository finds it; otherwise whatever falsy
        value the repository returned. Any exception is logged and turned
        into ``None`` rather than propagated.
    """
    try:
        order = await self.po_repo.get_po_by_id(po_id, tenant_id)
        if not order:
            # Nothing to enrich; hand back the repository's result as-is
            return order
        await self._enrich_po_with_supplier(tenant_id, order)
        return order
    except Exception as exc:
        logger.error("Error getting purchase order", error=str(exc), po_id=po_id)
        return None
async def list_purchase_orders(
    self,
    tenant_id: uuid.UUID,
    skip: int = 0,
    limit: int = 50,
    supplier_id: Optional[uuid.UUID] = None,
    status: Optional[str] = None,
    enrich_supplier: bool = True
) -> List[PurchaseOrder]:
    """
    Return a page of purchase orders, optionally filtered and enriched.

    Args:
        tenant_id: Tenant UUID
        skip: Number of records to skip (passed to the repository as ``offset``)
        limit: Maximum number of records
        supplier_id: Optional supplier filter
        status: Optional status name; it is lower-cased and looked up in
            ``PurchaseOrderStatus``. An unknown name is logged and ignored
        enrich_supplier: When True (default) supplier details are attached to
            every order; pass False to skip those lookups

    Returns:
        The orders from the repository, or an empty list if anything raised
    """
    try:
        status_filter = None
        if status:
            try:
                from app.models.purchase_order import PurchaseOrderStatus
                status_filter = PurchaseOrderStatus[status.lower()]
            except (KeyError, AttributeError):
                logger.warning("Invalid status value provided", status=status)
                status_filter = None

        orders = await self.po_repo.list_purchase_orders(
            tenant_id=tenant_id,
            offset=skip,
            limit=limit,
            supplier_id=supplier_id,
            status=status_filter
        )

        if not orders or not enrich_supplier:
            # Callers that only need supplier_id get the bare orders
            return orders

        import asyncio
        # Run all enrichments concurrently; failures are collected by
        # gather and discarded, so one bad lookup does not fail the listing
        await asyncio.gather(
            *(self._enrich_po_with_supplier(tenant_id, order) for order in orders),
            return_exceptions=True
        )
        return orders
    except Exception as exc:
        logger.error("Error listing purchase orders", error=str(exc), tenant_id=tenant_id)
        return []
async def update_po(
    self,
    tenant_id: uuid.UUID,
    po_id: uuid.UUID,
    po_data: PurchaseOrderUpdate,
    updated_by: Optional[uuid.UUID] = None
) -> Optional[PurchaseOrder]:
    """
    Apply a partial update to a purchase order.

    Only fields explicitly set on ``po_data`` are written. When tax, shipping
    or discount is among them, ``total_amount`` is recomputed from the stored
    subtotal and the new/stored values of those three fields.

    Args:
        tenant_id: Tenant that owns the order
        po_id: Purchase order identifier
        po_data: Fields to change
        updated_by: User recorded as last updater

    Returns:
        The updated order from the repository, or None when it does not exist

    Raises:
        ValueError: If the order is completed or cancelled
        Exception: Any other failure, after rollback and logging
    """
    try:
        logger.info("Updating purchase order", po_id=po_id)
        existing = await self.po_repo.get_po_by_id(po_id, tenant_id)
        if not existing:
            return None

        # Terminal orders are frozen
        if existing.status in ['completed', 'cancelled']:
            raise ValueError("Cannot modify completed or cancelled orders")

        changes = po_data.model_dump(exclude_unset=True)
        changes['updated_by'] = updated_by
        changes['updated_at'] = datetime.utcnow()

        financial_fields = ['tax_amount', 'shipping_cost', 'discount_amount']
        if any(field in changes for field in financial_fields):
            tax = changes.get('tax_amount', existing.tax_amount)
            shipping = changes.get('shipping_cost', existing.shipping_cost)
            discount = changes.get('discount_amount', existing.discount_amount)
            changes['total_amount'] = existing.subtotal + tax + shipping - discount

        updated = await self.po_repo.update_po(po_id, tenant_id, changes)
        await self.db.commit()
        logger.info("Purchase order updated successfully", po_id=po_id)
        return updated
    except Exception as exc:
        await self.db.rollback()
        logger.error("Error updating purchase order", error=str(exc), po_id=po_id)
        raise
async def update_order_status(
    self,
    tenant_id: uuid.UUID,
    po_id: uuid.UUID,
    status: str,
    updated_by: Optional[uuid.UUID] = None,
    notes: Optional[str] = None
) -> Optional[PurchaseOrder]:
    """
    Move a purchase order to a new status.

    Args:
        tenant_id: Tenant that owns the order
        po_id: Purchase order identifier
        status: Target status
        updated_by: User recorded as last updater
        notes: Accepted for interface compatibility; not used by this method

    Returns:
        The updated order from the repository, or None when it does not exist

    Raises:
        ValueError: If the transition from the current status is not allowed
        Exception: Any other failure, after rollback and logging
    """
    # Statuses that also stamp a dedicated timestamp column
    timestamp_column_for = {
        'sent_to_supplier': 'sent_to_supplier_at',
        'confirmed': 'supplier_confirmation_date',
    }
    try:
        logger.info("Updating PO status", po_id=po_id, status=status)
        current = await self.po_repo.get_po_by_id(po_id, tenant_id)
        if not current:
            return None

        if not self._is_valid_status_transition(current.status, status):
            raise ValueError(f"Invalid status transition from {current.status} to {status}")

        changes = {
            'status': status,
            'updated_by': updated_by,
            'updated_at': datetime.utcnow()
        }
        stamp_column = timestamp_column_for.get(status)
        if stamp_column is not None:
            changes[stamp_column] = datetime.utcnow()

        updated = await self.po_repo.update_po(po_id, tenant_id, changes)
        await self.db.commit()
        return updated
    except Exception as exc:
        await self.db.rollback()
        logger.error("Error updating PO status", error=str(exc), po_id=po_id)
        raise
async def approve_purchase_order(
    self,
    tenant_id: uuid.UUID,
    po_id: uuid.UUID,
    approved_by: uuid.UUID,
    approval_notes: Optional[str] = None
) -> Optional[PurchaseOrder]:
    """
    Approve a purchase order, then run best-effort follow-ups.

    After the approval is committed, three follow-ups run; each one logs a
    warning on failure and never fails the approval itself:
    1. Delete this tenant's ``purchase_orders:<tenant_id>:*`` Redis keys
    2. Acknowledge ``po_approval_needed`` alerts for this PO
    3. Publish a PO_APPROVED business event

    Args:
        tenant_id: Tenant that owns the order
        po_id: Purchase order identifier
        approved_by: User approving the order
        approval_notes: Accepted for interface compatibility; not used here

    Returns:
        The updated order from the repository, or None when it does not exist

    Raises:
        ValueError: If the order is not in 'draft' or 'pending_approval', or
            the supplier fails validation
        Exception: Any other failure before the follow-ups, after rollback
    """
    def _as_float(value):
        # Decimal columns are not JSON-friendly; keep None as None
        return float(value) if value is not None else None

    try:
        logger.info("Approving purchase order", po_id=po_id)
        po = await self.po_repo.get_po_by_id(po_id, tenant_id)
        if not po:
            return None
        if po.status not in ['draft', 'pending_approval']:
            raise ValueError(f"Cannot approve order with status {po.status}")

        # Supplier data feeds both the delivery estimate and the event
        supplier = await self._get_and_validate_supplier(tenant_id, po.supplier_id)

        # Estimated delivery = approval time + supplier lead time (7 days default)
        approved_at = datetime.utcnow()
        standard_lead_time = supplier.get('standard_lead_time', 7)
        estimated_delivery_date = approved_at + timedelta(days=standard_lead_time)

        update_data = {
            'status': 'approved',
            'approved_by': approved_by,
            'approved_at': approved_at,
            'estimated_delivery_date': estimated_delivery_date,
            'updated_by': approved_by,
            'updated_at': approved_at
        }
        po = await self.po_repo.update_po(po_id, tenant_id, update_data)
        await self.db.commit()

        # Follow-up 1: drop cached PO listings for this tenant
        cache_pattern = f"purchase_orders:{tenant_id}:*"
        try:
            client = await get_redis_client()
            if client:
                keys = await client.keys(cache_pattern)
                if keys:
                    await client.delete(*keys)
                    logger.debug("Invalidated purchase orders cache", pattern=cache_pattern, keys_deleted=len(keys), tenant_id=str(tenant_id))
        except Exception as e:
            logger.warning("Cache invalidation failed, continuing without cache invalidation",
                           pattern=cache_pattern, error=str(e))

        # Follow-up 2: acknowledge the pending-approval alerts for this PO
        try:
            from shared.clients.alert_processor_client import get_alert_processor_client
            alert_client = get_alert_processor_client(self.config, "procurement")
            await alert_client.acknowledge_alerts_by_metadata(
                tenant_id=tenant_id,
                alert_type="po_approval_needed",
                metadata_filter={"po_id": str(po_id)}
            )
            logger.debug("Acknowledged PO approval alerts", po_id=po_id)
        except Exception as e:
            logger.warning("Failed to acknowledge PO approval alerts", po_id=po_id, error=str(e))

        logger.info("Purchase order approved successfully", po_id=po_id)

        # Follow-up 3: publish the PO approved event
        try:
            items = await self.item_repo.get_items_by_po(po_id)
            # Identifiers and amounts are converted to str/float, the same way
            # the delivery event in this service serializes its items, so the
            # payload does not depend on the publisher's encoder
            items_data = [
                {
                    "inventory_product_id": (
                        str(item.inventory_product_id)
                        if item.inventory_product_id is not None else None
                    ),
                    "product_name": item.product_name or item.product_code,
                    "ordered_quantity": _as_float(item.ordered_quantity),
                    "unit_of_measure": item.unit_of_measure,
                    "unit_price": _as_float(item.unit_price),
                    "line_total": _as_float(item.line_total)
                }
                for item in items
            ]
            event_data = {
                "po_id": str(po_id),
                "po_number": po.po_number,
                "supplier_id": str(po.supplier_id),
                "supplier_name": supplier.get('name', ''),
                "supplier_email": supplier.get('email'),
                "supplier_phone": supplier.get('phone'),
                "total_amount": float(po.total_amount),
                "currency": po.currency,
                "required_delivery_date": po.required_delivery_date.isoformat() if po.required_delivery_date else None,
                "items": items_data,
                "approved_by": str(approved_by),
                # Use the timestamp that was written rather than reading it
                # back from whatever the repository returned
                "approved_at": approved_at.isoformat()
            }
            await self.event_publisher.publish_business_event(
                event_type=EVENT_TYPES.PROCUREMENT.PO_APPROVED,
                tenant_id=tenant_id,
                data=event_data
            )
        except Exception as event_error:
            logger.warning(
                "Failed to publish PO approved event",
                po_id=str(po_id),
                error=str(event_error)
            )
        return po
    except Exception as e:
        await self.db.rollback()
        logger.error("Error approving purchase order", error=str(e), po_id=po_id)
        raise
async def reject_purchase_order(
    self,
    tenant_id: uuid.UUID,
    po_id: uuid.UUID,
    rejected_by: uuid.UUID,
    rejection_reason: str
) -> Optional[PurchaseOrder]:
    """
    Reject a purchase order and publish a PO_REJECTED event.

    The supplier is looked up only to put its name in the event. That lookup
    is best-effort: a missing, inactive or unreachable supplier must not
    prevent an order from being rejected.

    Args:
        tenant_id: Tenant that owns the order
        po_id: Purchase order identifier
        rejected_by: User rejecting the order
        rejection_reason: Reason stored on the order and sent in the event

    Returns:
        The updated order from the repository, or None when it does not exist

    Raises:
        ValueError: If the order is not in 'draft' or 'pending_approval'
        Exception: Any persistence failure, after rollback and logging
    """
    try:
        logger.info("Rejecting purchase order", po_id=po_id)
        po = await self.po_repo.get_po_by_id(po_id, tenant_id)
        if not po:
            return None
        if po.status not in ['draft', 'pending_approval']:
            raise ValueError(f"Cannot reject order with status {po.status}")

        # Supplier details are informational here, so no active-status
        # validation is applied and lookup failures are tolerated
        supplier: Dict[str, Any] = {}
        try:
            supplier = await self.suppliers_client.get_supplier_by_id(
                str(tenant_id), str(po.supplier_id)
            ) or {}
        except Exception as supplier_error:
            logger.warning(
                "Could not load supplier for PO rejection event",
                po_id=str(po_id),
                supplier_id=str(po.supplier_id),
                error=str(supplier_error)
            )

        update_data = {
            'status': 'rejected',
            'rejection_reason': rejection_reason,
            'updated_by': rejected_by,
            'updated_at': datetime.utcnow()
        }
        po = await self.po_repo.update_po(po_id, tenant_id, update_data)
        await self.db.commit()
        logger.info("Purchase order rejected", po_id=po_id)

        # Event publishing is best-effort and never fails the rejection
        try:
            event_data = {
                "po_id": str(po_id),
                "po_number": po.po_number,
                "supplier_id": str(po.supplier_id),
                "supplier_name": supplier.get('name', ''),
                "rejection_reason": rejection_reason,
                "rejected_by": str(rejected_by),
                "rejected_at": datetime.utcnow().isoformat()
            }
            await self.event_publisher.publish_business_event(
                event_type=EVENT_TYPES.PROCUREMENT.PO_REJECTED,
                tenant_id=tenant_id,
                data=event_data
            )
        except Exception as event_error:
            logger.warning(
                "Failed to publish PO rejected event",
                po_id=str(po_id),
                error=str(event_error)
            )
        return po
    except Exception as e:
        await self.db.rollback()
        logger.error("Error rejecting purchase order", error=str(e), po_id=po_id)
        raise
async def cancel_purchase_order(
    self,
    tenant_id: uuid.UUID,
    po_id: uuid.UUID,
    cancelled_by: uuid.UUID,
    cancellation_reason: str
) -> Optional[PurchaseOrder]:
    """
    Cancel a purchase order and append the reason to its notes.

    Args:
        tenant_id: Tenant that owns the order
        po_id: Purchase order identifier
        cancelled_by: User recorded as last updater
        cancellation_reason: Text appended to the notes on a new line

    Returns:
        The updated order from the repository, or None when it does not exist

    Raises:
        ValueError: If the order is already completed or cancelled
        Exception: Any other failure, after rollback and logging
    """
    try:
        logger.info("Cancelling purchase order", po_id=po_id)
        order = await self.po_repo.get_po_by_id(po_id, tenant_id)
        if not order:
            return None

        if order.status in ['completed', 'cancelled']:
            raise ValueError(f"Cannot cancel order with status {order.status}")

        # Existing notes are preserved; the reason goes on its own line
        combined_notes = f"{order.notes or ''}\nCancellation: {cancellation_reason}"
        changes = {
            'status': 'cancelled',
            'notes': combined_notes,
            'updated_by': cancelled_by,
            'updated_at': datetime.utcnow()
        }
        order = await self.po_repo.update_po(po_id, tenant_id, changes)
        await self.db.commit()
        logger.info("Purchase order cancelled", po_id=po_id)
        return order
    except Exception as exc:
        await self.db.rollback()
        logger.error("Error cancelling purchase order", error=str(exc), po_id=po_id)
        raise
# ================================================================
# DELIVERY MANAGEMENT
# ================================================================
async def create_delivery(
    self,
    tenant_id: uuid.UUID,
    delivery_data: DeliveryCreate,
    created_by: uuid.UUID
) -> Delivery:
    """
    Record a delivery (and its items) against an existing purchase order.

    The delivery is stored with status 'scheduled'. After the commit a
    DELIVERY_RECEIVED business event is published; a publishing failure is
    logged and does not fail the call.

    Args:
        tenant_id: Tenant that owns the order
        delivery_data: Delivery payload, including its items
        created_by: User recorded as creator and, in the event, as receiver

    Returns:
        The delivery returned by the repository

    Raises:
        ValueError: If the referenced purchase order does not exist
        Exception: Any other failure, after rollback and logging
    """
    try:
        logger.info("Creating delivery", tenant_id=tenant_id, po_id=delivery_data.purchase_order_id)

        # The delivery must point at an order this tenant owns
        order = await self.po_repo.get_po_by_id(delivery_data.purchase_order_id, tenant_id)
        if not order:
            raise ValueError("Purchase order not found")

        delivery_number = await self.delivery_repo.generate_delivery_number(tenant_id)

        header_fields = {
            'tenant_id': tenant_id,
            'purchase_order_id': delivery_data.purchase_order_id,
            'supplier_id': delivery_data.supplier_id,
            'delivery_number': delivery_number,
            'supplier_delivery_note': delivery_data.supplier_delivery_note,
            'status': 'scheduled',
            'scheduled_date': delivery_data.scheduled_date,
            'estimated_arrival': delivery_data.estimated_arrival,
            'carrier_name': delivery_data.carrier_name,
            'tracking_number': delivery_data.tracking_number,
            'notes': delivery_data.notes,
            'created_by': created_by,
            'created_at': datetime.utcnow(),
            'updated_at': datetime.utcnow(),
        }
        delivery = await self.delivery_repo.create_delivery(header_fields)

        for line in delivery_data.items:
            line_fields = {
                'tenant_id': tenant_id,
                'delivery_id': delivery.id,
                'purchase_order_item_id': line.purchase_order_item_id,
                'inventory_product_id': line.inventory_product_id,
                'ordered_quantity': line.ordered_quantity,
                'delivered_quantity': line.delivered_quantity,
                'accepted_quantity': line.accepted_quantity,
                'rejected_quantity': line.rejected_quantity,
                'batch_lot_number': line.batch_lot_number,
                'expiry_date': line.expiry_date,
                'quality_grade': line.quality_grade,
                'quality_issues': line.quality_issues,
                'rejection_reason': line.rejection_reason,
                'item_notes': line.item_notes,
                'created_at': datetime.utcnow(),
                'updated_at': datetime.utcnow(),
            }
            await self.delivery_repo.create_delivery_item(line_fields)

        await self.db.commit()
        logger.info("Delivery created successfully",
                    tenant_id=tenant_id,
                    delivery_id=delivery.id,
                    delivery_number=delivery_number)

        # Best-effort event: built from the request payload, with ids and
        # quantities converted to str/float for the message body
        try:
            event_items = [
                {
                    "inventory_product_id": str(line.inventory_product_id),
                    "ordered_quantity": float(line.ordered_quantity),
                    "delivered_quantity": float(line.delivered_quantity),
                    "accepted_quantity": float(line.accepted_quantity),
                    "rejected_quantity": float(line.rejected_quantity),
                    "batch_lot_number": line.batch_lot_number,
                    "expiry_date": line.expiry_date.isoformat() if line.expiry_date else None,
                    "quality_grade": line.quality_grade,
                    "quality_issues": line.quality_issues,
                    "rejection_reason": line.rejection_reason
                }
                for line in delivery_data.items
            ]
            payload = {
                "delivery_id": str(delivery.id),
                "po_id": str(delivery_data.purchase_order_id),
                "items": event_items,
                "received_at": datetime.utcnow().isoformat(),
                "received_by": str(created_by)
            }
            await self.event_publisher.publish_business_event(
                event_type=EVENT_TYPES.PROCUREMENT.DELIVERY_RECEIVED,
                tenant_id=tenant_id,
                data=payload
            )
        except Exception as event_error:
            logger.warning(
                "Failed to publish delivery received event",
                delivery_id=str(delivery.id),
                error=str(event_error)
            )
        return delivery
    except Exception as exc:
        await self.db.rollback()
        logger.error("Error creating delivery", error=str(exc), tenant_id=tenant_id)
        raise
async def update_delivery_status(
    self,
    tenant_id: uuid.UUID,
    delivery_id: uuid.UUID,
    status: str,
    updated_by: uuid.UUID
) -> Optional[Delivery]:
    """
    Set a delivery's status and the timestamps that go with it.

    - 'in_transit' clears ``actual_arrival``
    - 'delivered' stamps ``actual_arrival`` with the current UTC time
    - 'completed' stamps ``completed_at`` with the current UTC time
    - any other status only changes ``status`` and ``updated_at``

    Args:
        tenant_id: Tenant that owns the delivery
        delivery_id: Delivery identifier
        status: Target status
        updated_by: Accepted for interface compatibility; not used here

    Returns:
        Whatever the repository returns for the update

    Raises:
        Exception: Any failure, after rollback and logging
    """
    try:
        changes = {
            'status': status,
            'updated_at': datetime.utcnow()
        }
        if status in ('in_transit', 'delivered'):
            # Arrival is only known once the goods are actually delivered
            changes['actual_arrival'] = datetime.utcnow() if status == 'delivered' else None
        elif status == 'completed':
            changes['completed_at'] = datetime.utcnow()

        updated = await self.delivery_repo.update_delivery(delivery_id, tenant_id, changes)
        await self.db.commit()
        return updated
    except Exception as exc:
        await self.db.rollback()
        logger.error("Error updating delivery status", error=str(exc), delivery_id=delivery_id)
        raise
# ================================================================
# INVOICE MANAGEMENT
# ================================================================
async def create_invoice(
    self,
    tenant_id: uuid.UUID,
    invoice_data: SupplierInvoiceCreate,
    created_by: uuid.UUID
) -> SupplierInvoice:
    """
    Register a supplier invoice for an existing purchase order.

    The invoice starts with status 'received', nothing paid, and the full
    total outstanding. Its currency is copied from the purchase order.

    Args:
        tenant_id: Tenant that owns the order
        invoice_data: Invoice payload
        created_by: User recorded as creator and last updater

    Returns:
        The invoice returned by the repository

    Raises:
        ValueError: If the referenced purchase order does not exist
        Exception: Any other failure, after rollback and logging
    """
    try:
        logger.info("Creating supplier invoice", tenant_id=tenant_id)

        # Total = subtotal + tax + shipping - discount
        invoice_total = (
            invoice_data.subtotal
            + invoice_data.tax_amount
            + invoice_data.shipping_cost
            - invoice_data.discount_amount
        )

        # The order supplies the invoice currency
        order = await self.po_repo.get_po_by_id(invoice_data.purchase_order_id, tenant_id)
        if not order:
            raise ValueError("Purchase order not found")

        invoice_fields = {
            'tenant_id': tenant_id,
            'purchase_order_id': invoice_data.purchase_order_id,
            'supplier_id': invoice_data.supplier_id,
            'invoice_number': invoice_data.invoice_number,
            'status': 'received',
            'invoice_date': invoice_data.invoice_date,
            'due_date': invoice_data.due_date,
            'subtotal': invoice_data.subtotal,
            'tax_amount': invoice_data.tax_amount,
            'shipping_cost': invoice_data.shipping_cost,
            'discount_amount': invoice_data.discount_amount,
            'total_amount': invoice_total,
            'currency': order.currency,
            'paid_amount': Decimal('0'),
            'remaining_amount': invoice_total,
            'notes': invoice_data.notes,
            'payment_reference': invoice_data.payment_reference,
            'created_by': created_by,
            'updated_by': created_by,
            'created_at': datetime.utcnow(),
            'updated_at': datetime.utcnow(),
        }
        created = await self.invoice_repo.create_invoice(invoice_fields)
        await self.db.commit()
        logger.info("Supplier invoice created", invoice_id=created.id)
        return created
    except Exception as exc:
        await self.db.rollback()
        logger.error("Error creating invoice", error=str(exc), tenant_id=tenant_id)
        raise
# ================================================================
# PRIVATE HELPER METHODS
# ================================================================
async def _emit_po_approval_alert(
    self,
    tenant_id: uuid.UUID,
    purchase_order: PurchaseOrder,
    supplier: Dict[str, Any]
) -> None:
    """
    Publish a raw 'po_approval_needed' alert for a purchase order.

    Title and message are left empty on purpose; the payload carries
    structured parameters instead. When the order has a required delivery
    date, the approval deadline is that date minus the supplier lead time
    (7 days when the supplier does not report one).

    Args:
        tenant_id: Tenant that owns the order
        purchase_order: The order awaiting approval
        supplier: Supplier data; 'name' and 'standard_lead_time' are read

    Raises:
        Exception: Any failure while building or publishing the alert; it is
            logged and re-raised so the caller decides how to proceed
    """
    try:
        now = datetime.utcnow()
        required_by = purchase_order.required_delivery_date
        required_by_iso = required_by.isoformat() if required_by else None

        # Urgency: approve early enough for the supplier to deliver on time
        deadline = None
        hours_until_consequence = None
        if required_by:
            supplier_lead_time_days = supplier.get('standard_lead_time', 7)
            deadline = required_by - timedelta(days=supplier_lead_time_days)
            hours_until_consequence = (deadline - now).total_seconds() / 3600

        # Prefer reasoning supplied with the PO; otherwise build it
        reasoning_data = purchase_order.reasoning_data or await self._build_reasoning_data_fallback(
            tenant_id, purchase_order, supplier
        )

        total_amount = float(purchase_order.total_amount)
        created_at_iso = purchase_order.created_at.isoformat()

        alert_data = {
            'id': str(uuid.uuid4()),
            'tenant_id': str(tenant_id),
            'service': 'procurement',
            'type': 'po_approval_needed',
            'alert_type': 'po_approval_needed',
            'type_class': 'action_needed',
            'severity': 'high' if purchase_order.priority == 'critical' else 'medium',
            'title': '',
            'message': '',
            'timestamp': datetime.utcnow().isoformat(),
            'metadata': {
                'po_id': str(purchase_order.id),
                'po_number': purchase_order.po_number,
                'supplier_id': str(purchase_order.supplier_id),
                'supplier_name': supplier.get('name', ''),
                'total_amount': total_amount,
                'currency': purchase_order.currency,
                'priority': purchase_order.priority,
                'required_delivery_date': required_by_iso,
                'created_at': created_at_iso,
                'financial_impact': total_amount,
                'urgency_score': 85,
                'deadline': deadline.isoformat() if deadline else None,
                # Compare against None: a deadline that is exactly now (0.0
                # hours) is a real value and must not be reported as missing
                'hours_until_consequence': (
                    round(hours_until_consequence, 1)
                    if hours_until_consequence is not None else None
                ),
                'reasoning_data': reasoning_data
            },
            'message_params': {
                'po_number': purchase_order.po_number,
                'supplier_name': supplier.get('name', ''),
                'total_amount': total_amount,
                'currency': purchase_order.currency,
                'priority': purchase_order.priority,
                'required_delivery_date': required_by_iso,
                'items_count': len(purchase_order.items) if hasattr(purchase_order, 'items') else 0,
                'created_at': created_at_iso
            },
            'actions': ['approve_po', 'reject_po', 'modify_po'],
            'item_type': 'alert'
        }

        # Routing key carries the severity so consumers can bind selectively
        await self.rabbitmq_client.publish_event(
            exchange_name='alerts.exchange',
            routing_key=f'alert.{alert_data["severity"]}.procurement',
            event_data=alert_data
        )
        logger.info(
            "PO approval alert emitted",
            po_id=str(purchase_order.id),
            po_number=purchase_order.po_number,
            tenant_id=str(tenant_id)
        )
    except Exception as e:
        logger.error(
            "Failed to emit PO approval alert",
            po_id=str(purchase_order.id),
            error=str(e)
        )
        raise
async def _build_reasoning_data_fallback(
    self,
    tenant_id: uuid.UUID,
    purchase_order: PurchaseOrder,
    supplier: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Build structured reasoning data for a PO that has none of its own.

    For every item that carries a product identifier, the inventory service
    is asked for current stock. Daily usage is estimated as one seventh of
    the ordered quantity, and an item counts as critical when stock would
    last less than 48 hours at that rate.

    Items are read tolerantly: the identifier comes from ``ingredient_id``
    or ``inventory_product_id`` and the quantity from ``quantity`` or
    ``ordered_quantity``. This service creates items with the latter names.

    Args:
        tenant_id: Tenant that owns the order
        purchase_order: Order whose items are inspected
        supplier: Supplier data; only 'name' is read

    Returns:
        A dict with 'type', 'parameters', 'consequence' and 'metadata'. The
        type is 'low_stock_detection_detailed' when at least one item is
        critical, otherwise 'low_stock_detection'. If building fails, a
        basic 'low_stock_detection' dict is returned instead of raising.
    """
    try:
        critical_products = []
        min_depletion_hours = float('inf')
        product_names = []

        items = purchase_order.items if hasattr(purchase_order, 'items') else []
        for item in items:
            product_names.append(item.product_name)

            product_id = (
                getattr(item, 'ingredient_id', None)
                or getattr(item, 'inventory_product_id', None)
            )
            if not product_id:
                # Without an identifier there is nothing to look up
                continue

            try:
                stock_entries = await self.inventory_client.get_ingredient_stock(
                    ingredient_id=product_id,
                    tenant_id=str(tenant_id)
                )
                if not stock_entries:
                    continue

                # Convert to float once: quantities may be Decimal or None,
                # and Decimal cannot be mixed with float in arithmetic
                total_stock = sum(float(entry.get('quantity') or 0) for entry in stock_entries)
                ordered = (
                    getattr(item, 'quantity', None)
                    or getattr(item, 'ordered_quantity', None)
                )
                # Simple heuristic: the order covers roughly a week of usage
                estimated_daily_usage = float(ordered) / 7.0 if ordered else 1.0
                if estimated_daily_usage <= 0:
                    continue

                hours_until_depletion = (total_stock / estimated_daily_usage) * 24
                if hours_until_depletion < 48:
                    critical_products.append(item.product_name)
                    min_depletion_hours = min(min_depletion_hours, hours_until_depletion)
                logger.info(
                    "Calculated stock depletion for PO item",
                    tenant_id=str(tenant_id),
                    product=item.product_name,
                    current_stock=total_stock,
                    hours_until_depletion=round(hours_until_depletion, 1)
                )
            except Exception as item_error:
                # One failing item must not stop the others
                logger.warning(
                    "Failed to get stock for PO item",
                    error=str(item_error),
                    product=item.product_name,
                    tenant_id=str(tenant_id)
                )
                continue

        if critical_products:
            return {
                "type": "low_stock_detection_detailed",
                "parameters": {
                    "supplier_name": supplier.get('name', 'Supplier'),
                    "product_names": product_names,
                    "product_count": len(product_names),
                    "critical_products": critical_products,
                    "critical_product_count": len(critical_products),
                    "min_depletion_hours": round(min_depletion_hours, 1) if min_depletion_hours != float('inf') else 48,
                    # Estimated opportunity cost; convert before multiplying
                    # because Decimal * float raises TypeError
                    "potential_loss_eur": float(purchase_order.total_amount) * 1.5,
                },
                "consequence": {
                    "type": "stockout_risk",
                    "severity": "high",
                    "impact_days": 2
                },
                "metadata": {
                    "trigger_source": "manual_with_inventory_check",
                    "ai_assisted": False,
                    "enhanced_mode": True
                }
            }

        # No critical stock found: basic reasoning
        return {
            "type": "low_stock_detection",
            "parameters": {
                "supplier_name": supplier.get('name', 'Supplier'),
                "product_names": product_names,
                "product_count": len(product_names),
            },
            "consequence": {
                "type": "stockout_risk",
                "severity": "medium",
                "impact_days": 5
            },
            "metadata": {
                "trigger_source": "manual_with_inventory_check",
                "ai_assisted": False,
                "enhanced_mode": False
            }
        }
    except Exception as e:
        logger.warning(
            "Failed to build enhanced reasoning data, using basic fallback",
            error=str(e),
            tenant_id=str(tenant_id),
            po_id=str(purchase_order.id)
        )
        fallback_items = purchase_order.items if hasattr(purchase_order, 'items') else []
        return {
            "type": "low_stock_detection",
            "parameters": {
                "supplier_name": supplier.get('name', 'Supplier'),
                "product_names": [item.product_name for item in fallback_items],
                "product_count": len(fallback_items),
            },
            "consequence": {
                "type": "stockout_risk",
                "severity": "medium",
                "impact_days": 5
            },
            "metadata": {
                "trigger_source": "fallback_basic",
                "ai_assisted": False
            }
        }
async def _get_and_validate_supplier(self, tenant_id: uuid.UUID, supplier_id: uuid.UUID) -> Dict[str, Any]:
    """
    Fetch a supplier through the suppliers client and check that it is active.

    Args:
        tenant_id: Tenant ID
        supplier_id: Supplier ID

    Returns:
        The supplier data exactly as returned by the suppliers client

    Raises:
        ValueError: If the client returns nothing, or the supplier's
            'status' is anything other than 'active'
        Exception: Any error raised by the suppliers client; it is logged
            and re-raised unchanged
    """
    try:
        record = await self.suppliers_client.get_supplier_by_id(str(tenant_id), str(supplier_id))
        if not record:
            raise ValueError("Supplier not found")
        if record.get('status') != 'active':
            raise ValueError("Cannot create orders for inactive suppliers")
    except Exception as exc:
        # The ValueErrors raised above pass through here too, so validation
        # failures are logged before they propagate to the caller.
        logger.error("Error validating supplier", error=str(exc), supplier_id=supplier_id)
        raise
    else:
        return record
async def _get_supplier_cached(self, tenant_id: uuid.UUID, supplier_id: uuid.UUID) -> Optional[Dict[str, Any]]:
    """
    Look up a supplier, memoizing the result in ``self._supplier_cache``.

    Entries are keyed by "<tenant_id>:<supplier_id>". The first lookup for a
    key calls the suppliers service and stores whatever it returns (including
    a falsy "not found" result); later lookups for the same key are served
    from the cache, so enriching several POs that share a supplier does not
    repeat the API call.

    Args:
        tenant_id: Tenant ID
        supplier_id: Supplier ID

    Returns:
        Supplier data dict or None
    """
    key = f"{tenant_id}:{supplier_id}"
    tenant_str, supplier_str = str(tenant_id), str(supplier_id)

    if key in self._supplier_cache:
        logger.debug("Supplier cache HIT", tenant_id=tenant_str, supplier_id=supplier_str)
    else:
        # Nothing is stored if the client raises; the error simply propagates.
        self._supplier_cache[key] = await self.suppliers_client.get_supplier_by_id(tenant_str, supplier_str)
        logger.debug("Supplier cache MISS", tenant_id=tenant_str, supplier_id=supplier_str)

    return self._supplier_cache[key]
async def _enrich_po_with_supplier(self, tenant_id: uuid.UUID, po: PurchaseOrder) -> None:
    """
    Attach supplier details to a purchase order as dynamic attributes.

    Sets ``po.supplier_name`` and ``po.supplier`` (a summary dict) from the
    cached supplier lookup. This is best-effort and never raises: if the
    supplier cannot be found, or anything fails during the lookup or while
    building the summary, both attributes are set to None so callers can
    always read them.

    Args:
        tenant_id: Tenant ID
        po: Purchase order to enrich (mutated in place)
    """
    try:
        # Use cached supplier lookup to avoid redundant API calls
        supplier = await self._get_supplier_cached(tenant_id, po.supplier_id)

        if not supplier:
            # Supplier not found: define both attributes explicitly, the same
            # way the error path does, instead of leaving them unset or stale.
            po.supplier_name = None
            po.supplier = None
            return

        supplier_name = supplier.get('name', 'Unknown Supplier')

        # Set supplier_name as a dynamic attribute on the model instance
        po.supplier_name = supplier_name

        # Supplier summary with the fields required by the frontend.
        # Per the original author's note, this mirrors the suppliers
        # service SupplierSummary schema (not verifiable from this file).
        po.supplier = {
            'id': supplier.get('id'),
            'name': supplier_name,
            'supplier_code': supplier.get('supplier_code'),
            'email': supplier.get('email'),
            'phone': supplier.get('phone'),
            'contact_person': supplier.get('contact_person'),
            'address_line1': supplier.get('address_line1'),
            'city': supplier.get('city'),
            'country': supplier.get('country'),
            'supplier_type': supplier.get('supplier_type', 'raw_material'),
            'status': supplier.get('status', 'active'),
            'mobile': supplier.get('mobile'),
            'website': supplier.get('website'),
            'payment_terms': supplier.get('payment_terms', 'NET_30'),
            'standard_lead_time': supplier.get('standard_lead_time', 3),
            'quality_rating': supplier.get('quality_rating'),
            'delivery_rating': supplier.get('delivery_rating'),
            'total_orders': supplier.get('total_orders', 0),
            'total_amount': supplier.get('total_amount', 0),
        }
    except Exception as e:
        # Enrichment must not break the caller; log and fall back to None.
        # IDs are stringified to match the other log calls in this service.
        logger.warning(
            "Failed to enrich PO with supplier info",
            error=str(e),
            po_id=str(po.id),
            supplier_id=str(po.supplier_id),
        )
        po.supplier_name = None
        po.supplier = None
def _requires_approval(self, total_amount: Decimal, priority: str) -> bool:
    """
    Decide whether a purchase order needs approval.

    Approval is required when the order total reaches the manager approval
    threshold (``settings.MANAGER_APPROVAL_THRESHOLD``, falling back to 1000
    when the setting is absent) or when the priority is 'critical'.

    Args:
        total_amount: Order total
        priority: Order priority

    Returns:
        True if approval is required, otherwise False
    """
    configured_limit = getattr(settings, 'MANAGER_APPROVAL_THRESHOLD', 1000)
    # Go through str() so a float setting becomes an exact Decimal
    approval_limit = Decimal(str(configured_limit))

    if total_amount >= approval_limit:
        return True
    return priority == 'critical'
def _determine_initial_status(self, total_amount: Decimal, requires_approval: bool) -> str:
    """
    Pick the status a newly created purchase order starts in.

    Orders that require approval start as 'pending_approval'. Otherwise,
    totals at or below the auto-approve threshold
    (``settings.AUTO_APPROVE_THRESHOLD``, falling back to 100 when the
    setting is absent) start as 'approved' and everything else as 'draft'.

    Args:
        total_amount: Order total
        requires_approval: Whether the order needs approval

    Returns:
        One of 'pending_approval', 'approved' or 'draft'
    """
    configured_limit = getattr(settings, 'AUTO_APPROVE_THRESHOLD', 100)
    # Go through str() so a float setting becomes an exact Decimal
    auto_limit = Decimal(str(configured_limit))

    if requires_approval:
        return 'pending_approval'
    return 'approved' if total_amount <= auto_limit else 'draft'
def _is_valid_status_transition(self, from_status: str, to_status: str) -> bool:
"""Validate status transition"""
valid_transitions = {
'draft': ['pending_approval', 'approved', 'cancelled'],
'pending_approval': ['approved', 'rejected', 'cancelled'],
'approved': ['sent_to_supplier', 'cancelled'],
'sent_to_supplier': ['confirmed', 'cancelled'],
'confirmed': ['in_production', 'cancelled'],
'in_production': ['shipped', 'cancelled'],
'shipped': ['delivered', 'cancelled'],
'delivered': ['completed'],
'rejected': [],
'cancelled': [],
'completed': []
}
return to_status in valid_transitions.get(from_status, [])