Files
bakery-ia/services/inventory/app/repositories/ingredient_repository.py
Urtzi Alfaro 10c779858a Fix enum mismatch: Update Python enums and seed data to match database uppercase values
- Fixed ProductType enum values from lowercase to uppercase (INGREDIENT, FINISHED_PRODUCT)
- Fixed UnitOfMeasure enum values from lowercase/abbreviated to uppercase (KILOGRAMS, LITERS, etc.)
- Fixed IngredientCategory enum values from lowercase to uppercase (FLOUR, YEAST, etc.)
- Fixed ProductCategory enum values from lowercase to uppercase (BREAD, CROISSANTS, etc.)
- Updated seed data files to use correct uppercase enum values
- Fixed hardcoded enum references throughout the codebase
- This resolves the InvalidTextRepresentationError when inserting inventory data

Generated by Mistral Vibe.
Co-Authored-By: Mistral Vibe <vibe@mistral.ai>
2025-12-13 16:49:04 +01:00

668 lines
29 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# services/inventory/app/repositories/ingredient_repository.py
"""
Ingredient Repository using Repository Pattern
"""
from typing import List, Optional, Dict, Any
from uuid import UUID
from datetime import datetime
from sqlalchemy import select, func, and_, or_, desc, asc
from sqlalchemy.ext.asyncio import AsyncSession
import structlog
from app.models.inventory import Ingredient, Stock
from app.schemas.inventory import IngredientCreate, IngredientUpdate
from shared.database.repository import BaseRepository
logger = structlog.get_logger()
class IngredientRepository(BaseRepository[Ingredient, IngredientCreate, IngredientUpdate]):
"""Repository for ingredient operations"""
def __init__(self, session: AsyncSession):
super().__init__(Ingredient, session)
async def create_ingredient(self, ingredient_data: IngredientCreate, tenant_id: UUID) -> Ingredient:
"""Create a new ingredient"""
try:
# Prepare data and map schema fields to model fields
create_data = ingredient_data.model_dump()
create_data['tenant_id'] = tenant_id
# Handle product_type enum conversion
product_type_value = create_data.get('product_type')
# Log warning if product_type is missing (should be provided by frontend)
if not product_type_value:
logger.warning(
"product_type not provided, defaulting to 'ingredient'",
ingredient_name=create_data.get('name'),
tenant_id=tenant_id
)
product_type_value = 'INGREDIENT'
if 'product_type' in create_data:
from app.models.inventory import ProductType
try:
# Convert string to enum object
if isinstance(product_type_value, str):
for enum_member in ProductType:
if enum_member.value == product_type_value or enum_member.name == product_type_value:
create_data['product_type'] = enum_member
break
else:
# If not found, default to INGREDIENT
logger.warning(
"Invalid product_type value, defaulting to INGREDIENT",
invalid_value=product_type_value,
tenant_id=tenant_id
)
create_data['product_type'] = ProductType.INGREDIENT
# If it's already an enum, keep it
except Exception as e:
# Fallback to INGREDIENT if any issues
logger.error(
"Error converting product_type to enum, defaulting to INGREDIENT",
error=str(e),
tenant_id=tenant_id
)
create_data['product_type'] = ProductType.INGREDIENT
# Handle category mapping based on product type
if 'category' in create_data:
category_value = create_data.pop('category')
if product_type_value == 'FINISHED_PRODUCT':
# Map to product_category for finished products
from app.models.inventory import ProductCategory
if category_value:
try:
# Find the enum member by value
for enum_member in ProductCategory:
if enum_member.value == category_value:
create_data['product_category'] = enum_member
break
else:
# If not found, default to OTHER
create_data['product_category'] = ProductCategory.OTHER_PRODUCTS
except Exception:
# Fallback to OTHER if any issues
create_data['product_category'] = ProductCategory.OTHER_PRODUCTS
else:
# Map to ingredient_category for ingredients
from app.models.inventory import IngredientCategory
if category_value:
try:
# Find the enum member by value
for enum_member in IngredientCategory:
if enum_member.value == category_value:
create_data['ingredient_category'] = enum_member
break
else:
# If not found, default to OTHER
create_data['ingredient_category'] = IngredientCategory.OTHER
except Exception:
# Fallback to OTHER if any issues
create_data['ingredient_category'] = IngredientCategory.OTHER
# Convert unit_of_measure string to enum object
if 'unit_of_measure' in create_data:
unit_value = create_data['unit_of_measure']
from app.models.inventory import UnitOfMeasure
try:
# Find the enum member by value
for enum_member in UnitOfMeasure:
if enum_member.value == unit_value:
create_data['unit_of_measure'] = enum_member
break
else:
# If not found, default to UNITS
create_data['unit_of_measure'] = UnitOfMeasure.UNITS
except Exception:
# Fallback to UNITS if any issues
create_data['unit_of_measure'] = UnitOfMeasure.UNITS
# Create record
record = await self.create(create_data)
logger.info(
"Created ingredient",
ingredient_id=record.id,
name=record.name,
ingredient_category=record.ingredient_category.value if record.ingredient_category else None,
tenant_id=tenant_id
)
return record
except Exception as e:
logger.error("Failed to create ingredient", error=str(e), tenant_id=tenant_id)
raise
async def update(self, record_id: Any, obj_in: IngredientUpdate, **kwargs) -> Optional[Ingredient]:
"""Override update to handle product_type and category enum conversions"""
try:
# Prepare data and map schema fields to model fields
update_data = obj_in.model_dump(exclude_unset=True)
# Handle product_type enum conversion
if 'product_type' in update_data:
product_type_value = update_data['product_type']
from app.models.inventory import ProductType
try:
# Convert string to enum object
if isinstance(product_type_value, str):
for enum_member in ProductType:
if enum_member.value == product_type_value or enum_member.name == product_type_value:
update_data['product_type'] = enum_member
break
else:
# If not found, keep original value (don't update)
del update_data['product_type']
# If it's already an enum, keep it
except Exception:
# Remove invalid product_type to avoid update
del update_data['product_type']
# Handle category mapping based on product type
if 'category' in update_data:
category_value = update_data.pop('category')
product_type_value = update_data.get('product_type', 'INGREDIENT')
# Get current product if we need to determine type
if 'product_type' not in update_data:
current_record = await self.get_by_id(record_id)
if current_record:
product_type_value = current_record.product_type.value if current_record.product_type else 'INGREDIENT'
if product_type_value == 'FINISHED_PRODUCT':
# Map to product_category for finished products
from app.models.inventory import ProductCategory
if category_value:
try:
for enum_member in ProductCategory:
if enum_member.value == category_value:
update_data['product_category'] = enum_member
# Clear ingredient_category when setting product_category
update_data['ingredient_category'] = None
break
except Exception:
pass
else:
# Map to ingredient_category for ingredients
from app.models.inventory import IngredientCategory
if category_value:
try:
for enum_member in IngredientCategory:
if enum_member.value == category_value:
update_data['ingredient_category'] = enum_member
# Clear product_category when setting ingredient_category
update_data['product_category'] = None
break
except Exception:
pass
# Handle unit_of_measure enum conversion
if 'unit_of_measure' in update_data:
unit_value = update_data['unit_of_measure']
from app.models.inventory import UnitOfMeasure
try:
if isinstance(unit_value, str):
for enum_member in UnitOfMeasure:
if enum_member.value == unit_value:
update_data['unit_of_measure'] = enum_member
break
else:
# If not found, keep original value
del update_data['unit_of_measure']
except Exception:
del update_data['unit_of_measure']
# Call parent update method
return await super().update(record_id, update_data, **kwargs)
except Exception as e:
logger.error("Failed to update ingredient", error=str(e), record_id=record_id)
raise
async def get_ingredients_by_tenant(
self,
tenant_id: UUID,
skip: int = 0,
limit: int = 100,
filters: Optional[Dict[str, Any]] = None
) -> List[Ingredient]:
"""Get ingredients for a tenant with filtering"""
try:
# Handle search filter separately since it requires special query logic
if filters and filters.get('search'):
search_term = filters['search']
logger.info(f"Searching ingredients with term: '{search_term}'", tenant_id=tenant_id)
return await self.search_ingredients(tenant_id, search_term, skip, limit)
# Handle other filters with standard multi-get
query_filters = {'tenant_id': tenant_id}
if filters:
if filters.get('category'):
query_filters['category'] = filters['category']
if filters.get('product_type'):
# Convert string to enum object
from app.models.inventory import ProductType
product_type_value = filters['product_type']
try:
# Find the enum member by value
for enum_member in ProductType:
if enum_member.value == product_type_value:
query_filters['product_type'] = enum_member
break
else:
# If not found, skip this filter
logger.warning(f"Invalid product_type value: {product_type_value}")
except Exception as e:
logger.warning(f"Error converting product_type: {e}")
# Skip invalid product_type filter
if filters.get('is_active') is not None:
query_filters['is_active'] = filters['is_active']
if filters.get('is_perishable') is not None:
query_filters['is_perishable'] = filters['is_perishable']
ingredients = await self.get_multi(
skip=skip,
limit=limit,
filters=query_filters,
order_by='name'
)
return ingredients
except Exception as e:
logger.error("Failed to get ingredients", error=str(e), tenant_id=tenant_id)
raise
async def search_ingredients(
self,
tenant_id: UUID,
search_term: str,
skip: int = 0,
limit: int = 50
) -> List[Ingredient]:
"""Search ingredients by name, sku, or barcode"""
try:
# Add tenant filter to search
query = select(self.model).where(
and_(
self.model.tenant_id == tenant_id,
or_(
self.model.name.ilike(f"%{search_term}%"),
self.model.sku.ilike(f"%{search_term}%"),
self.model.barcode.ilike(f"%{search_term}%"),
self.model.brand.ilike(f"%{search_term}%")
)
)
).offset(skip).limit(limit)
result = await self.session.execute(query)
return result.scalars().all()
except Exception as e:
logger.error("Failed to search ingredients", error=str(e), tenant_id=tenant_id)
raise
async def get_low_stock_ingredients(self, tenant_id: UUID) -> List[Dict[str, Any]]:
"""Get ingredients with low stock levels"""
try:
# Query ingredients with their current stock levels
query = select(
Ingredient,
func.coalesce(func.sum(Stock.available_quantity), 0).label('current_stock')
).outerjoin(
Stock, and_(
Stock.ingredient_id == Ingredient.id,
Stock.is_available == True
)
).where(
Ingredient.tenant_id == tenant_id
).group_by(Ingredient.id).having(
func.coalesce(func.sum(Stock.available_quantity), 0) <= Ingredient.low_stock_threshold
)
result = await self.session.execute(query)
results = []
for ingredient, current_stock in result:
results.append({
'ingredient': ingredient,
'current_stock': float(current_stock) if current_stock else 0.0,
'threshold': ingredient.low_stock_threshold,
'needs_reorder': (
current_stock <= ingredient.reorder_point
if current_stock and ingredient.reorder_point is not None else True
)
})
return results
except Exception as e:
logger.error("Failed to get low stock ingredients", error=str(e), tenant_id=tenant_id)
raise
async def get_ingredients_needing_reorder(self, tenant_id: UUID) -> List[Dict[str, Any]]:
"""Get ingredients that need reordering"""
try:
query = select(
Ingredient,
func.coalesce(func.sum(Stock.available_quantity), 0).label('current_stock')
).outerjoin(
Stock, and_(
Stock.ingredient_id == Ingredient.id,
Stock.is_available == True
)
).where(
and_(
Ingredient.tenant_id == tenant_id,
Ingredient.is_active == True
)
).group_by(Ingredient.id).having(
func.coalesce(func.sum(Stock.available_quantity), 0) <= Ingredient.reorder_point
)
result = await self.session.execute(query)
results = []
for ingredient, current_stock in result:
results.append({
'ingredient': ingredient,
'current_stock': float(current_stock) if current_stock else 0.0,
'reorder_point': ingredient.reorder_point,
'reorder_quantity': ingredient.reorder_quantity
})
return results
except Exception as e:
logger.error("Failed to get ingredients needing reorder", error=str(e), tenant_id=tenant_id)
raise
async def get_by_sku(self, tenant_id: UUID, sku: str) -> Optional[Ingredient]:
"""Get ingredient by SKU"""
try:
result = await self.session.execute(
select(self.model).where(
and_(
self.model.tenant_id == tenant_id,
self.model.sku == sku
)
)
)
return result.scalar_one_or_none()
except Exception as e:
logger.error("Failed to get ingredient by SKU", error=str(e), sku=sku, tenant_id=tenant_id)
raise
async def get_by_barcode(self, tenant_id: UUID, barcode: str) -> Optional[Ingredient]:
"""Get ingredient by barcode"""
try:
result = await self.session.execute(
select(self.model).where(
and_(
self.model.tenant_id == tenant_id,
self.model.barcode == barcode
)
)
)
return result.scalar_one_or_none()
except Exception as e:
logger.error("Failed to get ingredient by barcode", error=str(e), barcode=barcode, tenant_id=tenant_id)
raise
async def update_last_purchase_price(self, ingredient_id: UUID, price: float) -> Optional[Ingredient]:
"""Update the last purchase price for an ingredient"""
try:
from app.schemas.inventory import IngredientUpdate
update_data = IngredientUpdate(last_purchase_price=price)
return await self.update(ingredient_id, update_data)
except Exception as e:
logger.error("Failed to update last purchase price", error=str(e), ingredient_id=ingredient_id)
raise
async def update_weighted_average_cost(
self,
ingredient_id: UUID,
current_stock_quantity: float,
new_purchase_quantity: float,
new_unit_cost: float
) -> Optional[Ingredient]:
"""
Update the average cost using weighted average calculation.
Formula:
new_average_cost = (current_stock_qty × current_avg_cost + new_qty × new_cost) / (current_stock_qty + new_qty)
Args:
ingredient_id: ID of the ingredient
current_stock_quantity: Current stock quantity before this purchase
new_purchase_quantity: Quantity being purchased
new_unit_cost: Unit cost of the new purchase
Returns:
Updated ingredient or None if not found
"""
try:
# Get current ingredient data
ingredient = await self.get_by_id(ingredient_id)
if not ingredient:
logger.warning("Ingredient not found for average cost update", ingredient_id=ingredient_id)
return None
from decimal import Decimal
# Get current average cost (default to new cost if not set)
current_avg_cost = float(ingredient.average_cost) if ingredient.average_cost else float(new_unit_cost)
# Calculate weighted average
# If no current stock, just use the new purchase price
if current_stock_quantity <= 0:
new_average_cost = Decimal(str(new_unit_cost))
else:
# Weighted average formula
total_cost = (current_stock_quantity * current_avg_cost) + (new_purchase_quantity * new_unit_cost)
total_quantity = current_stock_quantity + new_purchase_quantity
new_average_cost = Decimal(str(total_cost / total_quantity))
# Update the ingredient
from app.schemas.inventory import IngredientUpdate
update_data = IngredientUpdate(average_cost=new_average_cost)
updated_ingredient = await self.update(ingredient_id, update_data)
logger.info(
"Updated weighted average cost",
ingredient_id=ingredient_id,
old_average_cost=current_avg_cost,
new_average_cost=float(new_average_cost),
current_stock_qty=current_stock_quantity,
new_purchase_qty=new_purchase_quantity,
new_unit_cost=new_unit_cost
)
return updated_ingredient
except Exception as e:
logger.error(
"Failed to update weighted average cost",
error=str(e),
ingredient_id=ingredient_id
)
raise
async def get_ingredients_by_category(self, tenant_id: UUID, category: str) -> List[Ingredient]:
"""Get all ingredients in a specific category"""
try:
result = await self.session.execute(
select(self.model).where(
and_(
self.model.tenant_id == tenant_id,
self.model.category == category,
self.model.is_active == True
)
).order_by(self.model.name)
)
return result.scalars().all()
except Exception as e:
logger.error("Failed to get ingredients by category", error=str(e), category=category, tenant_id=tenant_id)
raise
async def delete_by_id(self, ingredient_id: UUID, tenant_id: UUID) -> bool:
"""Hard delete an ingredient by ID"""
try:
from sqlalchemy import delete
# Delete the ingredient
stmt = delete(self.model).where(
and_(
self.model.id == ingredient_id,
self.model.tenant_id == tenant_id
)
)
result = await self.session.execute(stmt)
await self.session.commit()
# Return True if a row was deleted
return result.rowcount > 0
except Exception as e:
await self.session.rollback()
logger.error("Failed to hard delete ingredient", error=str(e), ingredient_id=ingredient_id, tenant_id=tenant_id)
raise
async def get_active_tenants(self) -> List[UUID]:
"""Get list of active tenant IDs from ingredients table"""
try:
result = await self.session.execute(
select(func.distinct(Ingredient.tenant_id))
.where(Ingredient.is_active == True)
)
tenant_ids = []
for row in result.fetchall():
tenant_id = row[0]
# Convert to UUID if it's not already
if isinstance(tenant_id, UUID):
tenant_ids.append(tenant_id)
else:
tenant_ids.append(UUID(str(tenant_id)))
logger.info("Retrieved active tenants from ingredients", count=len(tenant_ids))
return tenant_ids
except Exception as e:
logger.error("Failed to get active tenants from ingredients", error=str(e))
return []
async def get_critical_stock_shortages(self) -> List[Dict[str, Any]]:
"""
Get critical stock shortages across all tenants using CTE analysis.
Returns ingredients that are critically low on stock.
"""
try:
from sqlalchemy import text
query = text("""
WITH stock_analysis AS (
SELECT
i.id as ingredient_id,
i.name as ingredient_name,
i.tenant_id,
i.reorder_point,
COALESCE(SUM(s.current_quantity), 0) as current_quantity,
i.low_stock_threshold,
GREATEST(0, i.low_stock_threshold - COALESCE(SUM(s.current_quantity), 0)) as shortage_amount,
CASE
WHEN COALESCE(SUM(s.current_quantity), 0) < i.low_stock_threshold THEN 'critical'
WHEN COALESCE(SUM(s.current_quantity), 0) < i.low_stock_threshold * 1.2 THEN 'low'
ELSE 'normal'
END as status
FROM ingredients i
LEFT JOIN stock s ON s.ingredient_id = i.id AND s.is_available = true
WHERE i.is_active = true
GROUP BY i.id, i.name, i.tenant_id, i.reorder_point, i.low_stock_threshold
)
SELECT
ingredient_id,
ingredient_name,
tenant_id,
current_quantity,
reorder_point,
shortage_amount
FROM stock_analysis
WHERE status = 'critical'
ORDER BY shortage_amount DESC
""")
result = await self.session.execute(query)
rows = result.fetchall()
shortages = []
for row in rows:
shortages.append({
'ingredient_id': row.ingredient_id,
'ingredient_name': row.ingredient_name,
'tenant_id': row.tenant_id,
'current_quantity': float(row.current_quantity) if row.current_quantity else 0,
'required_quantity': float(row.reorder_point) if row.reorder_point else 0,
'shortage_amount': float(row.shortage_amount) if row.shortage_amount else 0
})
return shortages
except Exception as e:
logger.error("Failed to get critical stock shortages", error=str(e))
raise
async def get_stock_issues(self, tenant_id: UUID) -> List[Dict[str, Any]]:
"""
Get stock level issues with CTE analysis for a specific tenant
Returns list of critical, low, and overstock situations
"""
try:
from sqlalchemy import text
query = text("""
WITH stock_analysis AS (
SELECT
i.id, i.name, i.tenant_id,
COALESCE(SUM(s.current_quantity), 0) as current_stock,
i.low_stock_threshold as minimum_stock,
i.max_stock_level as maximum_stock,
i.reorder_point,
0 as tomorrow_needed,
0 as avg_daily_usage,
7 as lead_time_days,
CASE
WHEN COALESCE(SUM(s.current_quantity), 0) < i.low_stock_threshold THEN 'critical'
WHEN COALESCE(SUM(s.current_quantity), 0) < i.low_stock_threshold * 1.2 THEN 'low'
WHEN i.max_stock_level IS NOT NULL AND COALESCE(SUM(s.current_quantity), 0) > i.max_stock_level THEN 'overstock'
ELSE 'normal'
END as status,
GREATEST(0, i.low_stock_threshold - COALESCE(SUM(s.current_quantity), 0)) as shortage_amount
FROM ingredients i
LEFT JOIN stock s ON s.ingredient_id = i.id AND s.is_available = true
WHERE i.tenant_id = :tenant_id AND i.is_active = true
GROUP BY i.id, i.name, i.tenant_id, i.low_stock_threshold, i.max_stock_level, i.reorder_point
)
SELECT * FROM stock_analysis WHERE status != 'normal'
ORDER BY
CASE status
WHEN 'critical' THEN 1
WHEN 'low' THEN 2
WHEN 'overstock' THEN 3
END,
shortage_amount DESC
""")
result = await self.session.execute(query, {"tenant_id": tenant_id})
return [dict(row._mapping) for row in result.fetchall()]
except Exception as e:
logger.error("Failed to get stock issues", error=str(e), tenant_id=str(tenant_id))
raise