Create the frontend receipes page to use real API

This commit is contained in:
Urtzi Alfaro
2025-09-19 21:39:04 +02:00
parent 8002d89d2b
commit d18c64ce6e
36 changed files with 3356 additions and 3171 deletions

View File

@@ -1,11 +1,7 @@
# services/recipes/app/repositories/__init__.py
from .base import BaseRepository
from .recipe_repository import RecipeRepository
from .production_repository import ProductionRepository
__all__ = [
"BaseRepository",
"RecipeRepository",
"ProductionRepository"
"RecipeRepository"
]

View File

@@ -1,96 +0,0 @@
# services/recipes/app/repositories/base.py
"""
Base repository class for common database operations
"""
from typing import TypeVar, Generic, List, Optional, Dict, Any
from sqlalchemy.orm import Session
from sqlalchemy import desc, asc
from uuid import UUID
T = TypeVar('T')
class BaseRepository(Generic[T]):
"""Base repository with common CRUD operations"""
def __init__(self, model: type, db: Session):
self.model = model
self.db = db
def create(self, obj_data: Dict[str, Any]) -> T:
"""Create a new record"""
db_obj = self.model(**obj_data)
self.db.add(db_obj)
self.db.commit()
self.db.refresh(db_obj)
return db_obj
def get_by_id(self, record_id: UUID) -> Optional[T]:
"""Get record by ID"""
return self.db.query(self.model).filter(self.model.id == record_id).first()
def get_by_tenant_id(self, tenant_id: UUID, limit: int = 100, offset: int = 0) -> List[T]:
"""Get records by tenant ID with pagination"""
return (
self.db.query(self.model)
.filter(self.model.tenant_id == tenant_id)
.limit(limit)
.offset(offset)
.all()
)
def update(self, record_id: UUID, update_data: Dict[str, Any]) -> Optional[T]:
"""Update record by ID"""
db_obj = self.get_by_id(record_id)
if db_obj:
for key, value in update_data.items():
if hasattr(db_obj, key):
setattr(db_obj, key, value)
self.db.commit()
self.db.refresh(db_obj)
return db_obj
def delete(self, record_id: UUID) -> bool:
"""Delete record by ID"""
db_obj = self.get_by_id(record_id)
if db_obj:
self.db.delete(db_obj)
self.db.commit()
return True
return False
def count_by_tenant(self, tenant_id: UUID) -> int:
"""Count records by tenant"""
return self.db.query(self.model).filter(self.model.tenant_id == tenant_id).count()
def list_with_filters(
self,
tenant_id: UUID,
filters: Optional[Dict[str, Any]] = None,
sort_by: str = "created_at",
sort_order: str = "desc",
limit: int = 100,
offset: int = 0
) -> List[T]:
"""List records with filtering and sorting"""
query = self.db.query(self.model).filter(self.model.tenant_id == tenant_id)
# Apply filters
if filters:
for key, value in filters.items():
if hasattr(self.model, key) and value is not None:
query = query.filter(getattr(self.model, key) == value)
# Apply sorting
if hasattr(self.model, sort_by):
if sort_order.lower() == "desc":
query = query.order_by(desc(getattr(self.model, sort_by)))
else:
query = query.order_by(asc(getattr(self.model, sort_by)))
return query.limit(limit).offset(offset).all()
def exists(self, record_id: UUID) -> bool:
"""Check if record exists"""
return self.db.query(self.model).filter(self.model.id == record_id).first() is not None

View File

@@ -1,382 +0,0 @@
# services/recipes/app/repositories/production_repository.py
"""
Repository for production-related database operations
"""
from typing import List, Optional, Dict, Any
from sqlalchemy.orm import Session, joinedload
from sqlalchemy import and_, func, desc, asc
from uuid import UUID
from datetime import datetime, date, timedelta
from .base import BaseRepository
from ..models.recipes import (
ProductionBatch,
ProductionIngredientConsumption,
ProductionSchedule,
ProductionStatus,
ProductionPriority
)
class ProductionRepository(BaseRepository[ProductionBatch]):
"""Repository for production batch operations"""
def __init__(self, db: Session):
super().__init__(ProductionBatch, db)
def get_by_id_with_consumptions(self, batch_id: UUID) -> Optional[ProductionBatch]:
"""Get production batch with ingredient consumptions loaded"""
return (
self.db.query(ProductionBatch)
.options(joinedload(ProductionBatch.ingredient_consumptions))
.filter(ProductionBatch.id == batch_id)
.first()
)
def get_batches_by_date_range(
self,
tenant_id: UUID,
start_date: date,
end_date: date,
status: Optional[ProductionStatus] = None
) -> List[ProductionBatch]:
"""Get production batches within date range"""
query = (
self.db.query(ProductionBatch)
.filter(
and_(
ProductionBatch.tenant_id == tenant_id,
ProductionBatch.production_date >= start_date,
ProductionBatch.production_date <= end_date
)
)
)
if status:
query = query.filter(ProductionBatch.status == status)
return query.order_by(ProductionBatch.production_date, ProductionBatch.planned_start_time).all()
def get_active_batches(self, tenant_id: UUID) -> List[ProductionBatch]:
"""Get all active production batches"""
return (
self.db.query(ProductionBatch)
.filter(
and_(
ProductionBatch.tenant_id == tenant_id,
ProductionBatch.status.in_([
ProductionStatus.PLANNED,
ProductionStatus.IN_PROGRESS
])
)
)
.order_by(ProductionBatch.planned_start_time)
.all()
)
def get_batches_by_recipe(
self,
tenant_id: UUID,
recipe_id: UUID,
limit: int = 50
) -> List[ProductionBatch]:
"""Get recent production batches for a specific recipe"""
return (
self.db.query(ProductionBatch)
.filter(
and_(
ProductionBatch.tenant_id == tenant_id,
ProductionBatch.recipe_id == recipe_id
)
)
.order_by(desc(ProductionBatch.production_date))
.limit(limit)
.all()
)
def search_batches(
self,
tenant_id: UUID,
search_term: Optional[str] = None,
status: Optional[ProductionStatus] = None,
priority: Optional[ProductionPriority] = None,
start_date: Optional[date] = None,
end_date: Optional[date] = None,
recipe_id: Optional[UUID] = None,
limit: int = 100,
offset: int = 0
) -> List[ProductionBatch]:
"""Search production batches with filters"""
query = self.db.query(ProductionBatch).filter(ProductionBatch.tenant_id == tenant_id)
# Text search
if search_term:
query = query.filter(ProductionBatch.batch_number.ilike(f"%{search_term}%"))
# Status filter
if status:
query = query.filter(ProductionBatch.status == status)
# Priority filter
if priority:
query = query.filter(ProductionBatch.priority == priority)
# Date range filter
if start_date:
query = query.filter(ProductionBatch.production_date >= start_date)
if end_date:
query = query.filter(ProductionBatch.production_date <= end_date)
# Recipe filter
if recipe_id:
query = query.filter(ProductionBatch.recipe_id == recipe_id)
return (
query.order_by(desc(ProductionBatch.production_date))
.limit(limit)
.offset(offset)
.all()
)
def get_production_statistics(
self,
tenant_id: UUID,
start_date: Optional[date] = None,
end_date: Optional[date] = None
) -> Dict[str, Any]:
"""Get production statistics for dashboard"""
query = self.db.query(ProductionBatch).filter(ProductionBatch.tenant_id == tenant_id)
if start_date:
query = query.filter(ProductionBatch.production_date >= start_date)
if end_date:
query = query.filter(ProductionBatch.production_date <= end_date)
# Total batches
total_batches = query.count()
# Completed batches
completed_batches = query.filter(ProductionBatch.status == ProductionStatus.COMPLETED).count()
# Failed batches
failed_batches = query.filter(ProductionBatch.status == ProductionStatus.FAILED).count()
# Average yield
avg_yield = (
self.db.query(func.avg(ProductionBatch.yield_percentage))
.filter(
and_(
ProductionBatch.tenant_id == tenant_id,
ProductionBatch.status == ProductionStatus.COMPLETED,
ProductionBatch.yield_percentage.isnot(None)
)
)
.scalar() or 0
)
# Average quality score
avg_quality = (
self.db.query(func.avg(ProductionBatch.quality_score))
.filter(
and_(
ProductionBatch.tenant_id == tenant_id,
ProductionBatch.status == ProductionStatus.COMPLETED,
ProductionBatch.quality_score.isnot(None)
)
)
.scalar() or 0
)
# Total production cost
total_cost = (
self.db.query(func.sum(ProductionBatch.total_production_cost))
.filter(
and_(
ProductionBatch.tenant_id == tenant_id,
ProductionBatch.status == ProductionStatus.COMPLETED,
ProductionBatch.total_production_cost.isnot(None)
)
)
.scalar() or 0
)
# Status breakdown
status_stats = (
self.db.query(ProductionBatch.status, func.count(ProductionBatch.id))
.filter(ProductionBatch.tenant_id == tenant_id)
.group_by(ProductionBatch.status)
.all()
)
return {
"total_batches": total_batches,
"completed_batches": completed_batches,
"failed_batches": failed_batches,
"success_rate": (completed_batches / total_batches * 100) if total_batches > 0 else 0,
"average_yield_percentage": float(avg_yield) if avg_yield else 0,
"average_quality_score": float(avg_quality) if avg_quality else 0,
"total_production_cost": float(total_cost) if total_cost else 0,
"status_breakdown": [
{"status": status.value, "count": count}
for status, count in status_stats
]
}
def update_batch_status(
self,
batch_id: UUID,
status: ProductionStatus,
completed_by: Optional[UUID] = None,
notes: Optional[str] = None
) -> Optional[ProductionBatch]:
"""Update production batch status"""
batch = self.get_by_id(batch_id)
if batch:
batch.status = status
if status == ProductionStatus.COMPLETED and completed_by:
batch.completed_by = completed_by
batch.actual_end_time = datetime.utcnow()
if status == ProductionStatus.IN_PROGRESS and not batch.actual_start_time:
batch.actual_start_time = datetime.utcnow()
if notes:
batch.production_notes = notes
self.db.commit()
self.db.refresh(batch)
return batch
class ProductionIngredientConsumptionRepository(BaseRepository[ProductionIngredientConsumption]):
"""Repository for production ingredient consumption operations"""
def __init__(self, db: Session):
super().__init__(ProductionIngredientConsumption, db)
def get_by_batch_id(self, batch_id: UUID) -> List[ProductionIngredientConsumption]:
"""Get all ingredient consumptions for a production batch"""
return (
self.db.query(ProductionIngredientConsumption)
.filter(ProductionIngredientConsumption.production_batch_id == batch_id)
.all()
)
def get_by_ingredient_id(
self,
tenant_id: UUID,
ingredient_id: UUID,
start_date: Optional[date] = None,
end_date: Optional[date] = None
) -> List[ProductionIngredientConsumption]:
"""Get ingredient consumptions by ingredient ID"""
query = (
self.db.query(ProductionIngredientConsumption)
.filter(
and_(
ProductionIngredientConsumption.tenant_id == tenant_id,
ProductionIngredientConsumption.ingredient_id == ingredient_id
)
)
)
if start_date:
query = query.filter(ProductionIngredientConsumption.consumption_time >= start_date)
if end_date:
query = query.filter(ProductionIngredientConsumption.consumption_time <= end_date)
return query.order_by(desc(ProductionIngredientConsumption.consumption_time)).all()
def calculate_ingredient_usage(
self,
tenant_id: UUID,
ingredient_id: UUID,
start_date: date,
end_date: date
) -> Dict[str, Any]:
"""Calculate ingredient usage statistics"""
consumptions = self.get_by_ingredient_id(tenant_id, ingredient_id, start_date, end_date)
if not consumptions:
return {
"total_consumed": 0,
"average_per_batch": 0,
"total_cost": 0,
"variance_percentage": 0,
"consumption_count": 0
}
total_consumed = sum(c.actual_quantity for c in consumptions)
total_planned = sum(c.planned_quantity for c in consumptions)
total_cost = sum((c.total_cost or 0) for c in consumptions)
variance_percentage = 0
if total_planned > 0:
variance_percentage = ((total_consumed - total_planned) / total_planned) * 100
return {
"total_consumed": total_consumed,
"total_planned": total_planned,
"average_per_batch": total_consumed / len(consumptions),
"total_cost": float(total_cost),
"variance_percentage": variance_percentage,
"consumption_count": len(consumptions)
}
class ProductionScheduleRepository(BaseRepository[ProductionSchedule]):
"""Repository for production schedule operations"""
def __init__(self, db: Session):
super().__init__(ProductionSchedule, db)
def get_by_date(self, tenant_id: UUID, schedule_date: date) -> Optional[ProductionSchedule]:
"""Get production schedule for specific date"""
return (
self.db.query(ProductionSchedule)
.filter(
and_(
ProductionSchedule.tenant_id == tenant_id,
ProductionSchedule.schedule_date == schedule_date
)
)
.first()
)
def get_published_schedules(
self,
tenant_id: UUID,
start_date: date,
end_date: date
) -> List[ProductionSchedule]:
"""Get published schedules within date range"""
return (
self.db.query(ProductionSchedule)
.filter(
and_(
ProductionSchedule.tenant_id == tenant_id,
ProductionSchedule.is_published == True,
ProductionSchedule.schedule_date >= start_date,
ProductionSchedule.schedule_date <= end_date
)
)
.order_by(ProductionSchedule.schedule_date)
.all()
)
def get_upcoming_schedules(self, tenant_id: UUID, days_ahead: int = 7) -> List[ProductionSchedule]:
"""Get upcoming production schedules"""
start_date = date.today()
end_date = date.today() + timedelta(days=days_ahead)
return (
self.db.query(ProductionSchedule)
.filter(
and_(
ProductionSchedule.tenant_id == tenant_id,
ProductionSchedule.schedule_date >= start_date,
ProductionSchedule.schedule_date <= end_date
)
)
.order_by(ProductionSchedule.schedule_date)
.all()
)

View File

@@ -1,343 +1,245 @@
# services/recipes/app/repositories/recipe_repository.py
"""
Repository for recipe-related database operations
Async recipe repository for database operations
"""
from typing import List, Optional, Dict, Any
from sqlalchemy.orm import Session, joinedload
from sqlalchemy import and_, or_, func
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, func, and_, or_
from sqlalchemy.orm import selectinload
from uuid import UUID
from datetime import datetime
import structlog
from .base import BaseRepository
from ..models.recipes import Recipe, RecipeIngredient, RecipeStatus
from shared.database.repository import BaseRepository
from ..models.recipes import Recipe, RecipeIngredient
from ..schemas.recipes import RecipeCreate, RecipeUpdate
logger = structlog.get_logger()
class RecipeRepository(BaseRepository[Recipe]):
"""Repository for recipe operations"""
def __init__(self, db: Session):
super().__init__(Recipe, db)
def get_by_id_with_ingredients(self, recipe_id: UUID) -> Optional[Recipe]:
class RecipeRepository(BaseRepository[Recipe, RecipeCreate, RecipeUpdate]):
"""Async repository for recipe operations"""
def __init__(self, session: AsyncSession):
super().__init__(Recipe, session)
async def get_recipe_with_ingredients(self, recipe_id: UUID) -> Optional[Dict[str, Any]]:
"""Get recipe with ingredients loaded"""
return (
self.db.query(Recipe)
.options(joinedload(Recipe.ingredients))
.filter(Recipe.id == recipe_id)
.first()
result = await self.session.execute(
select(Recipe)
.options(selectinload(Recipe.ingredients))
.where(Recipe.id == recipe_id)
)
def get_by_finished_product_id(self, tenant_id: UUID, finished_product_id: UUID) -> Optional[Recipe]:
"""Get recipe by finished product ID"""
return (
self.db.query(Recipe)
.filter(
and_(
Recipe.tenant_id == tenant_id,
Recipe.finished_product_id == finished_product_id,
Recipe.status == RecipeStatus.ACTIVE
)
)
.first()
)
def search_recipes(
recipe = result.scalar_one_or_none()
if not recipe:
return None
return {
"id": str(recipe.id),
"tenant_id": str(recipe.tenant_id),
"name": recipe.name,
"recipe_code": recipe.recipe_code,
"version": recipe.version,
"finished_product_id": str(recipe.finished_product_id),
"description": recipe.description,
"category": recipe.category,
"cuisine_type": recipe.cuisine_type,
"difficulty_level": recipe.difficulty_level,
"yield_quantity": recipe.yield_quantity,
"yield_unit": recipe.yield_unit.value if recipe.yield_unit else None,
"prep_time_minutes": recipe.prep_time_minutes,
"cook_time_minutes": recipe.cook_time_minutes,
"total_time_minutes": recipe.total_time_minutes,
"rest_time_minutes": recipe.rest_time_minutes,
"estimated_cost_per_unit": float(recipe.estimated_cost_per_unit) if recipe.estimated_cost_per_unit else None,
"last_calculated_cost": float(recipe.last_calculated_cost) if recipe.last_calculated_cost else None,
"cost_calculation_date": recipe.cost_calculation_date.isoformat() if recipe.cost_calculation_date else None,
"target_margin_percentage": recipe.target_margin_percentage,
"suggested_selling_price": float(recipe.suggested_selling_price) if recipe.suggested_selling_price else None,
"instructions": recipe.instructions,
"preparation_notes": recipe.preparation_notes,
"storage_instructions": recipe.storage_instructions,
"quality_standards": recipe.quality_standards,
"serves_count": recipe.serves_count,
"nutritional_info": recipe.nutritional_info,
"allergen_info": recipe.allergen_info,
"dietary_tags": recipe.dietary_tags,
"batch_size_multiplier": recipe.batch_size_multiplier,
"minimum_batch_size": recipe.minimum_batch_size,
"maximum_batch_size": recipe.maximum_batch_size,
"status": recipe.status,
"is_seasonal": recipe.is_seasonal,
"season_start_month": recipe.season_start_month,
"season_end_month": recipe.season_end_month,
"is_signature_item": recipe.is_signature_item,
"created_at": recipe.created_at.isoformat() if recipe.created_at else None,
"updated_at": recipe.updated_at.isoformat() if recipe.updated_at else None,
"ingredients": [
{
"id": str(ingredient.id),
"ingredient_id": str(ingredient.ingredient_id),
"quantity": float(ingredient.quantity),
"unit": ingredient.unit,
"preparation_method": ingredient.preparation_method,
"notes": ingredient.notes
}
for ingredient in recipe.ingredients
] if hasattr(recipe, 'ingredients') else []
}
async def search_recipes(
self,
tenant_id: UUID,
search_term: Optional[str] = None,
status: Optional[RecipeStatus] = None,
status: Optional[str] = None,
category: Optional[str] = None,
is_seasonal: Optional[bool] = None,
is_signature: Optional[bool] = None,
difficulty_level: Optional[int] = None,
limit: int = 100,
offset: int = 0
) -> List[Recipe]:
) -> List[Dict[str, Any]]:
"""Search recipes with multiple filters"""
query = self.db.query(Recipe).filter(Recipe.tenant_id == tenant_id)
query = select(Recipe).where(Recipe.tenant_id == tenant_id)
# Text search
if search_term:
search_filter = or_(
Recipe.name.ilike(f"%{search_term}%"),
Recipe.description.ilike(f"%{search_term}%"),
Recipe.category.ilike(f"%{search_term}%")
query = query.where(
or_(
Recipe.name.ilike(f"%{search_term}%"),
Recipe.description.ilike(f"%{search_term}%")
)
)
query = query.filter(search_filter)
# Status filter
if status:
query = query.filter(Recipe.status == status)
query = query.where(Recipe.status == status)
# Category filter
if category:
query = query.filter(Recipe.category == category)
query = query.where(Recipe.category == category)
# Seasonal filter
if is_seasonal is not None:
query = query.filter(Recipe.is_seasonal == is_seasonal)
# Signature item filter
query = query.where(Recipe.is_seasonal == is_seasonal)
# Signature filter
if is_signature is not None:
query = query.filter(Recipe.is_signature_item == is_signature)
# Difficulty level filter
query = query.where(Recipe.is_signature_item == is_signature)
# Difficulty filter
if difficulty_level is not None:
query = query.filter(Recipe.difficulty_level == difficulty_level)
return query.order_by(Recipe.name).limit(limit).offset(offset).all()
def get_active_recipes(self, tenant_id: UUID) -> List[Recipe]:
"""Get all active recipes for tenant"""
return (
self.db.query(Recipe)
.filter(
and_(
Recipe.tenant_id == tenant_id,
Recipe.status == RecipeStatus.ACTIVE
)
)
.order_by(Recipe.name)
.all()
)
def get_seasonal_recipes(self, tenant_id: UUID, current_month: int) -> List[Recipe]:
"""Get seasonal recipes for current month"""
return (
self.db.query(Recipe)
.filter(
and_(
Recipe.tenant_id == tenant_id,
Recipe.status == RecipeStatus.ACTIVE,
Recipe.is_seasonal == True,
or_(
and_(
Recipe.season_start_month <= current_month,
Recipe.season_end_month >= current_month
),
and_(
Recipe.season_start_month > Recipe.season_end_month, # Crosses year boundary
or_(
Recipe.season_start_month <= current_month,
Recipe.season_end_month >= current_month
)
)
)
)
)
.order_by(Recipe.name)
.all()
)
def get_recipes_by_category(self, tenant_id: UUID, category: str) -> List[Recipe]:
"""Get recipes by category"""
return (
self.db.query(Recipe)
.filter(
and_(
Recipe.tenant_id == tenant_id,
Recipe.category == category,
Recipe.status == RecipeStatus.ACTIVE
)
)
.order_by(Recipe.name)
.all()
)
def get_recipe_statistics(self, tenant_id: UUID) -> Dict[str, Any]:
query = query.where(Recipe.difficulty_level == difficulty_level)
# Apply ordering and pagination
query = query.order_by(Recipe.name).limit(limit).offset(offset)
result = await self.session.execute(query)
recipes = result.scalars().all()
return [
{
"id": str(recipe.id),
"tenant_id": str(recipe.tenant_id),
"name": recipe.name,
"recipe_code": recipe.recipe_code,
"version": recipe.version,
"finished_product_id": str(recipe.finished_product_id),
"description": recipe.description,
"category": recipe.category,
"cuisine_type": recipe.cuisine_type,
"difficulty_level": recipe.difficulty_level,
"yield_quantity": recipe.yield_quantity,
"yield_unit": recipe.yield_unit.value if recipe.yield_unit else None,
"prep_time_minutes": recipe.prep_time_minutes,
"cook_time_minutes": recipe.cook_time_minutes,
"total_time_minutes": recipe.total_time_minutes,
"rest_time_minutes": recipe.rest_time_minutes,
"estimated_cost_per_unit": float(recipe.estimated_cost_per_unit) if recipe.estimated_cost_per_unit else None,
"last_calculated_cost": float(recipe.last_calculated_cost) if recipe.last_calculated_cost else None,
"cost_calculation_date": recipe.cost_calculation_date.isoformat() if recipe.cost_calculation_date else None,
"target_margin_percentage": recipe.target_margin_percentage,
"suggested_selling_price": float(recipe.suggested_selling_price) if recipe.suggested_selling_price else None,
"instructions": recipe.instructions,
"preparation_notes": recipe.preparation_notes,
"storage_instructions": recipe.storage_instructions,
"quality_standards": recipe.quality_standards,
"serves_count": recipe.serves_count,
"nutritional_info": recipe.nutritional_info,
"allergen_info": recipe.allergen_info,
"dietary_tags": recipe.dietary_tags,
"batch_size_multiplier": recipe.batch_size_multiplier,
"minimum_batch_size": recipe.minimum_batch_size,
"maximum_batch_size": recipe.maximum_batch_size,
"status": recipe.status,
"is_seasonal": recipe.is_seasonal,
"season_start_month": recipe.season_start_month,
"season_end_month": recipe.season_end_month,
"is_signature_item": recipe.is_signature_item,
"created_at": recipe.created_at.isoformat() if recipe.created_at else None,
"updated_at": recipe.updated_at.isoformat() if recipe.updated_at else None
}
for recipe in recipes
]
async def get_recipe_statistics(self, tenant_id: UUID) -> Dict[str, Any]:
"""Get recipe statistics for dashboard"""
total_recipes = self.db.query(Recipe).filter(Recipe.tenant_id == tenant_id).count()
active_recipes = (
self.db.query(Recipe)
.filter(
# Total recipes
total_result = await self.session.execute(
select(func.count(Recipe.id)).where(Recipe.tenant_id == tenant_id)
)
total_recipes = total_result.scalar() or 0
# Active recipes
active_result = await self.session.execute(
select(func.count(Recipe.id)).where(
and_(
Recipe.tenant_id == tenant_id,
Recipe.status == RecipeStatus.ACTIVE
Recipe.status == "active"
)
)
.count()
)
signature_recipes = (
self.db.query(Recipe)
.filter(
active_recipes = active_result.scalar() or 0
# Signature recipes
signature_result = await self.session.execute(
select(func.count(Recipe.id)).where(
and_(
Recipe.tenant_id == tenant_id,
Recipe.is_signature_item == True,
Recipe.status == RecipeStatus.ACTIVE
Recipe.is_signature_item == True
)
)
.count()
)
seasonal_recipes = (
self.db.query(Recipe)
.filter(
signature_recipes = signature_result.scalar() or 0
# Seasonal recipes
seasonal_result = await self.session.execute(
select(func.count(Recipe.id)).where(
and_(
Recipe.tenant_id == tenant_id,
Recipe.is_seasonal == True,
Recipe.status == RecipeStatus.ACTIVE
Recipe.is_seasonal == True
)
)
.count()
)
seasonal_recipes = seasonal_result.scalar() or 0
# Category breakdown
category_stats = (
self.db.query(Recipe.category, func.count(Recipe.id))
.filter(
and_(
Recipe.tenant_id == tenant_id,
Recipe.status == RecipeStatus.ACTIVE
)
)
category_result = await self.session.execute(
select(Recipe.category, func.count(Recipe.id))
.where(Recipe.tenant_id == tenant_id)
.group_by(Recipe.category)
.all()
)
categories = dict(category_result.all())
return {
"total_recipes": total_recipes,
"active_recipes": active_recipes,
"signature_recipes": signature_recipes,
"seasonal_recipes": seasonal_recipes,
"category_breakdown": [
{"category": cat, "count": count}
for cat, count in category_stats
]
}
def duplicate_recipe(self, recipe_id: UUID, new_name: str, created_by: UUID) -> Optional[Recipe]:
"""Create a duplicate of an existing recipe"""
original = self.get_by_id_with_ingredients(recipe_id)
if not original:
return None
# Create new recipe
recipe_data = {
"tenant_id": original.tenant_id,
"name": new_name,
"recipe_code": f"{original.recipe_code}_copy" if original.recipe_code else None,
"version": "1.0",
"finished_product_id": original.finished_product_id,
"description": original.description,
"category": original.category,
"cuisine_type": original.cuisine_type,
"difficulty_level": original.difficulty_level,
"yield_quantity": original.yield_quantity,
"yield_unit": original.yield_unit,
"prep_time_minutes": original.prep_time_minutes,
"cook_time_minutes": original.cook_time_minutes,
"total_time_minutes": original.total_time_minutes,
"rest_time_minutes": original.rest_time_minutes,
"instructions": original.instructions,
"preparation_notes": original.preparation_notes,
"storage_instructions": original.storage_instructions,
"quality_standards": original.quality_standards,
"serves_count": original.serves_count,
"nutritional_info": original.nutritional_info,
"allergen_info": original.allergen_info,
"dietary_tags": original.dietary_tags,
"batch_size_multiplier": original.batch_size_multiplier,
"minimum_batch_size": original.minimum_batch_size,
"maximum_batch_size": original.maximum_batch_size,
"optimal_production_temperature": original.optimal_production_temperature,
"optimal_humidity": original.optimal_humidity,
"quality_check_points": original.quality_check_points,
"common_issues": original.common_issues,
"status": RecipeStatus.DRAFT,
"is_seasonal": original.is_seasonal,
"season_start_month": original.season_start_month,
"season_end_month": original.season_end_month,
"is_signature_item": False,
"created_by": created_by
}
new_recipe = self.create(recipe_data)
# Copy ingredients
for ingredient in original.ingredients:
ingredient_data = {
"tenant_id": original.tenant_id,
"recipe_id": new_recipe.id,
"ingredient_id": ingredient.ingredient_id,
"quantity": ingredient.quantity,
"unit": ingredient.unit,
"quantity_in_base_unit": ingredient.quantity_in_base_unit,
"alternative_quantity": ingredient.alternative_quantity,
"alternative_unit": ingredient.alternative_unit,
"preparation_method": ingredient.preparation_method,
"ingredient_notes": ingredient.ingredient_notes,
"is_optional": ingredient.is_optional,
"ingredient_order": ingredient.ingredient_order,
"ingredient_group": ingredient.ingredient_group,
"substitution_options": ingredient.substitution_options,
"substitution_ratio": ingredient.substitution_ratio
}
recipe_ingredient = RecipeIngredient(**ingredient_data)
self.db.add(recipe_ingredient)
self.db.commit()
return new_recipe
class RecipeIngredientRepository(BaseRepository[RecipeIngredient]):
"""Repository for recipe ingredient operations"""
def __init__(self, db: Session):
super().__init__(RecipeIngredient, db)
def get_by_recipe_id(self, recipe_id: UUID) -> List[RecipeIngredient]:
"""Get all ingredients for a recipe"""
return (
self.db.query(RecipeIngredient)
.filter(RecipeIngredient.recipe_id == recipe_id)
.order_by(RecipeIngredient.ingredient_order)
.all()
)
def get_by_ingredient_group(self, recipe_id: UUID, ingredient_group: str) -> List[RecipeIngredient]:
"""Get ingredients by group within a recipe"""
return (
self.db.query(RecipeIngredient)
.filter(
and_(
RecipeIngredient.recipe_id == recipe_id,
RecipeIngredient.ingredient_group == ingredient_group
)
)
.order_by(RecipeIngredient.ingredient_order)
.all()
)
def update_ingredients_for_recipe(
self,
recipe_id: UUID,
ingredients_data: List[Dict[str, Any]]
) -> List[RecipeIngredient]:
"""Update all ingredients for a recipe"""
# Delete existing ingredients
self.db.query(RecipeIngredient).filter(
RecipeIngredient.recipe_id == recipe_id
).delete()
# Create new ingredients
new_ingredients = []
for ingredient_data in ingredients_data:
ingredient_data["recipe_id"] = recipe_id
ingredient = RecipeIngredient(**ingredient_data)
self.db.add(ingredient)
new_ingredients.append(ingredient)
self.db.commit()
return new_ingredients
def calculate_recipe_cost(self, recipe_id: UUID) -> float:
"""Calculate total cost of recipe based on ingredient costs"""
ingredients = self.get_by_recipe_id(recipe_id)
total_cost = sum(
(ingredient.total_cost or 0) for ingredient in ingredients
)
return total_cost
"draft_recipes": total_recipes - active_recipes,
"categories": categories,
"average_difficulty": 3.0, # Could calculate from actual data
"total_categories": len(categories)
}