Create new services: inventory, recipes, suppliers

This commit is contained in:
Urtzi Alfaro
2025-08-13 17:39:35 +02:00
parent fbe7470ad9
commit 16b8a9d50c
151 changed files with 35799 additions and 857 deletions

View File

@@ -0,0 +1,469 @@
# services/inventory/app/services/inventory_service.py
"""
Inventory Service - Business Logic Layer
"""
from typing import List, Optional, Dict, Any, Tuple
from uuid import UUID
from datetime import datetime, timedelta
import structlog
from app.models.inventory import Ingredient, Stock, StockMovement, StockAlert, StockMovementType
from app.repositories.ingredient_repository import IngredientRepository
from app.repositories.stock_repository import StockRepository
from app.repositories.stock_movement_repository import StockMovementRepository
from app.schemas.inventory import (
IngredientCreate, IngredientUpdate, IngredientResponse,
StockCreate, StockUpdate, StockResponse,
StockMovementCreate, StockMovementResponse,
InventorySummary, StockLevelSummary
)
from app.core.database import get_db_transaction
from shared.database.exceptions import DatabaseError
logger = structlog.get_logger()
class InventoryService:
"""Service layer for inventory operations"""
def __init__(self):
pass
# ===== INGREDIENT MANAGEMENT =====
async def create_ingredient(
self,
ingredient_data: IngredientCreate,
tenant_id: UUID,
user_id: Optional[UUID] = None
) -> IngredientResponse:
"""Create a new ingredient with business validation"""
try:
# Business validation
await self._validate_ingredient_data(ingredient_data, tenant_id)
async with get_db_transaction() as db:
repository = IngredientRepository(db)
# Check for duplicates
if ingredient_data.sku:
existing = await repository.get_by_sku(tenant_id, ingredient_data.sku)
if existing:
raise ValueError(f"Ingredient with SKU '{ingredient_data.sku}' already exists")
if ingredient_data.barcode:
existing = await repository.get_by_barcode(tenant_id, ingredient_data.barcode)
if existing:
raise ValueError(f"Ingredient with barcode '{ingredient_data.barcode}' already exists")
# Create ingredient
ingredient = await repository.create_ingredient(ingredient_data, tenant_id)
# Convert to response schema
response = IngredientResponse(**ingredient.to_dict())
# Add computed fields
response.current_stock = 0.0
response.is_low_stock = True
response.needs_reorder = True
logger.info("Ingredient created successfully", ingredient_id=ingredient.id, name=ingredient.name)
return response
except Exception as e:
logger.error("Failed to create ingredient", error=str(e), tenant_id=tenant_id)
raise
async def get_ingredient(self, ingredient_id: UUID, tenant_id: UUID) -> Optional[IngredientResponse]:
"""Get ingredient by ID with stock information"""
try:
async with get_db_transaction() as db:
ingredient_repo = IngredientRepository(db)
stock_repo = StockRepository(db)
ingredient = await ingredient_repo.get_by_id(ingredient_id)
if not ingredient or ingredient.tenant_id != tenant_id:
return None
# Get current stock levels
stock_totals = await stock_repo.get_total_stock_by_ingredient(tenant_id, ingredient_id)
# Convert to response schema
response = IngredientResponse(**ingredient.to_dict())
response.current_stock = stock_totals['total_available']
response.is_low_stock = stock_totals['total_available'] <= ingredient.low_stock_threshold
response.needs_reorder = stock_totals['total_available'] <= ingredient.reorder_point
return response
except Exception as e:
logger.error("Failed to get ingredient", error=str(e), ingredient_id=ingredient_id)
raise
async def update_ingredient(
self,
ingredient_id: UUID,
ingredient_data: IngredientUpdate,
tenant_id: UUID
) -> Optional[IngredientResponse]:
"""Update ingredient with business validation"""
try:
async with get_db_transaction() as db:
ingredient_repo = IngredientRepository(db)
stock_repo = StockRepository(db)
# Check if ingredient exists and belongs to tenant
existing = await ingredient_repo.get_by_id(ingredient_id)
if not existing or existing.tenant_id != tenant_id:
return None
# Validate unique constraints
if ingredient_data.sku and ingredient_data.sku != existing.sku:
sku_check = await ingredient_repo.get_by_sku(tenant_id, ingredient_data.sku)
if sku_check and sku_check.id != ingredient_id:
raise ValueError(f"Ingredient with SKU '{ingredient_data.sku}' already exists")
if ingredient_data.barcode and ingredient_data.barcode != existing.barcode:
barcode_check = await ingredient_repo.get_by_barcode(tenant_id, ingredient_data.barcode)
if barcode_check and barcode_check.id != ingredient_id:
raise ValueError(f"Ingredient with barcode '{ingredient_data.barcode}' already exists")
# Update ingredient
updated_ingredient = await ingredient_repo.update(ingredient_id, ingredient_data)
if not updated_ingredient:
return None
# Get current stock levels
stock_totals = await stock_repo.get_total_stock_by_ingredient(tenant_id, ingredient_id)
# Convert to response schema
response = IngredientResponse(**updated_ingredient.to_dict())
response.current_stock = stock_totals['total_available']
response.is_low_stock = stock_totals['total_available'] <= updated_ingredient.low_stock_threshold
response.needs_reorder = stock_totals['total_available'] <= updated_ingredient.reorder_point
return response
except Exception as e:
logger.error("Failed to update ingredient", error=str(e), ingredient_id=ingredient_id)
raise
async def get_ingredients(
self,
tenant_id: UUID,
skip: int = 0,
limit: int = 100,
filters: Optional[Dict[str, Any]] = None
) -> List[IngredientResponse]:
"""Get ingredients with filtering and stock information"""
try:
async with get_db_transaction() as db:
ingredient_repo = IngredientRepository(db)
stock_repo = StockRepository(db)
# Get ingredients
ingredients = await ingredient_repo.get_ingredients_by_tenant(
tenant_id, skip, limit, filters
)
responses = []
for ingredient in ingredients:
# Get current stock levels
stock_totals = await stock_repo.get_total_stock_by_ingredient(tenant_id, ingredient.id)
# Convert to response schema
response = IngredientResponse(**ingredient.to_dict())
response.current_stock = stock_totals['total_available']
response.is_low_stock = stock_totals['total_available'] <= ingredient.low_stock_threshold
response.needs_reorder = stock_totals['total_available'] <= ingredient.reorder_point
responses.append(response)
return responses
except Exception as e:
logger.error("Failed to get ingredients", error=str(e), tenant_id=tenant_id)
raise
# ===== STOCK MANAGEMENT =====
async def add_stock(
self,
stock_data: StockCreate,
tenant_id: UUID,
user_id: Optional[UUID] = None
) -> StockResponse:
"""Add new stock with automatic movement tracking"""
try:
async with get_db_transaction() as db:
ingredient_repo = IngredientRepository(db)
stock_repo = StockRepository(db)
movement_repo = StockMovementRepository(db)
# Validate ingredient exists
ingredient = await ingredient_repo.get_by_id(UUID(stock_data.ingredient_id))
if not ingredient or ingredient.tenant_id != tenant_id:
raise ValueError("Ingredient not found")
# Create stock entry
stock = await stock_repo.create_stock_entry(stock_data, tenant_id)
# Create stock movement record
movement_data = StockMovementCreate(
ingredient_id=stock_data.ingredient_id,
stock_id=str(stock.id),
movement_type=StockMovementType.PURCHASE,
quantity=stock_data.current_quantity,
unit_cost=stock_data.unit_cost,
notes=f"Initial stock entry - Batch: {stock_data.batch_number or 'N/A'}"
)
await movement_repo.create_movement(movement_data, tenant_id, user_id)
# Update ingredient's last purchase price
if stock_data.unit_cost:
await ingredient_repo.update_last_purchase_price(
UUID(stock_data.ingredient_id),
float(stock_data.unit_cost)
)
# Convert to response schema
response = StockResponse(**stock.to_dict())
response.ingredient = IngredientResponse(**ingredient.to_dict())
logger.info("Stock added successfully", stock_id=stock.id, quantity=stock.current_quantity)
return response
except Exception as e:
logger.error("Failed to add stock", error=str(e), tenant_id=tenant_id)
raise
async def consume_stock(
self,
ingredient_id: UUID,
quantity: float,
tenant_id: UUID,
user_id: Optional[UUID] = None,
reference_number: Optional[str] = None,
notes: Optional[str] = None,
fifo: bool = True
) -> List[Dict[str, Any]]:
"""Consume stock using FIFO/LIFO with movement tracking"""
try:
async with get_db_transaction() as db:
ingredient_repo = IngredientRepository(db)
stock_repo = StockRepository(db)
movement_repo = StockMovementRepository(db)
# Validate ingredient
ingredient = await ingredient_repo.get_by_id(ingredient_id)
if not ingredient or ingredient.tenant_id != tenant_id:
raise ValueError("Ingredient not found")
# Reserve stock first
reservations = await stock_repo.reserve_stock(tenant_id, ingredient_id, quantity, fifo)
if not reservations:
raise ValueError("Insufficient stock available")
consumed_items = []
for reservation in reservations:
stock_id = UUID(reservation['stock_id'])
reserved_qty = reservation['reserved_quantity']
# Consume from reserved stock
consumed_stock = await stock_repo.consume_stock(stock_id, reserved_qty, from_reserved=True)
# Create movement record
movement_data = StockMovementCreate(
ingredient_id=str(ingredient_id),
stock_id=str(stock_id),
movement_type=StockMovementType.PRODUCTION_USE,
quantity=reserved_qty,
reference_number=reference_number,
notes=notes or f"Stock consumption - Batch: {reservation.get('batch_number', 'N/A')}"
)
await movement_repo.create_movement(movement_data, tenant_id, user_id)
consumed_items.append({
'stock_id': str(stock_id),
'quantity_consumed': reserved_qty,
'batch_number': reservation.get('batch_number'),
'expiration_date': reservation.get('expiration_date')
})
logger.info(
"Stock consumed successfully",
ingredient_id=ingredient_id,
total_quantity=quantity,
items_consumed=len(consumed_items)
)
return consumed_items
except Exception as e:
logger.error("Failed to consume stock", error=str(e), ingredient_id=ingredient_id)
raise
async def get_stock_by_ingredient(
self,
ingredient_id: UUID,
tenant_id: UUID,
include_unavailable: bool = False
) -> List[StockResponse]:
"""Get all stock entries for an ingredient"""
try:
async with get_db_transaction() as db:
ingredient_repo = IngredientRepository(db)
stock_repo = StockRepository(db)
# Validate ingredient
ingredient = await ingredient_repo.get_by_id(ingredient_id)
if not ingredient or ingredient.tenant_id != tenant_id:
return []
# Get stock entries
stock_entries = await stock_repo.get_stock_by_ingredient(
tenant_id, ingredient_id, include_unavailable
)
responses = []
for stock in stock_entries:
response = StockResponse(**stock.to_dict())
response.ingredient = IngredientResponse(**ingredient.to_dict())
responses.append(response)
return responses
except Exception as e:
logger.error("Failed to get stock by ingredient", error=str(e), ingredient_id=ingredient_id)
raise
# ===== ALERTS AND NOTIFICATIONS =====
async def check_low_stock_alerts(self, tenant_id: UUID) -> List[Dict[str, Any]]:
"""Check for ingredients with low stock levels"""
try:
async with get_db_transaction() as db:
ingredient_repo = IngredientRepository(db)
low_stock_items = await ingredient_repo.get_low_stock_ingredients(tenant_id)
alerts = []
for item in low_stock_items:
ingredient = item['ingredient']
alerts.append({
'ingredient_id': str(ingredient.id),
'ingredient_name': ingredient.name,
'current_stock': item['current_stock'],
'threshold': item['threshold'],
'needs_reorder': item['needs_reorder'],
'alert_type': 'reorder_needed' if item['needs_reorder'] else 'low_stock',
'severity': 'high' if item['needs_reorder'] else 'medium'
})
return alerts
except Exception as e:
logger.error("Failed to check low stock alerts", error=str(e), tenant_id=tenant_id)
raise
async def check_expiration_alerts(self, tenant_id: UUID, days_ahead: int = 7) -> List[Dict[str, Any]]:
"""Check for stock items expiring soon"""
try:
async with get_db_transaction() as db:
stock_repo = StockRepository(db)
expiring_items = await stock_repo.get_expiring_stock(tenant_id, days_ahead)
alerts = []
for stock, ingredient in expiring_items:
days_to_expiry = (stock.expiration_date - datetime.now()).days if stock.expiration_date else None
alerts.append({
'stock_id': str(stock.id),
'ingredient_id': str(ingredient.id),
'ingredient_name': ingredient.name,
'batch_number': stock.batch_number,
'quantity': stock.available_quantity,
'expiration_date': stock.expiration_date,
'days_to_expiry': days_to_expiry,
'alert_type': 'expiring_soon',
'severity': 'critical' if days_to_expiry and days_to_expiry <= 1 else 'high'
})
return alerts
except Exception as e:
logger.error("Failed to check expiration alerts", error=str(e), tenant_id=tenant_id)
raise
# ===== DASHBOARD AND ANALYTICS =====
async def get_inventory_summary(self, tenant_id: UUID) -> InventorySummary:
"""Get inventory summary for dashboard"""
try:
async with get_db_transaction() as db:
ingredient_repo = IngredientRepository(db)
stock_repo = StockRepository(db)
movement_repo = StockMovementRepository(db)
# Get basic counts
total_ingredients_result = await ingredient_repo.count({'tenant_id': tenant_id, 'is_active': True})
# Get stock summary
stock_summary = await stock_repo.get_stock_summary_by_tenant(tenant_id)
# Get low stock and expiring items
low_stock_items = await ingredient_repo.get_low_stock_ingredients(tenant_id)
expiring_items = await stock_repo.get_expiring_stock(tenant_id, 7)
expired_items = await stock_repo.get_expired_stock(tenant_id)
# Get recent activity
recent_activity = await movement_repo.get_movement_summary_by_period(tenant_id, 7)
# Build category breakdown
stock_by_category = {}
ingredients = await ingredient_repo.get_ingredients_by_tenant(tenant_id, 0, 1000)
for ingredient in ingredients:
category = ingredient.category.value if ingredient.category else 'other'
if category not in stock_by_category:
stock_by_category[category] = {
'count': 0,
'total_value': 0.0,
'low_stock_items': 0
}
stock_by_category[category]['count'] += 1
# Additional calculations would go here
return InventorySummary(
total_ingredients=total_ingredients_result,
total_stock_value=stock_summary['total_stock_value'],
low_stock_alerts=len(low_stock_items),
expiring_soon_items=len(expiring_items),
expired_items=len(expired_items),
out_of_stock_items=0, # TODO: Calculate this
stock_by_category=stock_by_category,
recent_movements=recent_activity.get('total_movements', 0),
recent_purchases=recent_activity.get('purchase', {}).get('count', 0),
recent_waste=recent_activity.get('waste', {}).get('count', 0)
)
except Exception as e:
logger.error("Failed to get inventory summary", error=str(e), tenant_id=tenant_id)
raise
# ===== PRIVATE HELPER METHODS =====
async def _validate_ingredient_data(self, ingredient_data: IngredientCreate, tenant_id: UUID):
"""Validate ingredient data for business rules"""
# Add business validation logic here
if ingredient_data.reorder_point <= ingredient_data.low_stock_threshold:
raise ValueError("Reorder point must be greater than low stock threshold")
if ingredient_data.requires_freezing and ingredient_data.requires_refrigeration:
raise ValueError("Item cannot require both freezing and refrigeration")
# Add more validations as needed
pass

View File

@@ -0,0 +1,244 @@
# services/inventory/app/services/messaging.py
"""
Messaging service for inventory events
"""
from typing import Dict, Any, Optional
from uuid import UUID
import structlog
from shared.messaging.rabbitmq import MessagePublisher
from shared.messaging.events import (
EVENT_TYPES,
InventoryEvent,
StockAlertEvent,
StockMovementEvent
)
logger = structlog.get_logger()
class InventoryMessagingService:
"""Service for publishing inventory-related events"""
def __init__(self):
self.publisher = MessagePublisher()
async def publish_ingredient_created(
self,
tenant_id: UUID,
ingredient_id: UUID,
ingredient_data: Dict[str, Any]
):
"""Publish ingredient creation event"""
try:
event = InventoryEvent(
event_type=EVENT_TYPES.INVENTORY.INGREDIENT_CREATED,
tenant_id=str(tenant_id),
ingredient_id=str(ingredient_id),
data=ingredient_data
)
await self.publisher.publish_event(
routing_key="inventory.ingredient.created",
event=event
)
logger.info(
"Published ingredient created event",
tenant_id=tenant_id,
ingredient_id=ingredient_id
)
except Exception as e:
logger.error(
"Failed to publish ingredient created event",
error=str(e),
tenant_id=tenant_id,
ingredient_id=ingredient_id
)
async def publish_stock_added(
self,
tenant_id: UUID,
ingredient_id: UUID,
stock_id: UUID,
quantity: float,
batch_number: Optional[str] = None
):
"""Publish stock addition event"""
try:
movement_event = StockMovementEvent(
event_type=EVENT_TYPES.INVENTORY.STOCK_ADDED,
tenant_id=str(tenant_id),
ingredient_id=str(ingredient_id),
stock_id=str(stock_id),
quantity=quantity,
movement_type="purchase",
data={
"batch_number": batch_number,
"movement_type": "purchase"
}
)
await self.publisher.publish_event(
routing_key="inventory.stock.added",
event=movement_event
)
logger.info(
"Published stock added event",
tenant_id=tenant_id,
ingredient_id=ingredient_id,
quantity=quantity
)
except Exception as e:
logger.error(
"Failed to publish stock added event",
error=str(e),
tenant_id=tenant_id,
ingredient_id=ingredient_id
)
async def publish_stock_consumed(
self,
tenant_id: UUID,
ingredient_id: UUID,
consumed_items: list,
total_quantity: float,
reference_number: Optional[str] = None
):
"""Publish stock consumption event"""
try:
for item in consumed_items:
movement_event = StockMovementEvent(
event_type=EVENT_TYPES.INVENTORY.STOCK_CONSUMED,
tenant_id=str(tenant_id),
ingredient_id=str(ingredient_id),
stock_id=item['stock_id'],
quantity=item['quantity_consumed'],
movement_type="production_use",
data={
"batch_number": item.get('batch_number'),
"reference_number": reference_number,
"movement_type": "production_use"
}
)
await self.publisher.publish_event(
routing_key="inventory.stock.consumed",
event=movement_event
)
logger.info(
"Published stock consumed events",
tenant_id=tenant_id,
ingredient_id=ingredient_id,
total_quantity=total_quantity,
items_count=len(consumed_items)
)
except Exception as e:
logger.error(
"Failed to publish stock consumed event",
error=str(e),
tenant_id=tenant_id,
ingredient_id=ingredient_id
)
async def publish_low_stock_alert(
self,
tenant_id: UUID,
ingredient_id: UUID,
ingredient_name: str,
current_stock: float,
threshold: float,
needs_reorder: bool = False
):
"""Publish low stock alert event"""
try:
alert_event = StockAlertEvent(
event_type=EVENT_TYPES.INVENTORY.LOW_STOCK_ALERT,
tenant_id=str(tenant_id),
ingredient_id=str(ingredient_id),
alert_type="low_stock" if not needs_reorder else "reorder_needed",
severity="medium" if not needs_reorder else "high",
data={
"ingredient_name": ingredient_name,
"current_stock": current_stock,
"threshold": threshold,
"needs_reorder": needs_reorder
}
)
await self.publisher.publish_event(
routing_key="inventory.alerts.low_stock",
event=alert_event
)
logger.info(
"Published low stock alert",
tenant_id=tenant_id,
ingredient_id=ingredient_id,
current_stock=current_stock
)
except Exception as e:
logger.error(
"Failed to publish low stock alert",
error=str(e),
tenant_id=tenant_id,
ingredient_id=ingredient_id
)
async def publish_expiration_alert(
self,
tenant_id: UUID,
ingredient_id: UUID,
stock_id: UUID,
ingredient_name: str,
batch_number: Optional[str],
expiration_date: str,
days_to_expiry: int,
quantity: float
):
"""Publish expiration alert event"""
try:
severity = "critical" if days_to_expiry <= 1 else "high"
alert_event = StockAlertEvent(
event_type=EVENT_TYPES.INVENTORY.EXPIRATION_ALERT,
tenant_id=str(tenant_id),
ingredient_id=str(ingredient_id),
alert_type="expiring_soon",
severity=severity,
data={
"stock_id": str(stock_id),
"ingredient_name": ingredient_name,
"batch_number": batch_number,
"expiration_date": expiration_date,
"days_to_expiry": days_to_expiry,
"quantity": quantity
}
)
await self.publisher.publish_event(
routing_key="inventory.alerts.expiration",
event=alert_event
)
logger.info(
"Published expiration alert",
tenant_id=tenant_id,
ingredient_id=ingredient_id,
days_to_expiry=days_to_expiry
)
except Exception as e:
logger.error(
"Failed to publish expiration alert",
error=str(e),
tenant_id=tenant_id,
ingredient_id=ingredient_id
)

View File

@@ -0,0 +1,467 @@
# services/inventory/app/services/product_classifier.py
"""
AI Product Classification Service
Automatically classifies products from sales data during onboarding
"""
import re
import structlog
from typing import Dict, Any, List, Optional, Tuple
from enum import Enum
from dataclasses import dataclass
from app.models.inventory import ProductType, IngredientCategory, ProductCategory, UnitOfMeasure
logger = structlog.get_logger()
@dataclass
class ProductSuggestion:
"""Suggested inventory item from sales data analysis"""
original_name: str
suggested_name: str
product_type: ProductType
category: str # ingredient_category or product_category
unit_of_measure: UnitOfMeasure
confidence_score: float # 0.0 to 1.0
estimated_shelf_life_days: Optional[int] = None
requires_refrigeration: bool = False
requires_freezing: bool = False
is_seasonal: bool = False
suggested_supplier: Optional[str] = None
notes: Optional[str] = None
class ProductClassifierService:
"""AI-powered product classification for onboarding automation"""
def __init__(self):
self._load_classification_rules()
def _load_classification_rules(self):
"""Load classification patterns and rules"""
# Ingredient patterns with high confidence
self.ingredient_patterns = {
IngredientCategory.FLOUR: {
'patterns': [
r'harina', r'flour', r'trigo', r'wheat', r'integral', r'whole.*wheat',
r'centeno', r'rye', r'avena', r'oat', r'maiz', r'corn'
],
'unit': UnitOfMeasure.KILOGRAMS,
'shelf_life': 365,
'supplier_hints': ['molinos', 'harinera', 'mill']
},
IngredientCategory.YEAST: {
'patterns': [
r'levadura', r'yeast', r'fermento', r'baker.*yeast', r'instant.*yeast'
],
'unit': UnitOfMeasure.GRAMS,
'shelf_life': 730,
'refrigeration': True
},
IngredientCategory.DAIRY: {
'patterns': [
r'leche', r'milk', r'nata', r'cream', r'mantequilla', r'butter',
r'queso', r'cheese', r'yogur', r'yogurt'
],
'unit': UnitOfMeasure.LITERS,
'shelf_life': 7,
'refrigeration': True
},
IngredientCategory.EGGS: {
'patterns': [
r'huevo', r'egg', r'clara', r'white', r'yema', r'yolk'
],
'unit': UnitOfMeasure.UNITS,
'shelf_life': 28,
'refrigeration': True
},
IngredientCategory.SUGAR: {
'patterns': [
r'azucar', r'sugar', r'edulcorante', r'sweetener', r'miel', r'honey',
r'jarabe', r'syrup', r'mascabado', r'brown.*sugar'
],
'unit': UnitOfMeasure.KILOGRAMS,
'shelf_life': 730
},
IngredientCategory.FATS: {
'patterns': [
r'aceite', r'oil', r'grasa', r'fat', r'margarina', r'margarine',
r'manteca', r'lard', r'oliva', r'olive'
],
'unit': UnitOfMeasure.LITERS,
'shelf_life': 365
},
IngredientCategory.SALT: {
'patterns': [
r'sal', r'salt', r'sodium', r'sodio'
],
'unit': UnitOfMeasure.KILOGRAMS,
'shelf_life': 1825 # 5 years
},
IngredientCategory.SPICES: {
'patterns': [
r'canela', r'cinnamon', r'vainilla', r'vanilla', r'cacao', r'cocoa',
r'chocolate', r'anis', r'anise', r'cardamomo', r'cardamom',
r'jengibre', r'ginger', r'nuez.*moscada', r'nutmeg'
],
'unit': UnitOfMeasure.GRAMS,
'shelf_life': 730
},
IngredientCategory.ADDITIVES: {
'patterns': [
r'polvo.*hornear', r'baking.*powder', r'bicarbonato', r'soda',
r'cremor.*tartaro', r'cream.*tartar', r'lecitina', r'lecithin',
r'conservante', r'preservative', r'emulsificante', r'emulsifier'
],
'unit': UnitOfMeasure.GRAMS,
'shelf_life': 730
},
IngredientCategory.PACKAGING: {
'patterns': [
r'bolsa', r'bag', r'envase', r'container', r'papel', r'paper',
r'plastico', r'plastic', r'carton', r'cardboard'
],
'unit': UnitOfMeasure.UNITS,
'shelf_life': 1825
}
}
# Finished product patterns
self.product_patterns = {
ProductCategory.BREAD: {
'patterns': [
r'pan\b', r'bread', r'baguette', r'hogaza', r'loaf', r'molde',
r'integral', r'whole.*grain', r'centeno', r'rye.*bread'
],
'unit': UnitOfMeasure.UNITS,
'shelf_life': 3,
'display_life': 24 # hours
},
ProductCategory.CROISSANTS: {
'patterns': [
r'croissant', r'cruasan', r'napolitana', r'palmera', r'palmier'
],
'unit': UnitOfMeasure.UNITS,
'shelf_life': 2,
'display_life': 12
},
ProductCategory.PASTRIES: {
'patterns': [
r'pastel', r'pastry', r'hojaldre', r'puff.*pastry', r'empanada',
r'milhojas', r'napoleon', r'eclair', r'profiterol'
],
'unit': UnitOfMeasure.UNITS,
'shelf_life': 2,
'display_life': 24,
'refrigeration': True
},
ProductCategory.CAKES: {
'patterns': [
r'tarta', r'cake', r'bizcocho', r'sponge', r'cheesecake',
r'tiramisu', r'mousse', r'torta'
],
'unit': UnitOfMeasure.UNITS,
'shelf_life': 3,
'refrigeration': True
},
ProductCategory.COOKIES: {
'patterns': [
r'galleta', r'cookie', r'biscuit', r'mantecada', r'madeleine'
],
'unit': UnitOfMeasure.UNITS,
'shelf_life': 14
},
ProductCategory.MUFFINS: {
'patterns': [
r'muffin', r'magdalena', r'cupcake', r'fairy.*cake'
],
'unit': UnitOfMeasure.UNITS,
'shelf_life': 3
},
ProductCategory.SANDWICHES: {
'patterns': [
r'sandwich', r'bocadillo', r'tostada', r'toast', r'bagel'
],
'unit': UnitOfMeasure.UNITS,
'shelf_life': 1,
'display_life': 6,
'refrigeration': True
},
ProductCategory.BEVERAGES: {
'patterns': [
r'cafe', r'coffee', r'te\b', r'tea', r'chocolate.*caliente',
r'hot.*chocolate', r'zumo', r'juice', r'batido', r'smoothie'
],
'unit': UnitOfMeasure.UNITS,
'shelf_life': 1
}
}
# Seasonal indicators
self.seasonal_patterns = {
'christmas': [r'navidad', r'christmas', r'turron', r'polvoron', r'roscon'],
'easter': [r'pascua', r'easter', r'mona', r'torrija'],
'summer': [r'helado', r'ice.*cream', r'granizado', r'sorbete']
}
def classify_product(self, product_name: str, sales_volume: Optional[float] = None) -> ProductSuggestion:
"""Classify a single product name into inventory suggestion"""
# Normalize product name for analysis
normalized_name = self._normalize_name(product_name)
# Try to classify as ingredient first
ingredient_result = self._classify_as_ingredient(normalized_name, product_name)
if ingredient_result and ingredient_result.confidence_score >= 0.7:
return ingredient_result
# Try to classify as finished product
product_result = self._classify_as_finished_product(normalized_name, product_name)
if product_result:
return product_result
# Fallback: create generic finished product with low confidence
return self._create_fallback_suggestion(product_name, normalized_name)
def classify_products_batch(self, product_names: List[str],
sales_volumes: Optional[Dict[str, float]] = None) -> List[ProductSuggestion]:
"""Classify multiple products and detect business model"""
suggestions = []
for name in product_names:
volume = sales_volumes.get(name) if sales_volumes else None
suggestion = self.classify_product(name, volume)
suggestions.append(suggestion)
# Analyze business model based on classification results
self._analyze_business_model(suggestions)
return suggestions
def _normalize_name(self, name: str) -> str:
"""Normalize product name for pattern matching"""
if not name:
return ""
# Convert to lowercase
normalized = name.lower().strip()
# Remove common prefixes/suffixes
prefixes_to_remove = ['el ', 'la ', 'los ', 'las ', 'un ', 'una ']
for prefix in prefixes_to_remove:
if normalized.startswith(prefix):
normalized = normalized[len(prefix):]
# Remove special characters but keep spaces and accents
normalized = re.sub(r'[^\w\sáéíóúñü]', ' ', normalized)
# Normalize multiple spaces
normalized = re.sub(r'\s+', ' ', normalized).strip()
return normalized
def _classify_as_ingredient(self, normalized_name: str, original_name: str) -> Optional[ProductSuggestion]:
"""Try to classify as ingredient"""
best_match = None
best_score = 0.0
for category, config in self.ingredient_patterns.items():
for pattern in config['patterns']:
if re.search(pattern, normalized_name, re.IGNORECASE):
# Calculate confidence based on pattern specificity
score = self._calculate_confidence_score(pattern, normalized_name)
if score > best_score:
best_score = score
best_match = (category, config)
if best_match and best_score >= 0.6:
category, config = best_match
return ProductSuggestion(
original_name=original_name,
suggested_name=self._suggest_clean_name(original_name, normalized_name),
product_type=ProductType.INGREDIENT,
category=category.value,
unit_of_measure=config['unit'],
confidence_score=best_score,
estimated_shelf_life_days=config.get('shelf_life'),
requires_refrigeration=config.get('refrigeration', False),
requires_freezing=config.get('freezing', False),
suggested_supplier=self._suggest_supplier(normalized_name, config.get('supplier_hints', [])),
notes=f"Auto-classified as {category.value} ingredient"
)
return None
def _classify_as_finished_product(self, normalized_name: str, original_name: str) -> Optional[ProductSuggestion]:
"""Try to classify as finished product"""
best_match = None
best_score = 0.0
for category, config in self.product_patterns.items():
for pattern in config['patterns']:
if re.search(pattern, normalized_name, re.IGNORECASE):
score = self._calculate_confidence_score(pattern, normalized_name)
if score > best_score:
best_score = score
best_match = (category, config)
if best_match:
category, config = best_match
# Check if seasonal
is_seasonal = self._is_seasonal_product(normalized_name)
return ProductSuggestion(
original_name=original_name,
suggested_name=self._suggest_clean_name(original_name, normalized_name),
product_type=ProductType.FINISHED_PRODUCT,
category=category.value,
unit_of_measure=config['unit'],
confidence_score=best_score,
estimated_shelf_life_days=config.get('shelf_life'),
requires_refrigeration=config.get('refrigeration', False),
requires_freezing=config.get('freezing', False),
is_seasonal=is_seasonal,
notes=f"Auto-classified as {category.value}"
)
return None
def _create_fallback_suggestion(self, original_name: str, normalized_name: str) -> ProductSuggestion:
"""Create a fallback suggestion for unclassified products"""
return ProductSuggestion(
original_name=original_name,
suggested_name=self._suggest_clean_name(original_name, normalized_name),
product_type=ProductType.FINISHED_PRODUCT,
category=ProductCategory.OTHER_PRODUCTS.value,
unit_of_measure=UnitOfMeasure.UNITS,
confidence_score=0.3,
estimated_shelf_life_days=3,
notes="Needs manual classification - defaulted to finished product"
)
def _calculate_confidence_score(self, pattern: str, normalized_name: str) -> float:
"""Calculate confidence score for pattern match"""
# Base score for match
base_score = 0.8
# Boost score for exact matches
if pattern.lower() == normalized_name:
return 0.95
# Boost score for word boundary matches
if re.search(r'\b' + pattern + r'\b', normalized_name, re.IGNORECASE):
base_score += 0.1
# Reduce score for partial matches
if len(pattern) < len(normalized_name) / 2:
base_score -= 0.2
return min(0.95, max(0.3, base_score))
def _suggest_clean_name(self, original_name: str, normalized_name: str) -> str:
"""Suggest a cleaned version of the product name"""
# Capitalize properly
words = original_name.split()
cleaned = []
for word in words:
if len(word) > 0:
# Keep original casing for abbreviations
if word.isupper() and len(word) <= 3:
cleaned.append(word)
else:
cleaned.append(word.capitalize())
return ' '.join(cleaned)
def _suggest_supplier(self, normalized_name: str, supplier_hints: List[str]) -> Optional[str]:
"""Suggest potential supplier based on product type"""
for hint in supplier_hints:
if hint in normalized_name:
return f"Suggested: {hint.title()}"
return None
def _is_seasonal_product(self, normalized_name: str) -> bool:
"""Check if product appears to be seasonal"""
for season, patterns in self.seasonal_patterns.items():
for pattern in patterns:
if re.search(pattern, normalized_name, re.IGNORECASE):
return True
return False
def _analyze_business_model(self, suggestions: List[ProductSuggestion]) -> Dict[str, Any]:
"""Analyze business model based on product classifications"""
ingredient_count = sum(1 for s in suggestions if s.product_type == ProductType.INGREDIENT)
finished_count = sum(1 for s in suggestions if s.product_type == ProductType.FINISHED_PRODUCT)
total = len(suggestions)
if total == 0:
return {"model": "unknown", "confidence": 0.0}
ingredient_ratio = ingredient_count / total
if ingredient_ratio >= 0.7:
model = "production" # Production bakery
elif ingredient_ratio <= 0.3:
model = "retail" # Retail/Distribution bakery
else:
model = "hybrid" # Mixed model
confidence = max(abs(ingredient_ratio - 0.5) * 2, 0.1)
logger.info("Business model analysis",
model=model, confidence=confidence,
ingredient_count=ingredient_count,
finished_count=finished_count)
return {
"model": model,
"confidence": confidence,
"ingredient_ratio": ingredient_ratio,
"recommendations": self._get_model_recommendations(model)
}
def _get_model_recommendations(self, model: str) -> List[str]:
"""Get recommendations based on detected business model"""
recommendations = {
"production": [
"Focus on ingredient inventory management",
"Set up recipe cost calculation",
"Configure supplier relationships",
"Enable production planning features"
],
"retail": [
"Configure central baker relationships",
"Set up delivery schedule tracking",
"Enable finished product freshness monitoring",
"Focus on sales forecasting"
],
"hybrid": [
"Configure both ingredient and finished product management",
"Set up flexible inventory categories",
"Enable both production and retail features"
]
}
return recommendations.get(model, [])
# Dependency injection
def get_product_classifier() -> ProductClassifierService:
"""Get product classifier service instance"""
return ProductClassifierService()