Improve the inventory page

This commit is contained in:
Urtzi Alfaro
2025-09-17 16:06:30 +02:00
parent 7aa26d51d3
commit dcb3ce441b
39 changed files with 5852 additions and 1762 deletions

View File

@@ -11,9 +11,10 @@ from sqlalchemy.ext.asyncio import AsyncSession
from app.core.database import get_db
from app.services.inventory_service import InventoryService
from app.schemas.inventory import (
IngredientCreate,
IngredientUpdate,
IngredientCreate,
IngredientUpdate,
IngredientResponse,
StockResponse,
InventoryFilter,
PaginatedResponse
)
@@ -171,38 +172,52 @@ async def list_ingredients(
@router.delete("/tenants/{tenant_id}/ingredients/{ingredient_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_ingredient(
async def soft_delete_ingredient(
ingredient_id: UUID,
tenant_id: UUID = Path(..., description="Tenant ID"),
db: AsyncSession = Depends(get_db)
):
"""Soft delete ingredient (mark as inactive)"""
try:
service = InventoryService()
ingredient = await service.update_ingredient(
ingredient_id,
{"is_active": False},
tenant_id
)
if not ingredient:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Ingredient not found"
)
result = await service.soft_delete_ingredient(ingredient_id, tenant_id)
return None
except HTTPException:
raise
except ValueError as e:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=str(e)
)
except Exception as e:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Failed to delete ingredient"
detail="Failed to soft delete ingredient"
)
@router.get("/tenants/{tenant_id}/ingredients/{ingredient_id}/stock", response_model=List[dict])
@router.delete("/tenants/{tenant_id}/ingredients/{ingredient_id}/hard")
async def hard_delete_ingredient(
ingredient_id: UUID,
tenant_id: UUID = Path(..., description="Tenant ID"),
db: AsyncSession = Depends(get_db)
):
"""Hard delete ingredient and all associated data (stock, movements, etc.)"""
try:
service = InventoryService()
deletion_summary = await service.hard_delete_ingredient(ingredient_id, tenant_id)
return deletion_summary
except ValueError as e:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=str(e)
)
except Exception as e:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Failed to hard delete ingredient"
)
@router.get("/tenants/{tenant_id}/ingredients/{ingredient_id}/stock", response_model=List[StockResponse])
async def get_ingredient_stock(
ingredient_id: UUID,
tenant_id: UUID = Path(..., description="Tenant ID"),

View File

@@ -0,0 +1,197 @@
# services/inventory/app/api/transformations.py
"""
API endpoints for product transformations
"""
from typing import List, Optional
from uuid import UUID
from fastapi import APIRouter, Depends, HTTPException, Query, Path, status
from sqlalchemy.ext.asyncio import AsyncSession
import structlog
from app.core.database import get_db
from app.services.transformation_service import TransformationService
from app.schemas.inventory import (
ProductTransformationCreate,
ProductTransformationResponse
)
from app.models.inventory import ProductionStage
from shared.auth.decorators import get_current_user_dep
logger = structlog.get_logger()
router = APIRouter(tags=["transformations"])
# Helper function to extract user ID from user object
def get_current_user_id(current_user: dict = Depends(get_current_user_dep)) -> UUID:
"""Extract user ID from current user context"""
user_id = current_user.get('user_id')
if not user_id:
# Handle service tokens that don't have UUID user_ids
if current_user.get('type') == 'service':
return None
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="User ID not found in context"
)
try:
return UUID(user_id)
except (ValueError, TypeError):
return None
@router.post("/tenants/{tenant_id}/transformations", response_model=ProductTransformationResponse)
async def create_transformation(
transformation_data: ProductTransformationCreate,
tenant_id: UUID = Path(..., description="Tenant ID"),
current_user: dict = Depends(get_current_user_dep),
db: AsyncSession = Depends(get_db)
):
"""Create a new product transformation (e.g., par-baked to fully baked)"""
try:
# Extract user ID - handle service tokens
user_id = get_current_user_id(current_user)
service = TransformationService()
transformation = await service.create_transformation(transformation_data, tenant_id, user_id)
return transformation
except ValueError as e:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=str(e)
)
except Exception as e:
logger.error("Failed to create transformation", error=str(e), tenant_id=tenant_id)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Failed to create transformation"
)
@router.get("/tenants/{tenant_id}/transformations", response_model=List[ProductTransformationResponse])
async def get_transformations(
tenant_id: UUID = Path(..., description="Tenant ID"),
skip: int = Query(0, ge=0, description="Number of records to skip"),
limit: int = Query(100, ge=1, le=1000, description="Number of records to return"),
ingredient_id: Optional[UUID] = Query(None, description="Filter by ingredient (source or target)"),
source_stage: Optional[ProductionStage] = Query(None, description="Filter by source production stage"),
target_stage: Optional[ProductionStage] = Query(None, description="Filter by target production stage"),
days_back: Optional[int] = Query(None, ge=1, le=365, description="Filter by days back from today"),
db: AsyncSession = Depends(get_db)
):
"""Get product transformations with filtering"""
try:
service = TransformationService()
transformations = await service.get_transformations(
tenant_id, skip, limit, ingredient_id, source_stage, target_stage, days_back
)
return transformations
except Exception as e:
logger.error("Failed to get transformations", error=str(e), tenant_id=tenant_id)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Failed to get transformations"
)
@router.get("/tenants/{tenant_id}/transformations/{transformation_id}", response_model=ProductTransformationResponse)
async def get_transformation(
transformation_id: UUID = Path(..., description="Transformation ID"),
tenant_id: UUID = Path(..., description="Tenant ID"),
db: AsyncSession = Depends(get_db)
):
"""Get specific transformation by ID"""
try:
service = TransformationService()
transformation = await service.get_transformation(transformation_id, tenant_id)
if not transformation:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Transformation not found"
)
return transformation
except HTTPException:
raise
except Exception as e:
logger.error("Failed to get transformation", error=str(e), transformation_id=transformation_id)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Failed to get transformation"
)
@router.get("/tenants/{tenant_id}/transformations/summary", response_model=dict)
async def get_transformation_summary(
tenant_id: UUID = Path(..., description="Tenant ID"),
days_back: int = Query(30, ge=1, le=365, description="Days back for summary"),
db: AsyncSession = Depends(get_db)
):
"""Get transformation summary for dashboard"""
try:
service = TransformationService()
summary = await service.get_transformation_summary(tenant_id, days_back)
return summary
except Exception as e:
logger.error("Failed to get transformation summary", error=str(e), tenant_id=tenant_id)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Failed to get transformation summary"
)
@router.post("/tenants/{tenant_id}/transformations/par-bake-to-fresh")
async def create_par_bake_transformation(
source_ingredient_id: UUID = Query(..., description="Par-baked ingredient ID"),
target_ingredient_id: UUID = Query(..., description="Fresh baked ingredient ID"),
quantity: float = Query(..., gt=0, description="Quantity to transform"),
target_batch_number: Optional[str] = Query(None, description="Target batch number"),
expiration_hours: int = Query(24, ge=1, le=72, description="Hours until expiration after baking"),
notes: Optional[str] = Query(None, description="Process notes"),
tenant_id: UUID = Path(..., description="Tenant ID"),
current_user: dict = Depends(get_current_user_dep),
db: AsyncSession = Depends(get_db)
):
"""Convenience endpoint for par-baked to fresh transformation"""
try:
# Extract user ID - handle service tokens
user_id = get_current_user_id(current_user)
# Create transformation data for par-baked to fully baked
transformation_data = ProductTransformationCreate(
source_ingredient_id=str(source_ingredient_id),
target_ingredient_id=str(target_ingredient_id),
source_stage=ProductionStage.PAR_BAKED,
target_stage=ProductionStage.FULLY_BAKED,
source_quantity=quantity,
target_quantity=quantity, # Assume 1:1 ratio for par-baked goods
expiration_calculation_method="days_from_transformation",
expiration_days_offset=max(1, expiration_hours // 24), # Convert hours to days, minimum 1 day
process_notes=notes,
target_batch_number=target_batch_number
)
service = TransformationService()
transformation = await service.create_transformation(transformation_data, tenant_id, user_id)
return {
"transformation_id": transformation.id,
"transformation_reference": transformation.transformation_reference,
"source_quantity": transformation.source_quantity,
"target_quantity": transformation.target_quantity,
"expiration_date": transformation.transformation_date,
"message": f"Successfully transformed {quantity} units from par-baked to fresh baked"
}
except ValueError as e:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=str(e)
)
except Exception as e:
logger.error("Failed to create par-bake transformation", error=str(e), tenant_id=tenant_id)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Failed to create par-bake transformation"
)

View File

@@ -13,7 +13,7 @@ import structlog
# Import core modules
from app.core.config import settings
from app.core.database import init_db, close_db, health_check as db_health_check
from app.api import ingredients, stock, classification
from app.api import ingredients, stock, classification, transformations
from app.services.inventory_alert_service import InventoryAlertService
from shared.monitoring.metrics import setup_metrics_early
# Auth decorators are used in endpoints, no global setup needed
@@ -128,6 +128,7 @@ async def general_exception_handler(request: Request, exc: Exception):
# Include routers
app.include_router(ingredients.router, prefix=settings.API_V1_STR)
app.include_router(stock.router, prefix=settings.API_V1_STR)
app.include_router(transformations.router, prefix=settings.API_V1_STR)
app.include_router(classification.router, prefix=settings.API_V1_STR)
# Include enhanced routers

View File

@@ -64,6 +64,15 @@ class ProductType(enum.Enum):
FINISHED_PRODUCT = "finished_product" # Ready-to-sell items (bread, croissants, etc.)
class ProductionStage(enum.Enum):
"""Production stages for bakery products"""
RAW_INGREDIENT = "raw_ingredient" # Basic ingredients (flour, yeast)
PAR_BAKED = "par_baked" # Pre-baked items needing final baking
FULLY_BAKED = "fully_baked" # Completed products ready for sale
PREPARED_DOUGH = "prepared_dough" # Prepared but unbaked dough
FROZEN_PRODUCT = "frozen_product" # Frozen intermediate products
class StockMovementType(enum.Enum):
"""Types of inventory movements"""
PURCHASE = "purchase"
@@ -73,6 +82,7 @@ class StockMovementType(enum.Enum):
TRANSFER = "transfer"
RETURN = "return"
INITIAL_STOCK = "initial_stock"
TRANSFORMATION = "transformation" # Converting between production stages
class Ingredient(Base):
@@ -227,16 +237,20 @@ class Ingredient(Base):
class Stock(Base):
"""Current stock levels and batch tracking"""
__tablename__ = "stock"
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
tenant_id = Column(UUID(as_uuid=True), nullable=False, index=True)
ingredient_id = Column(UUID(as_uuid=True), ForeignKey('ingredients.id'), nullable=False, index=True)
# Stock identification
batch_number = Column(String(100), nullable=True, index=True)
lot_number = Column(String(100), nullable=True, index=True)
supplier_batch_ref = Column(String(100), nullable=True)
# Production stage tracking
production_stage = Column(String(20), nullable=False, default='raw_ingredient', index=True)
transformation_reference = Column(String(100), nullable=True, index=True) # Links related transformations
# Quantities
current_quantity = Column(Float, nullable=False, default=0.0)
reserved_quantity = Column(Float, nullable=False, default=0.0) # Reserved for production
@@ -246,6 +260,11 @@ class Stock(Base):
received_date = Column(DateTime(timezone=True), nullable=True)
expiration_date = Column(DateTime(timezone=True), nullable=True, index=True)
best_before_date = Column(DateTime(timezone=True), nullable=True)
# Stage-specific expiration tracking
original_expiration_date = Column(DateTime(timezone=True), nullable=True) # Original batch expiration (for par-baked)
transformation_date = Column(DateTime(timezone=True), nullable=True) # When product was transformed
final_expiration_date = Column(DateTime(timezone=True), nullable=True) # Final product expiration after transformation
# Cost tracking
unit_cost = Column(Numeric(10, 2), nullable=True)
@@ -276,6 +295,9 @@ class Stock(Base):
Index('idx_stock_batch', 'tenant_id', 'batch_number'),
Index('idx_stock_low_levels', 'tenant_id', 'current_quantity', 'is_available'),
Index('idx_stock_quality', 'tenant_id', 'quality_status', 'is_available'),
Index('idx_stock_production_stage', 'tenant_id', 'production_stage', 'is_available'),
Index('idx_stock_transformation', 'tenant_id', 'transformation_reference'),
Index('idx_stock_final_expiration', 'tenant_id', 'final_expiration_date', 'is_available'),
)
def to_dict(self) -> Dict[str, Any]:
@@ -287,12 +309,17 @@ class Stock(Base):
'batch_number': self.batch_number,
'lot_number': self.lot_number,
'supplier_batch_ref': self.supplier_batch_ref,
'production_stage': self.production_stage if self.production_stage else None,
'transformation_reference': self.transformation_reference,
'current_quantity': self.current_quantity,
'reserved_quantity': self.reserved_quantity,
'available_quantity': self.available_quantity,
'received_date': self.received_date.isoformat() if self.received_date else None,
'expiration_date': self.expiration_date.isoformat() if self.expiration_date else None,
'best_before_date': self.best_before_date.isoformat() if self.best_before_date else None,
'original_expiration_date': self.original_expiration_date.isoformat() if self.original_expiration_date else None,
'transformation_date': self.transformation_date.isoformat() if self.transformation_date else None,
'final_expiration_date': self.final_expiration_date.isoformat() if self.final_expiration_date else None,
'unit_cost': float(self.unit_cost) if self.unit_cost else None,
'total_cost': float(self.total_cost) if self.total_cost else None,
'storage_location': self.storage_location,
@@ -375,6 +402,83 @@ class StockMovement(Base):
}
class ProductTransformation(Base):
"""Track product transformations (e.g., par-baked to fully baked)"""
__tablename__ = "product_transformations"
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
tenant_id = Column(UUID(as_uuid=True), nullable=False, index=True)
# Transformation details
transformation_reference = Column(String(100), nullable=False, index=True)
source_ingredient_id = Column(UUID(as_uuid=True), ForeignKey('ingredients.id'), nullable=False)
target_ingredient_id = Column(UUID(as_uuid=True), ForeignKey('ingredients.id'), nullable=False)
# Stage transformation
source_stage = Column(String(20), nullable=False)
target_stage = Column(String(20), nullable=False)
# Quantities and conversion
source_quantity = Column(Float, nullable=False) # Input quantity
target_quantity = Column(Float, nullable=False) # Output quantity
conversion_ratio = Column(Float, nullable=False, default=1.0) # target/source ratio
# Expiration logic
expiration_calculation_method = Column(String(50), nullable=False, default="days_from_transformation") # days_from_transformation, preserve_original
expiration_days_offset = Column(Integer, nullable=True) # Days from transformation date
# Process tracking
transformation_date = Column(DateTime(timezone=True), nullable=False, default=lambda: datetime.now(timezone.utc))
process_notes = Column(Text, nullable=True)
performed_by = Column(UUID(as_uuid=True), nullable=True)
# Batch tracking
source_batch_numbers = Column(Text, nullable=True) # JSON array of source batch numbers
target_batch_number = Column(String(100), nullable=True)
# Status
is_completed = Column(Boolean, default=True)
is_reversed = Column(Boolean, default=False)
# Audit fields
created_at = Column(DateTime(timezone=True), default=lambda: datetime.now(timezone.utc))
created_by = Column(UUID(as_uuid=True), nullable=True)
__table_args__ = (
Index('idx_transformations_tenant_date', 'tenant_id', 'transformation_date'),
Index('idx_transformations_reference', 'transformation_reference'),
Index('idx_transformations_source', 'tenant_id', 'source_ingredient_id'),
Index('idx_transformations_target', 'tenant_id', 'target_ingredient_id'),
Index('idx_transformations_stages', 'source_stage', 'target_stage'),
)
def to_dict(self) -> Dict[str, Any]:
"""Convert model to dictionary for API responses"""
return {
'id': str(self.id),
'tenant_id': str(self.tenant_id),
'transformation_reference': self.transformation_reference,
'source_ingredient_id': str(self.source_ingredient_id),
'target_ingredient_id': str(self.target_ingredient_id),
'source_stage': self.source_stage if self.source_stage else None,
'target_stage': self.target_stage if self.target_stage else None,
'source_quantity': self.source_quantity,
'target_quantity': self.target_quantity,
'conversion_ratio': self.conversion_ratio,
'expiration_calculation_method': self.expiration_calculation_method,
'expiration_days_offset': self.expiration_days_offset,
'transformation_date': self.transformation_date.isoformat() if self.transformation_date else None,
'process_notes': self.process_notes,
'performed_by': str(self.performed_by) if self.performed_by else None,
'source_batch_numbers': self.source_batch_numbers,
'target_batch_number': self.target_batch_number,
'is_completed': self.is_completed,
'is_reversed': self.is_reversed,
'created_at': self.created_at.isoformat() if self.created_at else None,
'created_by': str(self.created_by) if self.created_by else None,
}
class StockAlert(Base):
"""Automated stock alerts for low stock, expiration, etc."""
__tablename__ = "stock_alerts"

View File

@@ -418,4 +418,28 @@ class IngredientRepository(BaseRepository[Ingredient, IngredientCreate, Ingredie
except Exception as e:
logger.error("Failed to get ingredients by category", error=str(e), category=category, tenant_id=tenant_id)
raise
async def delete_by_id(self, ingredient_id: UUID, tenant_id: UUID) -> bool:
"""Hard delete an ingredient by ID"""
try:
from sqlalchemy import delete
# Delete the ingredient
stmt = delete(self.model).where(
and_(
self.model.id == ingredient_id,
self.model.tenant_id == tenant_id
)
)
result = await self.session.execute(stmt)
await self.session.commit()
# Return True if a row was deleted
return result.rowcount > 0
except Exception as e:
await self.session.rollback()
logger.error("Failed to hard delete ingredient", error=str(e), ingredient_id=ingredient_id, tenant_id=tenant_id)
raise

View File

@@ -383,4 +383,38 @@ class StockMovementRepository(BaseRepository[StockMovement, StockMovementCreate,
except Exception as e:
logger.error("Failed to calculate ingredient usage", error=str(e), ingredient_id=ingredient_id)
raise
async def delete_by_ingredient(self, ingredient_id: UUID, tenant_id: UUID) -> int:
"""Delete all stock movements for a specific ingredient"""
try:
from sqlalchemy import delete
from app.models.inventory import StockMovement
stmt = delete(StockMovement).where(
and_(
StockMovement.ingredient_id == ingredient_id,
StockMovement.tenant_id == tenant_id
)
)
result = await self.session.execute(stmt)
deleted_count = result.rowcount
logger.info(
"Deleted stock movements for ingredient",
ingredient_id=str(ingredient_id),
tenant_id=str(tenant_id),
deleted_count=deleted_count
)
return deleted_count
except Exception as e:
logger.error(
"Failed to delete stock movements for ingredient",
error=str(e),
ingredient_id=str(ingredient_id),
tenant_id=str(tenant_id)
)
raise

View File

@@ -109,14 +109,16 @@ class StockRepository(BaseRepository[Stock, StockCreate, StockUpdate]):
raise
async def get_expiring_stock(
self,
tenant_id: UUID,
self,
tenant_id: UUID,
days_ahead: int = 7
) -> List[Tuple[Stock, Ingredient]]:
"""Get stock items expiring within specified days"""
"""Get stock items expiring within specified days using state-dependent expiration logic"""
try:
expiry_date = datetime.now() + timedelta(days=days_ahead)
# Use final_expiration_date if available (for transformed products),
# otherwise use regular expiration_date
result = await self.session.execute(
select(Stock, Ingredient)
.join(Ingredient, Stock.ingredient_id == Ingredient.id)
@@ -124,24 +126,39 @@ class StockRepository(BaseRepository[Stock, StockCreate, StockUpdate]):
and_(
Stock.tenant_id == tenant_id,
Stock.is_available == True,
Stock.expiration_date.isnot(None),
Stock.expiration_date <= expiry_date
or_(
and_(
Stock.final_expiration_date.isnot(None),
Stock.final_expiration_date <= expiry_date
),
and_(
Stock.final_expiration_date.is_(None),
Stock.expiration_date.isnot(None),
Stock.expiration_date <= expiry_date
)
)
)
)
.order_by(
asc(
func.coalesce(Stock.final_expiration_date, Stock.expiration_date)
)
)
.order_by(asc(Stock.expiration_date))
)
return result.all()
except Exception as e:
logger.error("Failed to get expiring stock", error=str(e), tenant_id=tenant_id)
raise
async def get_expired_stock(self, tenant_id: UUID) -> List[Tuple[Stock, Ingredient]]:
"""Get stock items that have expired"""
"""Get stock items that have expired using state-dependent expiration logic"""
try:
current_date = datetime.now()
# Use final_expiration_date if available (for transformed products),
# otherwise use regular expiration_date
result = await self.session.execute(
select(Stock, Ingredient)
.join(Ingredient, Stock.ingredient_id == Ingredient.id)
@@ -149,31 +166,45 @@ class StockRepository(BaseRepository[Stock, StockCreate, StockUpdate]):
and_(
Stock.tenant_id == tenant_id,
Stock.is_available == True,
Stock.expiration_date.isnot(None),
Stock.expiration_date < current_date
or_(
and_(
Stock.final_expiration_date.isnot(None),
Stock.final_expiration_date < current_date
),
and_(
Stock.final_expiration_date.is_(None),
Stock.expiration_date.isnot(None),
Stock.expiration_date < current_date
)
)
)
)
.order_by(
desc(
func.coalesce(Stock.final_expiration_date, Stock.expiration_date)
)
)
.order_by(desc(Stock.expiration_date))
)
return result.all()
except Exception as e:
logger.error("Failed to get expired stock", error=str(e), tenant_id=tenant_id)
raise
async def reserve_stock(
self,
tenant_id: UUID,
ingredient_id: UUID,
self,
tenant_id: UUID,
ingredient_id: UUID,
quantity: float,
fifo: bool = True
) -> List[Dict[str, Any]]:
"""Reserve stock using FIFO/LIFO method"""
"""Reserve stock using FIFO/LIFO method with state-dependent expiration"""
try:
# Get available stock ordered by expiration date
order_clause = asc(Stock.expiration_date) if fifo else desc(Stock.expiration_date)
# Order by appropriate expiration date based on transformation status
effective_expiration = func.coalesce(Stock.final_expiration_date, Stock.expiration_date)
order_clause = asc(effective_expiration) if fifo else desc(effective_expiration)
result = await self.session.execute(
select(Stock).where(
and_(
@@ -364,27 +395,133 @@ class StockRepository(BaseRepository[Stock, StockCreate, StockUpdate]):
raise
async def mark_expired_stock(self, tenant_id: UUID) -> int:
"""Mark expired stock items as expired"""
"""Mark expired stock items as expired using state-dependent expiration logic"""
try:
current_date = datetime.now()
# Mark items as expired based on final_expiration_date or expiration_date
result = await self.session.execute(
update(Stock)
.where(
and_(
Stock.tenant_id == tenant_id,
Stock.expiration_date < current_date,
Stock.is_expired == False
Stock.is_expired == False,
or_(
and_(
Stock.final_expiration_date.isnot(None),
Stock.final_expiration_date < current_date
),
and_(
Stock.final_expiration_date.is_(None),
Stock.expiration_date.isnot(None),
Stock.expiration_date < current_date
)
)
)
)
.values(is_expired=True, quality_status="expired")
)
expired_count = result.rowcount
logger.info(f"Marked {expired_count} stock items as expired", tenant_id=tenant_id)
logger.info(f"Marked {expired_count} stock items as expired using state-dependent logic", tenant_id=tenant_id)
return expired_count
except Exception as e:
logger.error("Failed to mark expired stock", error=str(e), tenant_id=tenant_id)
raise
async def get_stock_by_production_stage(
self,
tenant_id: UUID,
production_stage: 'ProductionStage',
ingredient_id: Optional[UUID] = None
) -> List['Stock']:
"""Get stock items by production stage"""
try:
conditions = [
Stock.tenant_id == tenant_id,
Stock.production_stage == production_stage,
Stock.is_available == True
]
if ingredient_id:
conditions.append(Stock.ingredient_id == ingredient_id)
result = await self.session.execute(
select(Stock)
.where(and_(*conditions))
.order_by(asc(func.coalesce(Stock.final_expiration_date, Stock.expiration_date)))
)
return result.scalars().all()
except Exception as e:
logger.error("Failed to get stock by production stage", error=str(e), production_stage=production_stage)
raise
async def get_stock_entries(
self,
tenant_id: UUID,
skip: int = 0,
limit: int = 100,
ingredient_id: Optional[UUID] = None,
available_only: bool = True
) -> List[Stock]:
"""Get stock entries with filtering and pagination"""
try:
conditions = [Stock.tenant_id == tenant_id]
if available_only:
conditions.append(Stock.is_available == True)
if ingredient_id:
conditions.append(Stock.ingredient_id == ingredient_id)
query = (
select(Stock)
.where(and_(*conditions))
.order_by(desc(Stock.created_at))
.offset(skip)
.limit(limit)
)
result = await self.session.execute(query)
return result.scalars().all()
except Exception as e:
logger.error("Failed to get stock entries", error=str(e), tenant_id=tenant_id)
raise
async def delete_by_ingredient(self, ingredient_id: UUID, tenant_id: UUID) -> int:
"""Delete all stock entries for a specific ingredient"""
try:
from sqlalchemy import delete
stmt = delete(Stock).where(
and_(
Stock.ingredient_id == ingredient_id,
Stock.tenant_id == tenant_id
)
)
result = await self.session.execute(stmt)
deleted_count = result.rowcount
logger.info(
"Deleted stock entries for ingredient",
ingredient_id=str(ingredient_id),
tenant_id=str(tenant_id),
deleted_count=deleted_count
)
return deleted_count
except Exception as e:
logger.error(
"Failed to delete stock entries for ingredient",
error=str(e),
ingredient_id=str(ingredient_id),
tenant_id=str(tenant_id)
)
raise

View File

@@ -0,0 +1,257 @@
# services/inventory/app/repositories/transformation_repository.py
"""
Product Transformation Repository using Repository Pattern
"""
from typing import List, Optional, Dict, Any
from uuid import UUID
from datetime import datetime, timedelta
from sqlalchemy import select, func, and_, or_, desc, asc
from sqlalchemy.ext.asyncio import AsyncSession
import structlog
import json
import uuid
from app.models.inventory import ProductTransformation, Ingredient, ProductionStage
from app.schemas.inventory import ProductTransformationCreate
from shared.database.repository import BaseRepository
logger = structlog.get_logger()
class TransformationRepository(BaseRepository[ProductTransformation, ProductTransformationCreate, dict]):
"""Repository for product transformation operations"""
def __init__(self, session: AsyncSession):
super().__init__(ProductTransformation, session)
async def create_transformation(
self,
transformation_data: ProductTransformationCreate,
tenant_id: UUID,
created_by: Optional[UUID] = None,
source_batch_numbers: Optional[List[str]] = None
) -> ProductTransformation:
"""Create a new product transformation record"""
try:
# Generate transformation reference
transformation_ref = f"TRANS-{datetime.now().strftime('%Y%m%d')}-{str(uuid.uuid4())[:8].upper()}"
# Prepare data
create_data = transformation_data.model_dump()
create_data['tenant_id'] = tenant_id
create_data['created_by'] = created_by
create_data['transformation_reference'] = transformation_ref
# Calculate conversion ratio if not provided
if not create_data.get('conversion_ratio'):
create_data['conversion_ratio'] = create_data['target_quantity'] / create_data['source_quantity']
# Store source batch numbers as JSON
if source_batch_numbers:
create_data['source_batch_numbers'] = json.dumps(source_batch_numbers)
# Create record
record = await self.create(create_data)
logger.info(
"Created product transformation",
transformation_id=record.id,
reference=record.transformation_reference,
source_stage=record.source_stage.value,
target_stage=record.target_stage.value,
source_quantity=record.source_quantity,
target_quantity=record.target_quantity,
tenant_id=tenant_id
)
return record
except Exception as e:
logger.error("Failed to create transformation", error=str(e), tenant_id=tenant_id)
raise
async def get_transformations_by_ingredient(
self,
tenant_id: UUID,
ingredient_id: UUID,
is_source: bool = True,
skip: int = 0,
limit: int = 100,
days_back: Optional[int] = None
) -> List[ProductTransformation]:
"""Get transformations for a specific ingredient"""
try:
if is_source:
query = select(self.model).where(
and_(
self.model.tenant_id == tenant_id,
self.model.source_ingredient_id == ingredient_id
)
)
else:
query = select(self.model).where(
and_(
self.model.tenant_id == tenant_id,
self.model.target_ingredient_id == ingredient_id
)
)
# Filter by date range if specified
if days_back:
start_date = datetime.now() - timedelta(days=days_back)
query = query.where(self.model.transformation_date >= start_date)
query = query.order_by(desc(self.model.transformation_date)).offset(skip).limit(limit)
result = await self.session.execute(query)
return result.scalars().all()
except Exception as e:
logger.error("Failed to get transformations by ingredient", error=str(e), ingredient_id=ingredient_id)
raise
async def get_transformations_by_stage(
self,
tenant_id: UUID,
source_stage: Optional[ProductionStage] = None,
target_stage: Optional[ProductionStage] = None,
skip: int = 0,
limit: int = 100,
days_back: Optional[int] = None
) -> List[ProductTransformation]:
"""Get transformations by production stage"""
try:
conditions = [self.model.tenant_id == tenant_id]
if source_stage:
conditions.append(self.model.source_stage == source_stage)
if target_stage:
conditions.append(self.model.target_stage == target_stage)
query = select(self.model).where(and_(*conditions))
# Filter by date range if specified
if days_back:
start_date = datetime.now() - timedelta(days=days_back)
query = query.where(self.model.transformation_date >= start_date)
query = query.order_by(desc(self.model.transformation_date)).offset(skip).limit(limit)
result = await self.session.execute(query)
return result.scalars().all()
except Exception as e:
logger.error("Failed to get transformations by stage", error=str(e))
raise
async def get_transformation_by_reference(
self,
tenant_id: UUID,
transformation_reference: str
) -> Optional[ProductTransformation]:
"""Get transformation by reference number"""
try:
result = await self.session.execute(
select(self.model).where(
and_(
self.model.tenant_id == tenant_id,
self.model.transformation_reference == transformation_reference
)
)
)
return result.scalar_one_or_none()
except Exception as e:
logger.error("Failed to get transformation by reference", error=str(e), reference=transformation_reference)
raise
async def get_transformation_summary_by_period(
self,
tenant_id: UUID,
days_back: int = 30
) -> Dict[str, Any]:
"""Get transformation summary for specified period"""
try:
start_date = datetime.now() - timedelta(days=days_back)
# Get transformation counts by stage combination
result = await self.session.execute(
select(
self.model.source_stage,
self.model.target_stage,
func.count(self.model.id).label('count'),
func.coalesce(func.sum(self.model.source_quantity), 0).label('total_source_quantity'),
func.coalesce(func.sum(self.model.target_quantity), 0).label('total_target_quantity')
).where(
and_(
self.model.tenant_id == tenant_id,
self.model.transformation_date >= start_date
)
).group_by(self.model.source_stage, self.model.target_stage)
)
summary = {}
total_transformations = 0
for row in result:
source_stage = row.source_stage.value if row.source_stage else "unknown"
target_stage = row.target_stage.value if row.target_stage else "unknown"
stage_key = f"{source_stage}_to_{target_stage}"
summary[stage_key] = {
'count': row.count,
'total_source_quantity': float(row.total_source_quantity),
'total_target_quantity': float(row.total_target_quantity),
'average_conversion_ratio': float(row.total_target_quantity) / float(row.total_source_quantity) if row.total_source_quantity > 0 else 0
}
total_transformations += row.count
summary['total_transformations'] = total_transformations
summary['period_days'] = days_back
return summary
except Exception as e:
logger.error("Failed to get transformation summary", error=str(e), tenant_id=tenant_id)
raise
async def calculate_transformation_efficiency(
self,
tenant_id: UUID,
source_ingredient_id: UUID,
target_ingredient_id: UUID,
days_back: int = 30
) -> Dict[str, float]:
"""Calculate transformation efficiency between ingredients"""
try:
start_date = datetime.now() - timedelta(days=days_back)
result = await self.session.execute(
select(
func.count(self.model.id).label('transformation_count'),
func.coalesce(func.sum(self.model.source_quantity), 0).label('total_source'),
func.coalesce(func.sum(self.model.target_quantity), 0).label('total_target'),
func.coalesce(func.avg(self.model.conversion_ratio), 0).label('avg_conversion_ratio')
).where(
and_(
self.model.tenant_id == tenant_id,
self.model.source_ingredient_id == source_ingredient_id,
self.model.target_ingredient_id == target_ingredient_id,
self.model.transformation_date >= start_date
)
)
)
row = result.first()
return {
'transformation_count': row.transformation_count or 0,
'total_source_quantity': float(row.total_source) if row.total_source else 0.0,
'total_target_quantity': float(row.total_target) if row.total_target else 0.0,
'average_conversion_ratio': float(row.avg_conversion_ratio) if row.avg_conversion_ratio else 0.0,
'efficiency_percentage': (float(row.total_target) / float(row.total_source) * 100) if row.total_source and row.total_source > 0 else 0.0,
'period_days': days_back
}
except Exception as e:
logger.error("Failed to calculate transformation efficiency", error=str(e))
raise

View File

@@ -10,7 +10,7 @@ from pydantic import BaseModel, Field, validator
from typing import Generic, TypeVar
from enum import Enum
from app.models.inventory import UnitOfMeasure, IngredientCategory, StockMovementType, ProductType, ProductCategory
from app.models.inventory import UnitOfMeasure, IngredientCategory, StockMovementType, ProductType, ProductCategory, ProductionStage
T = TypeVar('T')
@@ -172,17 +172,26 @@ class StockCreate(InventoryBaseSchema):
batch_number: Optional[str] = Field(None, max_length=100, description="Batch number")
lot_number: Optional[str] = Field(None, max_length=100, description="Lot number")
supplier_batch_ref: Optional[str] = Field(None, max_length=100, description="Supplier batch reference")
# Production stage tracking
production_stage: ProductionStage = Field(ProductionStage.RAW_INGREDIENT, description="Production stage of the stock")
transformation_reference: Optional[str] = Field(None, max_length=100, description="Transformation reference ID")
current_quantity: float = Field(..., ge=0, description="Current quantity")
received_date: Optional[datetime] = Field(None, description="Date received")
expiration_date: Optional[datetime] = Field(None, description="Expiration date")
best_before_date: Optional[datetime] = Field(None, description="Best before date")
# Stage-specific expiration fields
original_expiration_date: Optional[datetime] = Field(None, description="Original batch expiration (for par-baked items)")
transformation_date: Optional[datetime] = Field(None, description="Date when product was transformed")
final_expiration_date: Optional[datetime] = Field(None, description="Final expiration after transformation")
unit_cost: Optional[Decimal] = Field(None, ge=0, description="Unit cost")
storage_location: Optional[str] = Field(None, max_length=100, description="Storage location")
warehouse_zone: Optional[str] = Field(None, max_length=50, description="Warehouse zone")
shelf_position: Optional[str] = Field(None, max_length=50, description="Shelf position")
quality_status: str = Field("good", description="Quality status")
@@ -191,18 +200,27 @@ class StockUpdate(InventoryBaseSchema):
batch_number: Optional[str] = Field(None, max_length=100, description="Batch number")
lot_number: Optional[str] = Field(None, max_length=100, description="Lot number")
supplier_batch_ref: Optional[str] = Field(None, max_length=100, description="Supplier batch reference")
# Production stage tracking
production_stage: Optional[ProductionStage] = Field(None, description="Production stage of the stock")
transformation_reference: Optional[str] = Field(None, max_length=100, description="Transformation reference ID")
current_quantity: Optional[float] = Field(None, ge=0, description="Current quantity")
reserved_quantity: Optional[float] = Field(None, ge=0, description="Reserved quantity")
received_date: Optional[datetime] = Field(None, description="Date received")
expiration_date: Optional[datetime] = Field(None, description="Expiration date")
best_before_date: Optional[datetime] = Field(None, description="Best before date")
# Stage-specific expiration fields
original_expiration_date: Optional[datetime] = Field(None, description="Original batch expiration (for par-baked items)")
transformation_date: Optional[datetime] = Field(None, description="Date when product was transformed")
final_expiration_date: Optional[datetime] = Field(None, description="Final expiration after transformation")
unit_cost: Optional[Decimal] = Field(None, ge=0, description="Unit cost")
storage_location: Optional[str] = Field(None, max_length=100, description="Storage location")
warehouse_zone: Optional[str] = Field(None, max_length=50, description="Warehouse zone")
shelf_position: Optional[str] = Field(None, max_length=50, description="Shelf position")
is_available: Optional[bool] = Field(None, description="Is available")
quality_status: Optional[str] = Field(None, description="Quality status")
@@ -215,12 +233,23 @@ class StockResponse(InventoryBaseSchema):
batch_number: Optional[str]
lot_number: Optional[str]
supplier_batch_ref: Optional[str]
# Production stage tracking
production_stage: ProductionStage
transformation_reference: Optional[str]
current_quantity: float
reserved_quantity: float
available_quantity: float
received_date: Optional[datetime]
expiration_date: Optional[datetime]
best_before_date: Optional[datetime]
# Stage-specific expiration fields
original_expiration_date: Optional[datetime]
transformation_date: Optional[datetime]
final_expiration_date: Optional[datetime]
unit_cost: Optional[float]
total_cost: Optional[float]
storage_location: Optional[str]
@@ -231,7 +260,7 @@ class StockResponse(InventoryBaseSchema):
quality_status: str
created_at: datetime
updated_at: datetime
# Related data
ingredient: Optional[IngredientResponse] = None
@@ -278,6 +307,60 @@ class StockMovementResponse(InventoryBaseSchema):
ingredient: Optional[IngredientResponse] = None
# ===== PRODUCT TRANSFORMATION SCHEMAS =====
class ProductTransformationCreate(InventoryBaseSchema):
"""Schema for creating product transformations"""
source_ingredient_id: str = Field(..., description="Source ingredient ID")
target_ingredient_id: str = Field(..., description="Target ingredient ID")
source_stage: ProductionStage = Field(..., description="Source production stage")
target_stage: ProductionStage = Field(..., description="Target production stage")
source_quantity: float = Field(..., gt=0, description="Input quantity")
target_quantity: float = Field(..., gt=0, description="Output quantity")
conversion_ratio: Optional[float] = Field(None, gt=0, description="Conversion ratio (auto-calculated if not provided)")
# Expiration handling
expiration_calculation_method: str = Field("days_from_transformation", description="How to calculate expiration")
expiration_days_offset: Optional[int] = Field(1, description="Days from transformation date for expiration")
# Process details
process_notes: Optional[str] = Field(None, description="Process notes")
target_batch_number: Optional[str] = Field(None, max_length=100, description="Target batch number")
# Source stock selection (optional - if not provided, uses FIFO)
source_stock_ids: Optional[List[str]] = Field(None, description="Specific source stock IDs to transform")
class ProductTransformationResponse(InventoryBaseSchema):
"""Schema for product transformation responses"""
id: str
tenant_id: str
transformation_reference: str
source_ingredient_id: str
target_ingredient_id: str
source_stage: ProductionStage
target_stage: ProductionStage
source_quantity: float
target_quantity: float
conversion_ratio: float
expiration_calculation_method: str
expiration_days_offset: Optional[int]
transformation_date: datetime
process_notes: Optional[str]
performed_by: Optional[str]
source_batch_numbers: Optional[str]
target_batch_number: Optional[str]
is_completed: bool
is_reversed: bool
created_at: datetime
created_by: Optional[str]
# Related data
source_ingredient: Optional[IngredientResponse] = None
target_ingredient: Optional[IngredientResponse] = None
# ===== ALERT SCHEMAS =====
class StockAlertResponse(InventoryBaseSchema):
@@ -379,6 +462,8 @@ class InventoryFilter(BaseModel):
class StockFilter(BaseModel):
"""Stock filtering parameters"""
ingredient_id: Optional[str] = None
production_stage: Optional[ProductionStage] = None
transformation_reference: Optional[str] = None
is_available: Optional[bool] = None
is_expired: Optional[bool] = None
expiring_within_days: Optional[int] = None

View File

@@ -535,6 +535,185 @@ class InventoryService:
logger.error("Failed to get inventory summary", error=str(e), tenant_id=tenant_id)
raise
async def get_stock(
self,
tenant_id: UUID,
skip: int = 0,
limit: int = 100,
ingredient_id: Optional[UUID] = None,
available_only: bool = True
) -> List[StockResponse]:
"""Get stock entries with filtering"""
try:
async with get_db_transaction() as db:
stock_repo = StockRepository(db)
ingredient_repo = IngredientRepository(db)
# Get stock entries
stock_entries = await stock_repo.get_stock_entries(
tenant_id, skip, limit, ingredient_id, available_only
)
responses = []
for stock in stock_entries:
# Get ingredient information
ingredient = await ingredient_repo.get_by_id(stock.ingredient_id)
response = StockResponse(**stock.to_dict())
if ingredient:
response.ingredient = IngredientResponse(**ingredient.to_dict())
responses.append(response)
return responses
except Exception as e:
logger.error("Failed to get stock entries", error=str(e), tenant_id=tenant_id)
raise
# ===== DELETION METHODS =====
async def hard_delete_ingredient(
self,
ingredient_id: UUID,
tenant_id: UUID,
user_id: Optional[UUID] = None
) -> Dict[str, Any]:
"""
Completely delete an ingredient and all associated data.
This includes:
- All stock entries
- All stock movements
- All stock alerts
- The ingredient record itself
Returns a summary of what was deleted.
"""
try:
deletion_summary = {
"ingredient_id": str(ingredient_id),
"deleted_stock_entries": 0,
"deleted_stock_movements": 0,
"deleted_stock_alerts": 0,
"ingredient_name": None,
"success": False
}
async with get_db_transaction() as db:
ingredient_repo = IngredientRepository(db)
stock_repo = StockRepository(db)
movement_repo = StockMovementRepository(db)
# 1. Verify ingredient exists and belongs to tenant
ingredient = await ingredient_repo.get_by_id(ingredient_id)
if not ingredient or ingredient.tenant_id != tenant_id:
raise ValueError(f"Ingredient {ingredient_id} not found or access denied")
deletion_summary["ingredient_name"] = ingredient.name
logger.info(
"Starting hard deletion of ingredient",
ingredient_id=str(ingredient_id),
ingredient_name=ingredient.name,
tenant_id=str(tenant_id)
)
# 2. Delete all stock movements first (due to foreign key constraints)
try:
deleted_movements = await movement_repo.delete_by_ingredient(ingredient_id, tenant_id)
deletion_summary["deleted_stock_movements"] = deleted_movements
logger.info(f"Deleted {deleted_movements} stock movements")
except Exception as e:
logger.warning(f"Error deleting stock movements: {str(e)}")
# Continue with deletion even if this fails
# 3. Delete all stock entries
try:
deleted_stock = await stock_repo.delete_by_ingredient(ingredient_id, tenant_id)
deletion_summary["deleted_stock_entries"] = deleted_stock
logger.info(f"Deleted {deleted_stock} stock entries")
except Exception as e:
logger.warning(f"Error deleting stock entries: {str(e)}")
# Continue with deletion even if this fails
# 4. Delete stock alerts if they exist
try:
# Note: StockAlert deletion would go here if that table exists
# For now, we'll assume this is handled by cascading deletes or doesn't exist
deletion_summary["deleted_stock_alerts"] = 0
except Exception as e:
logger.warning(f"Error deleting stock alerts: {str(e)}")
# 5. Finally, delete the ingredient itself
deleted_ingredient = await ingredient_repo.delete_by_id(ingredient_id, tenant_id)
if not deleted_ingredient:
raise ValueError("Failed to delete ingredient record")
deletion_summary["success"] = True
logger.info(
"Successfully completed hard deletion of ingredient",
ingredient_id=str(ingredient_id),
ingredient_name=ingredient.name,
summary=deletion_summary
)
return deletion_summary
except ValueError:
# Re-raise validation errors
raise
except Exception as e:
logger.error(
"Failed to hard delete ingredient",
ingredient_id=str(ingredient_id),
tenant_id=str(tenant_id),
error=str(e)
)
raise
async def soft_delete_ingredient(
self,
ingredient_id: UUID,
tenant_id: UUID,
user_id: Optional[UUID] = None
) -> IngredientResponse:
"""
Soft delete an ingredient (mark as inactive).
This preserves all associated data for reporting and audit purposes.
"""
try:
async with get_db_transaction() as db:
ingredient_repo = IngredientRepository(db)
# Verify ingredient exists and belongs to tenant
ingredient = await ingredient_repo.get_by_id(ingredient_id)
if not ingredient or ingredient.tenant_id != tenant_id:
raise ValueError(f"Ingredient {ingredient_id} not found or access denied")
# Mark as inactive
update_data = IngredientUpdate(is_active=False)
updated_ingredient = await ingredient_repo.update_ingredient(ingredient_id, update_data)
logger.info(
"Soft deleted ingredient",
ingredient_id=str(ingredient_id),
ingredient_name=ingredient.name,
tenant_id=str(tenant_id)
)
return IngredientResponse(**updated_ingredient.to_dict())
except ValueError:
raise
except Exception as e:
logger.error(
"Failed to soft delete ingredient",
ingredient_id=str(ingredient_id),
tenant_id=str(tenant_id),
error=str(e)
)
raise
# ===== PRIVATE HELPER METHODS =====
async def _validate_ingredient_data(self, ingredient_data: IngredientCreate, tenant_id: UUID):

View File

@@ -0,0 +1,332 @@
# services/inventory/app/services/transformation_service.py
"""
Product Transformation Service - Business Logic Layer
"""
from typing import List, Optional, Dict, Any, Tuple
from uuid import UUID
from datetime import datetime, timedelta
import structlog
import json
from app.models.inventory import ProductTransformation, Stock, StockMovement, StockMovementType, ProductionStage
from app.repositories.transformation_repository import TransformationRepository
from app.repositories.ingredient_repository import IngredientRepository
from app.repositories.stock_repository import StockRepository
from app.repositories.stock_movement_repository import StockMovementRepository
from app.schemas.inventory import (
ProductTransformationCreate, ProductTransformationResponse,
StockCreate, StockMovementCreate,
IngredientResponse
)
from app.core.database import get_db_transaction
from shared.database.exceptions import DatabaseError
logger = structlog.get_logger()
class TransformationService:
"""Service layer for product transformation operations"""
def __init__(self):
pass
async def create_transformation(
self,
transformation_data: ProductTransformationCreate,
tenant_id: UUID,
user_id: Optional[UUID] = None
) -> ProductTransformationResponse:
"""Create a product transformation with stock movements"""
try:
async with get_db_transaction() as db:
transformation_repo = TransformationRepository(db)
ingredient_repo = IngredientRepository(db)
stock_repo = StockRepository(db)
movement_repo = StockMovementRepository(db)
# Validate ingredients exist
source_ingredient = await ingredient_repo.get_by_id(UUID(transformation_data.source_ingredient_id))
target_ingredient = await ingredient_repo.get_by_id(UUID(transformation_data.target_ingredient_id))
if not source_ingredient or source_ingredient.tenant_id != tenant_id:
raise ValueError("Source ingredient not found")
if not target_ingredient or target_ingredient.tenant_id != tenant_id:
raise ValueError("Target ingredient not found")
# Reserve source stock using FIFO by default
source_reservations = await stock_repo.reserve_stock(
tenant_id,
UUID(transformation_data.source_ingredient_id),
transformation_data.source_quantity,
fifo=True
)
if not source_reservations:
raise ValueError(f"Insufficient stock available for transformation. Required: {transformation_data.source_quantity}")
# Create transformation record
source_batch_numbers = [res.get('batch_number') for res in source_reservations if res.get('batch_number')]
transformation = await transformation_repo.create_transformation(
transformation_data,
tenant_id,
user_id,
source_batch_numbers
)
# Calculate expiration date for target product
target_expiration_date = self._calculate_target_expiration(
transformation_data.expiration_calculation_method,
transformation_data.expiration_days_offset,
source_reservations
)
# Consume source stock and create movements
consumed_items = []
for reservation in source_reservations:
stock_id = UUID(reservation['stock_id'])
reserved_qty = reservation['reserved_quantity']
# Consume from reserved stock
await stock_repo.consume_stock(stock_id, reserved_qty, from_reserved=True)
# Create movement record
movement_data = StockMovementCreate(
ingredient_id=transformation_data.source_ingredient_id,
stock_id=str(stock_id),
movement_type=StockMovementType.TRANSFORMATION,
quantity=reserved_qty,
reference_number=transformation.transformation_reference,
notes=f"Transformation: {transformation_data.source_stage.value}{transformation_data.target_stage.value}"
)
await movement_repo.create_movement(movement_data, tenant_id, user_id)
consumed_items.append({
'stock_id': str(stock_id),
'quantity_consumed': reserved_qty,
'batch_number': reservation.get('batch_number')
})
# Create target stock entry
target_stock_data = StockCreate(
ingredient_id=transformation_data.target_ingredient_id,
production_stage=transformation_data.target_stage,
transformation_reference=transformation.transformation_reference,
current_quantity=transformation_data.target_quantity,
batch_number=transformation_data.target_batch_number or f"TRANS-{transformation.transformation_reference}",
expiration_date=target_expiration_date['expiration_date'],
original_expiration_date=target_expiration_date.get('original_expiration_date'),
transformation_date=transformation.transformation_date,
final_expiration_date=target_expiration_date['expiration_date'],
unit_cost=self._calculate_target_unit_cost(consumed_items, transformation_data.target_quantity),
quality_status="good"
)
target_stock = await stock_repo.create_stock_entry(target_stock_data, tenant_id)
# Create target stock movement
target_movement_data = StockMovementCreate(
ingredient_id=transformation_data.target_ingredient_id,
stock_id=str(target_stock.id),
movement_type=StockMovementType.TRANSFORMATION,
quantity=transformation_data.target_quantity,
reference_number=transformation.transformation_reference,
notes=f"Transformation result: {transformation_data.source_stage.value}{transformation_data.target_stage.value}"
)
await movement_repo.create_movement(target_movement_data, tenant_id, user_id)
# Convert to response schema
response = ProductTransformationResponse(**transformation.to_dict())
response.source_ingredient = IngredientResponse(**source_ingredient.to_dict())
response.target_ingredient = IngredientResponse(**target_ingredient.to_dict())
logger.info(
"Transformation completed successfully",
transformation_id=transformation.id,
reference=transformation.transformation_reference,
source_quantity=transformation_data.source_quantity,
target_quantity=transformation_data.target_quantity
)
return response
except Exception as e:
logger.error("Failed to create transformation", error=str(e), tenant_id=tenant_id)
raise
async def get_transformation(
self,
transformation_id: UUID,
tenant_id: UUID
) -> Optional[ProductTransformationResponse]:
"""Get transformation by ID"""
try:
async with get_db_transaction() as db:
transformation_repo = TransformationRepository(db)
ingredient_repo = IngredientRepository(db)
transformation = await transformation_repo.get_by_id(transformation_id)
if not transformation or transformation.tenant_id != tenant_id:
return None
# Get related ingredients
source_ingredient = await ingredient_repo.get_by_id(transformation.source_ingredient_id)
target_ingredient = await ingredient_repo.get_by_id(transformation.target_ingredient_id)
response = ProductTransformationResponse(**transformation.to_dict())
if source_ingredient:
response.source_ingredient = IngredientResponse(**source_ingredient.to_dict())
if target_ingredient:
response.target_ingredient = IngredientResponse(**target_ingredient.to_dict())
return response
except Exception as e:
logger.error("Failed to get transformation", error=str(e), transformation_id=transformation_id)
raise
async def get_transformations(
self,
tenant_id: UUID,
skip: int = 0,
limit: int = 100,
ingredient_id: Optional[UUID] = None,
source_stage: Optional[ProductionStage] = None,
target_stage: Optional[ProductionStage] = None,
days_back: Optional[int] = None
) -> List[ProductTransformationResponse]:
"""Get transformations with filtering"""
try:
async with get_db_transaction() as db:
transformation_repo = TransformationRepository(db)
ingredient_repo = IngredientRepository(db)
if ingredient_id:
# Get transformations where ingredient is either source or target
source_transformations = await transformation_repo.get_transformations_by_ingredient(
tenant_id, ingredient_id, is_source=True, skip=0, limit=limit//2, days_back=days_back
)
target_transformations = await transformation_repo.get_transformations_by_ingredient(
tenant_id, ingredient_id, is_source=False, skip=0, limit=limit//2, days_back=days_back
)
transformations = source_transformations + target_transformations
# Remove duplicates and sort by date
unique_transformations = {t.id: t for t in transformations}.values()
transformations = sorted(unique_transformations, key=lambda x: x.transformation_date, reverse=True)
transformations = transformations[skip:skip+limit]
else:
transformations = await transformation_repo.get_transformations_by_stage(
tenant_id, source_stage, target_stage, skip, limit, days_back
)
responses = []
for transformation in transformations:
# Get related ingredients
source_ingredient = await ingredient_repo.get_by_id(transformation.source_ingredient_id)
target_ingredient = await ingredient_repo.get_by_id(transformation.target_ingredient_id)
response = ProductTransformationResponse(**transformation.to_dict())
if source_ingredient:
response.source_ingredient = IngredientResponse(**source_ingredient.to_dict())
if target_ingredient:
response.target_ingredient = IngredientResponse(**target_ingredient.to_dict())
responses.append(response)
return responses
except Exception as e:
logger.error("Failed to get transformations", error=str(e), tenant_id=tenant_id)
raise
def _calculate_target_expiration(
self,
calculation_method: str,
expiration_days_offset: Optional[int],
source_reservations: List[Dict[str, Any]]
) -> Dict[str, Optional[datetime]]:
"""Calculate expiration date for target product"""
current_time = datetime.now()
if calculation_method == "days_from_transformation":
# Calculate expiration based on transformation date + offset
if expiration_days_offset:
expiration_date = current_time + timedelta(days=expiration_days_offset)
else:
expiration_date = current_time + timedelta(days=1) # Default 1 day for fresh baked goods
# Use earliest source expiration as original
original_expiration = None
if source_reservations:
source_expirations = [res.get('expiration_date') for res in source_reservations if res.get('expiration_date')]
if source_expirations:
original_expiration = min(source_expirations)
return {
'expiration_date': expiration_date,
'original_expiration_date': original_expiration
}
elif calculation_method == "preserve_original":
# Use the earliest expiration date from source stock
if source_reservations:
source_expirations = [res.get('expiration_date') for res in source_reservations if res.get('expiration_date')]
if source_expirations:
expiration_date = min(source_expirations)
return {
'expiration_date': expiration_date,
'original_expiration_date': expiration_date
}
# Fallback to default
return {
'expiration_date': current_time + timedelta(days=7),
'original_expiration_date': None
}
else:
# Default fallback
return {
'expiration_date': current_time + timedelta(days=1),
'original_expiration_date': None
}
def _calculate_target_unit_cost(
self,
consumed_items: List[Dict[str, Any]],
target_quantity: float
) -> Optional[float]:
"""Calculate unit cost for target product based on consumed items"""
# This is a simplified calculation - in reality you'd want to consider
# additional costs like labor, energy, etc.
total_source_cost = 0.0
total_source_quantity = 0.0
for item in consumed_items:
quantity = item.get('quantity_consumed', 0)
# Note: In a real implementation, you'd fetch the unit cost from the stock items
# For now, we'll use a placeholder
total_source_quantity += quantity
if total_source_quantity > 0 and target_quantity > 0:
# Simple cost transfer based on quantity ratio
return total_source_cost / target_quantity
return None
async def get_transformation_summary(
self,
tenant_id: UUID,
days_back: int = 30
) -> Dict[str, Any]:
"""Get transformation summary for dashboard"""
try:
async with get_db_transaction() as db:
transformation_repo = TransformationRepository(db)
summary = await transformation_repo.get_transformation_summary_by_period(tenant_id, days_back)
return summary
except Exception as e:
logger.error("Failed to get transformation summary", error=str(e), tenant_id=tenant_id)
raise