326 lines
14 KiB
Python
326 lines
14 KiB
Python
# services/sales/app/services/sales_service.py
|
|
"""
|
|
Sales Service - Business Logic Layer
|
|
"""
|
|
|
|
from typing import List, Optional, Dict, Any
|
|
from uuid import UUID
|
|
from datetime import datetime
|
|
import structlog
|
|
|
|
from app.models.sales import SalesData
|
|
from app.repositories.sales_repository import SalesRepository
|
|
from app.schemas.sales import SalesDataCreate, SalesDataUpdate, SalesDataQuery, SalesAnalytics
|
|
from app.core.database import get_db_transaction
|
|
from app.services.inventory_client import InventoryServiceClient
|
|
from shared.database.exceptions import DatabaseError
|
|
|
|
logger = structlog.get_logger()
|
|
|
|
|
|
class SalesService:
|
|
"""Service layer for sales operations"""
|
|
|
|
def __init__(self):
|
|
self.inventory_client = InventoryServiceClient()
|
|
|
|
async def create_sales_record(
|
|
self,
|
|
sales_data: SalesDataCreate,
|
|
tenant_id: UUID,
|
|
user_id: Optional[UUID] = None
|
|
) -> SalesData:
|
|
"""Create a new sales record with business validation"""
|
|
try:
|
|
# Sync product data with inventory service if inventory_product_id is provided
|
|
if sales_data.inventory_product_id:
|
|
product_cache = await self.inventory_client.sync_product_cache(
|
|
sales_data.inventory_product_id, tenant_id
|
|
)
|
|
if product_cache:
|
|
# Update cached product fields from inventory
|
|
sales_data_dict = sales_data.model_dump()
|
|
sales_data_dict.update(product_cache)
|
|
sales_data = SalesDataCreate(**sales_data_dict)
|
|
else:
|
|
logger.warning("Could not sync product from inventory",
|
|
product_id=sales_data.inventory_product_id, tenant_id=tenant_id)
|
|
|
|
# Business validation
|
|
await self._validate_sales_data(sales_data, tenant_id)
|
|
|
|
# Set user who created the record
|
|
if user_id:
|
|
sales_data_dict = sales_data.model_dump()
|
|
sales_data_dict['created_by'] = user_id
|
|
sales_data = SalesDataCreate(**sales_data_dict)
|
|
|
|
async with get_db_transaction() as db:
|
|
repository = SalesRepository(db)
|
|
record = await repository.create_sales_record(sales_data, tenant_id)
|
|
|
|
# Additional business logic (e.g., notifications, analytics updates)
|
|
await self._post_create_actions(record)
|
|
|
|
return record
|
|
|
|
except Exception as e:
|
|
logger.error("Failed to create sales record in service", error=str(e), tenant_id=tenant_id)
|
|
raise
|
|
|
|
async def update_sales_record(
|
|
self,
|
|
record_id: UUID,
|
|
update_data: SalesDataUpdate,
|
|
tenant_id: UUID
|
|
) -> SalesData:
|
|
"""Update a sales record"""
|
|
try:
|
|
async with get_db_transaction() as db:
|
|
repository = SalesRepository(db)
|
|
|
|
# Verify record belongs to tenant
|
|
existing_record = await repository.get_by_id(record_id)
|
|
if not existing_record or existing_record.tenant_id != tenant_id:
|
|
raise ValueError(f"Sales record {record_id} not found for tenant {tenant_id}")
|
|
|
|
# Update the record
|
|
updated_record = await repository.update(record_id, update_data.model_dump(exclude_unset=True))
|
|
|
|
logger.info("Updated sales record", record_id=record_id, tenant_id=tenant_id)
|
|
return updated_record
|
|
|
|
except Exception as e:
|
|
logger.error("Failed to update sales record", error=str(e), record_id=record_id, tenant_id=tenant_id)
|
|
raise
|
|
|
|
async def get_sales_records(
|
|
self,
|
|
tenant_id: UUID,
|
|
query_params: Optional[SalesDataQuery] = None
|
|
) -> List[SalesData]:
|
|
"""Get sales records for a tenant"""
|
|
try:
|
|
async with get_db_transaction() as db:
|
|
repository = SalesRepository(db)
|
|
records = await repository.get_by_tenant(tenant_id, query_params)
|
|
|
|
logger.info("Retrieved sales records", count=len(records), tenant_id=tenant_id)
|
|
return records
|
|
|
|
except Exception as e:
|
|
logger.error("Failed to get sales records", error=str(e), tenant_id=tenant_id)
|
|
raise
|
|
|
|
async def get_sales_record(self, record_id: UUID, tenant_id: UUID) -> Optional[SalesData]:
|
|
"""Get a specific sales record"""
|
|
try:
|
|
async with get_db_transaction() as db:
|
|
repository = SalesRepository(db)
|
|
record = await repository.get_by_id(record_id)
|
|
|
|
# Verify record belongs to tenant
|
|
if record and record.tenant_id != tenant_id:
|
|
return None
|
|
|
|
return record
|
|
|
|
except Exception as e:
|
|
logger.error("Failed to get sales record", error=str(e), record_id=record_id, tenant_id=tenant_id)
|
|
raise
|
|
|
|
async def delete_sales_record(self, record_id: UUID, tenant_id: UUID) -> bool:
|
|
"""Delete a sales record"""
|
|
try:
|
|
async with get_db_transaction() as db:
|
|
repository = SalesRepository(db)
|
|
|
|
# Verify record belongs to tenant
|
|
existing_record = await repository.get_by_id(record_id)
|
|
if not existing_record or existing_record.tenant_id != tenant_id:
|
|
raise ValueError(f"Sales record {record_id} not found for tenant {tenant_id}")
|
|
|
|
success = await repository.delete(record_id)
|
|
|
|
if success:
|
|
logger.info("Deleted sales record", record_id=record_id, tenant_id=tenant_id)
|
|
|
|
return success
|
|
|
|
except Exception as e:
|
|
logger.error("Failed to delete sales record", error=str(e), record_id=record_id, tenant_id=tenant_id)
|
|
raise
|
|
|
|
async def get_product_sales(
|
|
self,
|
|
tenant_id: UUID,
|
|
inventory_product_id: UUID,
|
|
start_date: Optional[datetime] = None,
|
|
end_date: Optional[datetime] = None
|
|
) -> List[SalesData]:
|
|
"""Get sales records for a specific product by inventory ID"""
|
|
try:
|
|
async with get_db_transaction() as db:
|
|
repository = SalesRepository(db)
|
|
records = await repository.get_by_inventory_product(tenant_id, inventory_product_id, start_date, end_date)
|
|
|
|
logger.info(
|
|
"Retrieved product sales",
|
|
count=len(records),
|
|
inventory_product_id=inventory_product_id,
|
|
tenant_id=tenant_id
|
|
)
|
|
return records
|
|
|
|
except Exception as e:
|
|
logger.error("Failed to get product sales", error=str(e), tenant_id=tenant_id, inventory_product_id=inventory_product_id)
|
|
raise
|
|
|
|
async def get_sales_analytics(
|
|
self,
|
|
tenant_id: UUID,
|
|
start_date: Optional[datetime] = None,
|
|
end_date: Optional[datetime] = None
|
|
) -> Dict[str, Any]:
|
|
"""Get sales analytics for a tenant"""
|
|
try:
|
|
async with get_db_transaction() as db:
|
|
repository = SalesRepository(db)
|
|
analytics = await repository.get_analytics(tenant_id, start_date, end_date)
|
|
|
|
logger.info("Retrieved sales analytics", tenant_id=tenant_id)
|
|
return analytics
|
|
|
|
except Exception as e:
|
|
logger.error("Failed to get sales analytics", error=str(e), tenant_id=tenant_id)
|
|
raise
|
|
|
|
async def get_product_categories(self, tenant_id: UUID) -> List[str]:
|
|
"""Get distinct product categories from inventory service"""
|
|
try:
|
|
# Get all unique categories from inventory service products
|
|
# This is more accurate than cached categories in sales data
|
|
ingredient_products = await self.inventory_client.search_products("", tenant_id, "ingredient")
|
|
finished_products = await self.inventory_client.search_products("", tenant_id, "finished_product")
|
|
|
|
categories = set()
|
|
for product in ingredient_products:
|
|
if product.get("ingredient_category"):
|
|
categories.add(product["ingredient_category"])
|
|
|
|
for product in finished_products:
|
|
if product.get("product_category"):
|
|
categories.add(product["product_category"])
|
|
|
|
return sorted(list(categories))
|
|
|
|
except Exception as e:
|
|
logger.error("Failed to get product categories", error=str(e), tenant_id=tenant_id)
|
|
raise
|
|
|
|
async def validate_sales_record(
|
|
self,
|
|
record_id: UUID,
|
|
tenant_id: UUID,
|
|
validation_notes: Optional[str] = None
|
|
) -> SalesData:
|
|
"""Validate a sales record"""
|
|
try:
|
|
async with get_db_transaction() as db:
|
|
repository = SalesRepository(db)
|
|
|
|
# Verify record belongs to tenant
|
|
existing_record = await repository.get_by_id(record_id)
|
|
if not existing_record or existing_record.tenant_id != tenant_id:
|
|
raise ValueError(f"Sales record {record_id} not found for tenant {tenant_id}")
|
|
|
|
validated_record = await repository.validate_record(record_id, validation_notes)
|
|
|
|
logger.info("Validated sales record", record_id=record_id, tenant_id=tenant_id)
|
|
return validated_record
|
|
|
|
except Exception as e:
|
|
logger.error("Failed to validate sales record", error=str(e), record_id=record_id, tenant_id=tenant_id)
|
|
raise
|
|
|
|
async def _validate_sales_data(self, sales_data: SalesDataCreate, tenant_id: UUID):
|
|
"""Validate sales data according to business rules"""
|
|
# Example business validations
|
|
|
|
# Check if revenue matches quantity * unit_price (if unit_price provided)
|
|
if sales_data.unit_price and sales_data.quantity_sold:
|
|
expected_revenue = sales_data.unit_price * sales_data.quantity_sold
|
|
# Apply discount if any
|
|
if sales_data.discount_applied:
|
|
expected_revenue *= (1 - sales_data.discount_applied / 100)
|
|
|
|
# Allow for small rounding differences
|
|
if abs(float(sales_data.revenue) - float(expected_revenue)) > 0.01:
|
|
logger.warning(
|
|
"Revenue mismatch detected",
|
|
expected=float(expected_revenue),
|
|
actual=float(sales_data.revenue),
|
|
tenant_id=tenant_id
|
|
)
|
|
|
|
# Check date validity (not in future)
|
|
if sales_data.date > datetime.utcnow():
|
|
raise ValueError("Sales date cannot be in the future")
|
|
|
|
# Additional business rules can be added here
|
|
logger.info("Sales data validation passed", tenant_id=tenant_id)
|
|
|
|
async def _post_create_actions(self, record: SalesData):
|
|
"""Actions to perform after creating a sales record"""
|
|
try:
|
|
# Here you could:
|
|
# - Send notifications
|
|
# - Update analytics caches
|
|
# - Trigger ML model updates
|
|
# - Update inventory levels (future integration)
|
|
|
|
logger.info("Post-create actions completed", record_id=record.id)
|
|
|
|
except Exception as e:
|
|
# Don't fail the main operation for auxiliary actions
|
|
logger.warning("Failed to execute post-create actions", error=str(e), record_id=record.id)
|
|
|
|
|
|
# New inventory integration methods
|
|
async def search_inventory_products(self, search_term: str, tenant_id: UUID,
|
|
product_type: Optional[str] = None) -> List[Dict[str, Any]]:
|
|
"""Search products in inventory service"""
|
|
try:
|
|
products = await self.inventory_client.search_products(search_term, tenant_id, product_type)
|
|
logger.info("Searched inventory products", search_term=search_term,
|
|
count=len(products), tenant_id=tenant_id)
|
|
return products
|
|
except Exception as e:
|
|
logger.error("Failed to search inventory products",
|
|
error=str(e), search_term=search_term, tenant_id=tenant_id)
|
|
return []
|
|
|
|
async def get_inventory_product(self, product_id: UUID, tenant_id: UUID) -> Optional[Dict[str, Any]]:
|
|
"""Get product details from inventory service"""
|
|
try:
|
|
product = await self.inventory_client.get_product_by_id(product_id, tenant_id)
|
|
if product:
|
|
logger.info("Retrieved inventory product", product_id=product_id, tenant_id=tenant_id)
|
|
return product
|
|
except Exception as e:
|
|
logger.error("Failed to get inventory product",
|
|
error=str(e), product_id=product_id, tenant_id=tenant_id)
|
|
return None
|
|
|
|
async def get_inventory_products_by_category(self, category: str, tenant_id: UUID,
|
|
product_type: Optional[str] = None) -> List[Dict[str, Any]]:
|
|
"""Get products by category from inventory service"""
|
|
try:
|
|
products = await self.inventory_client.get_products_by_category(category, tenant_id, product_type)
|
|
logger.info("Retrieved inventory products by category", category=category,
|
|
count=len(products), tenant_id=tenant_id)
|
|
return products
|
|
except Exception as e:
|
|
logger.error("Failed to get inventory products by category",
|
|
error=str(e), category=category, tenant_id=tenant_id)
|
|
return [] |