# services/recipes/app/repositories/recipe_repository.py
"""
Repository for recipe-related database operations
"""

from typing import List, Optional, Dict, Any
from sqlalchemy.orm import Session, joinedload
from sqlalchemy import and_, or_, func
from uuid import UUID
from datetime import datetime

from .base import BaseRepository
from ..models.recipes import Recipe, RecipeIngredient, RecipeStatus

# Escape character handed to ILIKE so user-typed "%" / "_" match literally.
# "/" is used rather than a backslash to avoid backend-specific
# string-literal escaping rules.
_LIKE_ESCAPE = "/"


def _escape_like(term: str) -> str:
    """Return *term* with LIKE wildcards (and the escape char) escaped."""
    return (
        term.replace(_LIKE_ESCAPE, _LIKE_ESCAPE * 2)
        .replace("%", f"{_LIKE_ESCAPE}%")
        .replace("_", f"{_LIKE_ESCAPE}_")
    )


class RecipeRepository(BaseRepository[Recipe]):
    """Repository for recipe operations"""

    def __init__(self, db: Session):
        """Bind the repository to the ``Recipe`` model and session *db*."""
        super().__init__(Recipe, db)

    def get_by_id_with_ingredients(self, recipe_id: UUID) -> Optional[Recipe]:
        """Get recipe with ingredients loaded.

        The ``ingredients`` relationship is eagerly joined into the same
        query. Returns ``None`` when no recipe has the given id.
        """
        return (
            self.db.query(Recipe)
            .options(joinedload(Recipe.ingredients))
            .filter(Recipe.id == recipe_id)
            .first()
        )

    def get_by_finished_product_id(
        self,
        tenant_id: UUID,
        finished_product_id: UUID
    ) -> Optional[Recipe]:
        """Get the first ACTIVE recipe of a tenant for a finished product.

        Returns ``None`` when the tenant has no active recipe for the product.
        """
        return (
            self.db.query(Recipe)
            .filter(
                and_(
                    Recipe.tenant_id == tenant_id,
                    Recipe.finished_product_id == finished_product_id,
                    Recipe.status == RecipeStatus.ACTIVE
                )
            )
            .first()
        )

    def search_recipes(
        self,
        tenant_id: UUID,
        search_term: Optional[str] = None,
        status: Optional[RecipeStatus] = None,
        category: Optional[str] = None,
        is_seasonal: Optional[bool] = None,
        is_signature: Optional[bool] = None,
        difficulty_level: Optional[int] = None,
        limit: int = 100,
        offset: int = 0
    ) -> List[Recipe]:
        """Search a tenant's recipes with multiple optional filters.

        Args:
            tenant_id: Tenant whose recipes are searched (always applied).
            search_term: Case-insensitive substring matched against name,
                description and category. ``%`` and ``_`` in the term are
                matched literally, not as wildcards.
            status: Exact status to match; skipped when falsy.
            category: Exact category to match; skipped when falsy.
            is_seasonal: Seasonal flag to match; skipped when ``None``.
            is_signature: Signature-item flag to match; skipped when ``None``.
            difficulty_level: Exact difficulty to match; skipped when ``None``.
            limit: Maximum number of rows returned.
            offset: Number of rows skipped before the first returned row.

        Returns:
            Matching recipes ordered by name.
        """
        query = self.db.query(Recipe).filter(Recipe.tenant_id == tenant_id)

        # Text search
        if search_term:
            # Escape the user's input so "%" / "_" do not act as wildcards.
            pattern = f"%{_escape_like(search_term)}%"
            query = query.filter(
                or_(
                    Recipe.name.ilike(pattern, escape=_LIKE_ESCAPE),
                    Recipe.description.ilike(pattern, escape=_LIKE_ESCAPE),
                    Recipe.category.ilike(pattern, escape=_LIKE_ESCAPE)
                )
            )

        # Status filter
        if status:
            query = query.filter(Recipe.status == status)

        # Category filter
        if category:
            query = query.filter(Recipe.category == category)

        # Seasonal filter
        if is_seasonal is not None:
            query = query.filter(Recipe.is_seasonal == is_seasonal)

        # Signature item filter
        if is_signature is not None:
            query = query.filter(Recipe.is_signature_item == is_signature)

        # Difficulty level filter
        if difficulty_level is not None:
            query = query.filter(Recipe.difficulty_level == difficulty_level)

        return query.order_by(Recipe.name).limit(limit).offset(offset).all()

    def get_active_recipes(self, tenant_id: UUID) -> List[Recipe]:
        """Get all ACTIVE recipes for a tenant, ordered by name."""
        return (
            self.db.query(Recipe)
            .filter(
                and_(
                    Recipe.tenant_id == tenant_id,
                    Recipe.status == RecipeStatus.ACTIVE
                )
            )
            .order_by(Recipe.name)
            .all()
        )

    def get_seasonal_recipes(self, tenant_id: UUID, current_month: int) -> List[Recipe]:
        """Get ACTIVE seasonal recipes whose season includes *current_month*.

        A season is the month range ``season_start_month`` ..
        ``season_end_month``. When start is greater than end the season is
        treated as wrapping over the year boundary (e.g. 11 -> 2).

        Returns:
            Matching recipes ordered by name.
        """
        # Season that does not wrap: start <= month <= end.
        in_plain_season = and_(
            Recipe.season_start_month <= current_month,
            Recipe.season_end_month >= current_month
        )
        # Season that crosses the year boundary: month is at/after the start
        # OR at/before the end.
        in_wrapped_season = and_(
            Recipe.season_start_month > Recipe.season_end_month,
            or_(
                Recipe.season_start_month <= current_month,
                Recipe.season_end_month >= current_month
            )
        )
        return (
            self.db.query(Recipe)
            .filter(
                and_(
                    Recipe.tenant_id == tenant_id,
                    Recipe.status == RecipeStatus.ACTIVE,
                    Recipe.is_seasonal.is_(True),
                    or_(in_plain_season, in_wrapped_season)
                )
            )
            .order_by(Recipe.name)
            .all()
        )

    def get_recipes_by_category(self, tenant_id: UUID, category: str) -> List[Recipe]:
        """Get a tenant's ACTIVE recipes in *category*, ordered by name."""
        return (
            self.db.query(Recipe)
            .filter(
                and_(
                    Recipe.tenant_id == tenant_id,
                    Recipe.category == category,
                    Recipe.status == RecipeStatus.ACTIVE
                )
            )
            .order_by(Recipe.name)
            .all()
        )

    def get_recipe_statistics(self, tenant_id: UUID) -> Dict[str, Any]:
        """Get recipe statistics for dashboard.

        Returns:
            A dict with ``total_recipes`` (all statuses), ``active_recipes``,
            ``signature_recipes`` and ``seasonal_recipes`` (ACTIVE only), and
            ``category_breakdown``: a list of ``{"category", "count"}`` dicts
            over ACTIVE recipes.
        """
        total_recipes = (
            self.db.query(Recipe)
            .filter(Recipe.tenant_id == tenant_id)
            .count()
        )

        active_recipes = (
            self.db.query(Recipe)
            .filter(
                and_(
                    Recipe.tenant_id == tenant_id,
                    Recipe.status == RecipeStatus.ACTIVE
                )
            )
            .count()
        )

        signature_recipes = (
            self.db.query(Recipe)
            .filter(
                and_(
                    Recipe.tenant_id == tenant_id,
                    Recipe.is_signature_item.is_(True),
                    Recipe.status == RecipeStatus.ACTIVE
                )
            )
            .count()
        )

        seasonal_recipes = (
            self.db.query(Recipe)
            .filter(
                and_(
                    Recipe.tenant_id == tenant_id,
                    Recipe.is_seasonal.is_(True),
                    Recipe.status == RecipeStatus.ACTIVE
                )
            )
            .count()
        )

        # Category breakdown
        category_stats = (
            self.db.query(Recipe.category, func.count(Recipe.id))
            .filter(
                and_(
                    Recipe.tenant_id == tenant_id,
                    Recipe.status == RecipeStatus.ACTIVE
                )
            )
            .group_by(Recipe.category)
            .all()
        )

        return {
            "total_recipes": total_recipes,
            "active_recipes": active_recipes,
            "signature_recipes": signature_recipes,
            "seasonal_recipes": seasonal_recipes,
            "category_breakdown": [
                {"category": cat, "count": count}
                for cat, count in category_stats
            ]
        }

    def duplicate_recipe(
        self,
        recipe_id: UUID,
        new_name: str,
        created_by: UUID
    ) -> Optional[Recipe]:
        """Create a duplicate of an existing recipe.

        The copy is created as a DRAFT at version ``"1.0"``, is never a
        signature item, and gets ``"<recipe_code>_copy"`` as its code when the
        original has one. All ingredients of the original are copied.

        Args:
            recipe_id: Id of the recipe to copy.
            new_name: Name given to the copy.
            created_by: Value stored in the copy's ``created_by``.

        Returns:
            The new recipe, or ``None`` when *recipe_id* does not exist.

        Raises:
            Exception: Whatever the session raises while saving the copied
                ingredients; the session is rolled back before re-raising.
        """
        original = self.get_by_id_with_ingredients(recipe_id)
        if not original:
            return None

        # Create new recipe
        recipe_data = {
            "tenant_id": original.tenant_id,
            "name": new_name,
            "recipe_code": f"{original.recipe_code}_copy" if original.recipe_code else None,
            "version": "1.0",
            "finished_product_id": original.finished_product_id,
            "description": original.description,
            "category": original.category,
            "cuisine_type": original.cuisine_type,
            "difficulty_level": original.difficulty_level,
            "yield_quantity": original.yield_quantity,
            "yield_unit": original.yield_unit,
            "prep_time_minutes": original.prep_time_minutes,
            "cook_time_minutes": original.cook_time_minutes,
            "total_time_minutes": original.total_time_minutes,
            "rest_time_minutes": original.rest_time_minutes,
            "instructions": original.instructions,
            "preparation_notes": original.preparation_notes,
            "storage_instructions": original.storage_instructions,
            "quality_standards": original.quality_standards,
            "serves_count": original.serves_count,
            "nutritional_info": original.nutritional_info,
            "allergen_info": original.allergen_info,
            "dietary_tags": original.dietary_tags,
            "batch_size_multiplier": original.batch_size_multiplier,
            "minimum_batch_size": original.minimum_batch_size,
            "maximum_batch_size": original.maximum_batch_size,
            "optimal_production_temperature": original.optimal_production_temperature,
            "optimal_humidity": original.optimal_humidity,
            "quality_check_points": original.quality_check_points,
            "common_issues": original.common_issues,
            "status": RecipeStatus.DRAFT,
            "is_seasonal": original.is_seasonal,
            "season_start_month": original.season_start_month,
            "season_end_month": original.season_end_month,
            "is_signature_item": False,
            "created_by": created_by
        }

        new_recipe = self.create(recipe_data)

        # Copy ingredients
        copied_ingredients = [
            RecipeIngredient(
                tenant_id=original.tenant_id,
                recipe_id=new_recipe.id,
                ingredient_id=ingredient.ingredient_id,
                quantity=ingredient.quantity,
                unit=ingredient.unit,
                quantity_in_base_unit=ingredient.quantity_in_base_unit,
                alternative_quantity=ingredient.alternative_quantity,
                alternative_unit=ingredient.alternative_unit,
                preparation_method=ingredient.preparation_method,
                ingredient_notes=ingredient.ingredient_notes,
                is_optional=ingredient.is_optional,
                ingredient_order=ingredient.ingredient_order,
                ingredient_group=ingredient.ingredient_group,
                substitution_options=ingredient.substitution_options,
                substitution_ratio=ingredient.substitution_ratio
            )
            for ingredient in original.ingredients
        ]

        try:
            self.db.add_all(copied_ingredients)
            self.db.commit()
        except Exception:
            # Leave the session usable for the caller instead of stuck in a
            # failed transaction.
            # NOTE(review): if BaseRepository.create() commits on its own,
            # this rollback cannot undo the new recipe row — confirm.
            self.db.rollback()
            raise

        return new_recipe


class RecipeIngredientRepository(BaseRepository[RecipeIngredient]):
    """Repository for recipe ingredient operations"""

    def __init__(self, db: Session):
        """Bind the repository to the ``RecipeIngredient`` model and *db*."""
        super().__init__(RecipeIngredient, db)

    def get_by_recipe_id(self, recipe_id: UUID) -> List[RecipeIngredient]:
        """Get all ingredients for a recipe, ordered by ``ingredient_order``."""
        return (
            self.db.query(RecipeIngredient)
            .filter(RecipeIngredient.recipe_id == recipe_id)
            .order_by(RecipeIngredient.ingredient_order)
            .all()
        )

    def get_by_ingredient_group(
        self,
        recipe_id: UUID,
        ingredient_group: str
    ) -> List[RecipeIngredient]:
        """Get a recipe's ingredients in one group, ordered by ``ingredient_order``."""
        return (
            self.db.query(RecipeIngredient)
            .filter(
                and_(
                    RecipeIngredient.recipe_id == recipe_id,
                    RecipeIngredient.ingredient_group == ingredient_group
                )
            )
            .order_by(RecipeIngredient.ingredient_order)
            .all()
        )

    def update_ingredients_for_recipe(
        self,
        recipe_id: UUID,
        ingredients_data: List[Dict[str, Any]]
    ) -> List[RecipeIngredient]:
        """Replace all ingredients of a recipe with *ingredients_data*.

        Existing ingredient rows of the recipe are deleted and one new row is
        created per dict; both happen in a single commit.

        Args:
            recipe_id: Recipe whose ingredients are replaced.
            ingredients_data: Keyword arguments for each new
                ``RecipeIngredient``. ``recipe_id`` is forced to *recipe_id*;
                the dicts themselves are not modified.

        Returns:
            The newly created ingredient objects, in input order.

        Raises:
            Exception: Whatever model construction or the session raises; the
                session is rolled back before re-raising so the delete is not
                left half-applied.
        """
        try:
            # Delete existing ingredients
            self.db.query(RecipeIngredient).filter(
                RecipeIngredient.recipe_id == recipe_id
            ).delete()

            # Create new ingredients from copies so the caller's dicts are
            # left untouched.
            new_ingredients = [
                RecipeIngredient(**{**ingredient_data, "recipe_id": recipe_id})
                for ingredient_data in ingredients_data
            ]
            self.db.add_all(new_ingredients)
            self.db.commit()
        except Exception:
            self.db.rollback()
            raise

        return new_ingredients

    def calculate_recipe_cost(self, recipe_id: UUID) -> float:
        """Calculate total cost of recipe based on ingredient costs.

        Sums ``total_cost`` over the recipe's ingredients; an ingredient whose
        ``total_cost`` is falsy (e.g. ``None``) contributes 0. A recipe with
        no ingredients yields 0.
        """
        ingredients = self.get_by_recipe_id(recipe_id)
        total_cost = sum(
            (ingredient.total_cost or 0) for ingredient in ingredients
        )
        return total_cost