347 lines
16 KiB
Python
347 lines
16 KiB
Python
# services/inventory/app/services/transformation_service.py
|
|
"""
|
|
Product Transformation Service - Business Logic Layer
|
|
"""
|
|
|
|
from typing import List, Optional, Dict, Any, Tuple
|
|
from uuid import UUID
|
|
from datetime import datetime, timedelta
|
|
import structlog
|
|
import json
|
|
|
|
from app.models.inventory import ProductTransformation, Stock, StockMovement, StockMovementType, ProductionStage
|
|
from app.repositories.transformation_repository import TransformationRepository
|
|
from app.repositories.ingredient_repository import IngredientRepository
|
|
from app.repositories.stock_repository import StockRepository
|
|
from app.repositories.stock_movement_repository import StockMovementRepository
|
|
from app.schemas.inventory import (
|
|
ProductTransformationCreate, ProductTransformationResponse,
|
|
StockCreate, StockMovementCreate,
|
|
IngredientResponse
|
|
)
|
|
from app.core.database import get_db_transaction
|
|
from shared.database.exceptions import DatabaseError
|
|
|
|
logger = structlog.get_logger()
|
|
|
|
|
|
class TransformationService:
|
|
"""Service layer for product transformation operations"""
|
|
|
|
def __init__(self):
|
|
pass
|
|
|
|
async def create_transformation(
|
|
self,
|
|
transformation_data: ProductTransformationCreate,
|
|
tenant_id: UUID,
|
|
user_id: Optional[UUID] = None
|
|
) -> ProductTransformationResponse:
|
|
"""Create a product transformation with stock movements"""
|
|
try:
|
|
async with get_db_transaction() as db:
|
|
transformation_repo = TransformationRepository(db)
|
|
ingredient_repo = IngredientRepository(db)
|
|
stock_repo = StockRepository(db)
|
|
movement_repo = StockMovementRepository(db)
|
|
|
|
# Validate ingredients exist
|
|
source_ingredient = await ingredient_repo.get_by_id(UUID(transformation_data.source_ingredient_id))
|
|
target_ingredient = await ingredient_repo.get_by_id(UUID(transformation_data.target_ingredient_id))
|
|
|
|
if not source_ingredient or source_ingredient.tenant_id != tenant_id:
|
|
raise ValueError("Source ingredient not found")
|
|
if not target_ingredient or target_ingredient.tenant_id != tenant_id:
|
|
raise ValueError("Target ingredient not found")
|
|
|
|
# Reserve source stock using FIFO by default
|
|
source_reservations = await stock_repo.reserve_stock(
|
|
tenant_id,
|
|
UUID(transformation_data.source_ingredient_id),
|
|
transformation_data.source_quantity,
|
|
fifo=True
|
|
)
|
|
|
|
if not source_reservations:
|
|
raise ValueError(f"Insufficient stock available for transformation. Required: {transformation_data.source_quantity}")
|
|
|
|
# Create transformation record
|
|
source_batch_numbers = [res.get('batch_number') for res in source_reservations if res.get('batch_number')]
|
|
transformation = await transformation_repo.create_transformation(
|
|
transformation_data,
|
|
tenant_id,
|
|
user_id,
|
|
source_batch_numbers
|
|
)
|
|
|
|
# Calculate expiration date for target product
|
|
target_expiration_date = self._calculate_target_expiration(
|
|
transformation_data.expiration_calculation_method,
|
|
transformation_data.expiration_days_offset,
|
|
source_reservations
|
|
)
|
|
|
|
# Get current stock level before source consumption
|
|
current_source_stock = await stock_repo.get_total_stock_by_ingredient(tenant_id, UUID(transformation_data.source_ingredient_id))
|
|
running_stock_level = current_source_stock['total_available']
|
|
|
|
# Consume source stock and create movements with progressive tracking
|
|
consumed_items = []
|
|
for reservation in source_reservations:
|
|
stock_id = UUID(reservation['stock_id'])
|
|
reserved_qty = reservation['reserved_quantity']
|
|
|
|
# Calculate before/after for this specific batch
|
|
batch_quantity_before = running_stock_level
|
|
batch_quantity_after = running_stock_level - reserved_qty
|
|
running_stock_level = batch_quantity_after # Update for next iteration
|
|
|
|
# Consume from reserved stock
|
|
await stock_repo.consume_stock(stock_id, reserved_qty, from_reserved=True)
|
|
|
|
# Create movement record for source consumption with progressive tracking
|
|
movement_data = StockMovementCreate(
|
|
ingredient_id=transformation_data.source_ingredient_id,
|
|
stock_id=str(stock_id),
|
|
movement_type=StockMovementType.TRANSFORMATION,
|
|
quantity=reserved_qty,
|
|
reference_number=transformation.transformation_reference,
|
|
notes=f"Transformation: {transformation_data.source_stage.value} → {transformation_data.target_stage.value}"
|
|
)
|
|
await movement_repo.create_movement(movement_data, tenant_id, user_id, batch_quantity_before, batch_quantity_after)
|
|
|
|
consumed_items.append({
|
|
'stock_id': str(stock_id),
|
|
'quantity_consumed': reserved_qty,
|
|
'batch_number': reservation.get('batch_number')
|
|
})
|
|
|
|
# Create target stock entry
|
|
target_stock_data = StockCreate(
|
|
ingredient_id=transformation_data.target_ingredient_id,
|
|
production_stage=transformation_data.target_stage,
|
|
transformation_reference=transformation.transformation_reference,
|
|
current_quantity=transformation_data.target_quantity,
|
|
batch_number=transformation_data.target_batch_number or f"TRANS-{transformation.transformation_reference}",
|
|
expiration_date=target_expiration_date['expiration_date'],
|
|
original_expiration_date=target_expiration_date.get('original_expiration_date'),
|
|
transformation_date=transformation.transformation_date,
|
|
final_expiration_date=target_expiration_date['expiration_date'],
|
|
unit_cost=self._calculate_target_unit_cost(consumed_items, transformation_data.target_quantity),
|
|
quality_status="good"
|
|
)
|
|
|
|
target_stock = await stock_repo.create_stock_entry(target_stock_data, tenant_id)
|
|
|
|
# Get current stock level before target addition
|
|
current_target_stock = await stock_repo.get_total_stock_by_ingredient(tenant_id, UUID(transformation_data.target_ingredient_id))
|
|
target_quantity_before = current_target_stock['total_available']
|
|
target_quantity_after = target_quantity_before + transformation_data.target_quantity
|
|
|
|
# Create target stock movement
|
|
target_movement_data = StockMovementCreate(
|
|
ingredient_id=transformation_data.target_ingredient_id,
|
|
stock_id=str(target_stock.id),
|
|
movement_type=StockMovementType.TRANSFORMATION,
|
|
quantity=transformation_data.target_quantity,
|
|
reference_number=transformation.transformation_reference,
|
|
notes=f"Transformation result: {transformation_data.source_stage.value} → {transformation_data.target_stage.value}"
|
|
)
|
|
await movement_repo.create_movement(target_movement_data, tenant_id, user_id, target_quantity_before, target_quantity_after)
|
|
|
|
# Convert to response schema
|
|
response = ProductTransformationResponse(**transformation.to_dict())
|
|
response.source_ingredient = IngredientResponse(**source_ingredient.to_dict())
|
|
response.target_ingredient = IngredientResponse(**target_ingredient.to_dict())
|
|
|
|
logger.info(
|
|
"Transformation completed successfully",
|
|
transformation_id=transformation.id,
|
|
reference=transformation.transformation_reference,
|
|
source_quantity=transformation_data.source_quantity,
|
|
target_quantity=transformation_data.target_quantity
|
|
)
|
|
return response
|
|
|
|
except Exception as e:
|
|
logger.error("Failed to create transformation", error=str(e), tenant_id=tenant_id)
|
|
raise
|
|
|
|
async def get_transformation(
|
|
self,
|
|
transformation_id: UUID,
|
|
tenant_id: UUID
|
|
) -> Optional[ProductTransformationResponse]:
|
|
"""Get transformation by ID"""
|
|
try:
|
|
async with get_db_transaction() as db:
|
|
transformation_repo = TransformationRepository(db)
|
|
ingredient_repo = IngredientRepository(db)
|
|
|
|
transformation = await transformation_repo.get_by_id(transformation_id)
|
|
if not transformation or transformation.tenant_id != tenant_id:
|
|
return None
|
|
|
|
# Get related ingredients
|
|
source_ingredient = await ingredient_repo.get_by_id(transformation.source_ingredient_id)
|
|
target_ingredient = await ingredient_repo.get_by_id(transformation.target_ingredient_id)
|
|
|
|
response = ProductTransformationResponse(**transformation.to_dict())
|
|
if source_ingredient:
|
|
response.source_ingredient = IngredientResponse(**source_ingredient.to_dict())
|
|
if target_ingredient:
|
|
response.target_ingredient = IngredientResponse(**target_ingredient.to_dict())
|
|
|
|
return response
|
|
|
|
except Exception as e:
|
|
logger.error("Failed to get transformation", error=str(e), transformation_id=transformation_id)
|
|
raise
|
|
|
|
async def get_transformations(
|
|
self,
|
|
tenant_id: UUID,
|
|
skip: int = 0,
|
|
limit: int = 100,
|
|
ingredient_id: Optional[UUID] = None,
|
|
source_stage: Optional[ProductionStage] = None,
|
|
target_stage: Optional[ProductionStage] = None,
|
|
days_back: Optional[int] = None
|
|
) -> List[ProductTransformationResponse]:
|
|
"""Get transformations with filtering"""
|
|
try:
|
|
async with get_db_transaction() as db:
|
|
transformation_repo = TransformationRepository(db)
|
|
ingredient_repo = IngredientRepository(db)
|
|
|
|
if ingredient_id:
|
|
# Get transformations where ingredient is either source or target
|
|
source_transformations = await transformation_repo.get_transformations_by_ingredient(
|
|
tenant_id, ingredient_id, is_source=True, skip=0, limit=limit//2, days_back=days_back
|
|
)
|
|
target_transformations = await transformation_repo.get_transformations_by_ingredient(
|
|
tenant_id, ingredient_id, is_source=False, skip=0, limit=limit//2, days_back=days_back
|
|
)
|
|
transformations = source_transformations + target_transformations
|
|
# Remove duplicates and sort by date
|
|
unique_transformations = {t.id: t for t in transformations}.values()
|
|
transformations = sorted(unique_transformations, key=lambda x: x.transformation_date, reverse=True)
|
|
transformations = transformations[skip:skip+limit]
|
|
else:
|
|
transformations = await transformation_repo.get_transformations_by_stage(
|
|
tenant_id, source_stage, target_stage, skip, limit, days_back
|
|
)
|
|
|
|
responses = []
|
|
for transformation in transformations:
|
|
# Get related ingredients
|
|
source_ingredient = await ingredient_repo.get_by_id(transformation.source_ingredient_id)
|
|
target_ingredient = await ingredient_repo.get_by_id(transformation.target_ingredient_id)
|
|
|
|
response = ProductTransformationResponse(**transformation.to_dict())
|
|
if source_ingredient:
|
|
response.source_ingredient = IngredientResponse(**source_ingredient.to_dict())
|
|
if target_ingredient:
|
|
response.target_ingredient = IngredientResponse(**target_ingredient.to_dict())
|
|
|
|
responses.append(response)
|
|
|
|
return responses
|
|
|
|
except Exception as e:
|
|
logger.error("Failed to get transformations", error=str(e), tenant_id=tenant_id)
|
|
raise
|
|
|
|
def _calculate_target_expiration(
|
|
self,
|
|
calculation_method: str,
|
|
expiration_days_offset: Optional[int],
|
|
source_reservations: List[Dict[str, Any]]
|
|
) -> Dict[str, Optional[datetime]]:
|
|
"""Calculate expiration date for target product"""
|
|
current_time = datetime.now()
|
|
|
|
if calculation_method == "days_from_transformation":
|
|
# Calculate expiration based on transformation date + offset
|
|
if expiration_days_offset:
|
|
expiration_date = current_time + timedelta(days=expiration_days_offset)
|
|
else:
|
|
expiration_date = current_time + timedelta(days=1) # Default 1 day for fresh baked goods
|
|
|
|
# Use earliest source expiration as original
|
|
original_expiration = None
|
|
if source_reservations:
|
|
source_expirations = [res.get('expiration_date') for res in source_reservations if res.get('expiration_date')]
|
|
if source_expirations:
|
|
original_expiration = min(source_expirations)
|
|
|
|
return {
|
|
'expiration_date': expiration_date,
|
|
'original_expiration_date': original_expiration
|
|
}
|
|
|
|
elif calculation_method == "preserve_original":
|
|
# Use the earliest expiration date from source stock
|
|
if source_reservations:
|
|
source_expirations = [res.get('expiration_date') for res in source_reservations if res.get('expiration_date')]
|
|
if source_expirations:
|
|
expiration_date = min(source_expirations)
|
|
return {
|
|
'expiration_date': expiration_date,
|
|
'original_expiration_date': expiration_date
|
|
}
|
|
|
|
# Fallback to default
|
|
return {
|
|
'expiration_date': current_time + timedelta(days=7),
|
|
'original_expiration_date': None
|
|
}
|
|
|
|
else:
|
|
# Default fallback
|
|
return {
|
|
'expiration_date': current_time + timedelta(days=1),
|
|
'original_expiration_date': None
|
|
}
|
|
|
|
def _calculate_target_unit_cost(
|
|
self,
|
|
consumed_items: List[Dict[str, Any]],
|
|
target_quantity: float
|
|
) -> Optional[float]:
|
|
"""Calculate unit cost for target product based on consumed items"""
|
|
# This is a simplified calculation - in reality you'd want to consider
|
|
# additional costs like labor, energy, etc.
|
|
total_source_cost = 0.0
|
|
total_source_quantity = 0.0
|
|
|
|
for item in consumed_items:
|
|
quantity = item.get('quantity_consumed', 0)
|
|
# Note: In a real implementation, you'd fetch the unit cost from the stock items
|
|
# For now, we'll use a placeholder
|
|
total_source_quantity += quantity
|
|
|
|
if total_source_quantity > 0 and target_quantity > 0:
|
|
# Simple cost transfer based on quantity ratio
|
|
return total_source_cost / target_quantity
|
|
|
|
return None
|
|
|
|
async def get_transformation_summary(
|
|
self,
|
|
tenant_id: UUID,
|
|
days_back: int = 30
|
|
) -> Dict[str, Any]:
|
|
"""Get transformation summary for dashboard"""
|
|
try:
|
|
async with get_db_transaction() as db:
|
|
transformation_repo = TransformationRepository(db)
|
|
|
|
summary = await transformation_repo.get_transformation_summary_by_period(tenant_id, days_back)
|
|
|
|
return summary
|
|
|
|
except Exception as e:
|
|
logger.error("Failed to get transformation summary", error=str(e), tenant_id=tenant_id)
|
|
raise
|