Files
bakery-ia/services/sales/tests/unit/test_repositories.py
2025-08-14 16:47:34 +02:00

215 lines
8.5 KiB
Python

# services/sales/tests/unit/test_repositories.py
"""
Unit tests for Sales Repository
"""
import pytest
from datetime import datetime, timezone
from decimal import Decimal
from uuid import UUID
from app.repositories.sales_repository import SalesRepository
from app.models.sales import SalesData
from app.schemas.sales import SalesDataCreate, SalesDataUpdate, SalesDataQuery
@pytest.mark.asyncio
class TestSalesRepository:
"""Test Sales Repository operations"""
async def test_create_sales_record(self, test_db_session, sample_tenant_id, sample_sales_data):
"""Test creating a sales record"""
repository = SalesRepository(test_db_session)
record = await repository.create_sales_record(sample_sales_data, sample_tenant_id)
assert record is not None
assert record.id is not None
assert record.tenant_id == sample_tenant_id
assert record.inventory_product_id == sample_sales_data.inventory_product_id
assert record.quantity_sold == sample_sales_data.quantity_sold
assert record.revenue == sample_sales_data.revenue
async def test_get_by_id(self, test_db_session, sample_tenant_id, sample_sales_data):
"""Test getting a sales record by ID"""
repository = SalesRepository(test_db_session)
# Create record first
created_record = await repository.create_sales_record(sample_sales_data, sample_tenant_id)
# Get by ID
retrieved_record = await repository.get_by_id(created_record.id)
assert retrieved_record is not None
assert retrieved_record.id == created_record.id
assert retrieved_record.inventory_product_id == created_record.inventory_product_id
async def test_get_by_tenant(self, populated_db, sample_tenant_id):
"""Test getting records by tenant"""
repository = SalesRepository(populated_db)
records = await repository.get_by_tenant(sample_tenant_id)
assert len(records) == 3 # From populated_db fixture
assert all(record.tenant_id == sample_tenant_id for record in records)
async def test_get_by_product(self, populated_db, sample_tenant_id):
"""Test getting records by product"""
repository = SalesRepository(populated_db)
# Get by inventory_product_id instead of product name
test_product_id = "550e8400-e29b-41d4-a716-446655440001"
records = await repository.get_by_inventory_product_id(sample_tenant_id, test_product_id)
assert len(records) == 1
assert records[0].inventory_product_id == test_product_id
async def test_update_record(self, test_db_session, sample_tenant_id, sample_sales_data):
"""Test updating a sales record"""
repository = SalesRepository(test_db_session)
# Create record first
created_record = await repository.create_sales_record(sample_sales_data, sample_tenant_id)
# Update record
update_data = SalesDataUpdate(
inventory_product_id="550e8400-e29b-41d4-a716-446655440999",
product_name="Updated Product",
quantity_sold=10,
revenue=Decimal("25.00")
)
updated_record = await repository.update(created_record.id, update_data.model_dump(exclude_unset=True))
assert updated_record.inventory_product_id == "550e8400-e29b-41d4-a716-446655440999"
assert updated_record.quantity_sold == 10
assert updated_record.revenue == Decimal("25.00")
async def test_delete_record(self, test_db_session, sample_tenant_id, sample_sales_data):
"""Test deleting a sales record"""
repository = SalesRepository(test_db_session)
# Create record first
created_record = await repository.create_sales_record(sample_sales_data, sample_tenant_id)
# Delete record
success = await repository.delete(created_record.id)
assert success is True
# Verify record is deleted
deleted_record = await repository.get_by_id(created_record.id)
assert deleted_record is None
async def test_get_analytics(self, populated_db, sample_tenant_id):
"""Test getting analytics for tenant"""
repository = SalesRepository(populated_db)
analytics = await repository.get_analytics(sample_tenant_id)
assert "total_revenue" in analytics
assert "total_quantity" in analytics
assert "total_transactions" in analytics
assert "average_transaction_value" in analytics
assert analytics["total_transactions"] == 3
async def test_get_product_categories(self, populated_db, sample_tenant_id):
"""Test getting distinct product categories"""
repository = SalesRepository(populated_db)
categories = await repository.get_product_categories(sample_tenant_id)
assert isinstance(categories, list)
# Should be empty since populated_db doesn't set categories
async def test_validate_record(self, test_db_session, sample_tenant_id, sample_sales_data):
"""Test validating a sales record"""
repository = SalesRepository(test_db_session)
# Create record first
created_record = await repository.create_sales_record(sample_sales_data, sample_tenant_id)
# Validate record
validated_record = await repository.validate_record(created_record.id, "Test validation")
assert validated_record.is_validated is True
assert validated_record.validation_notes == "Test validation"
async def test_query_with_filters(self, populated_db, sample_tenant_id):
"""Test querying with filters"""
repository = SalesRepository(populated_db)
query = SalesDataQuery(
inventory_product_id="550e8400-e29b-41d4-a716-446655440001",
limit=10,
offset=0
)
records = await repository.get_by_tenant(sample_tenant_id, query)
assert len(records) == 1
assert records[0].inventory_product_id == "550e8400-e29b-41d4-a716-446655440001"
async def test_bulk_create(self, test_db_session, sample_tenant_id):
"""Test bulk creating records"""
repository = SalesRepository(test_db_session)
# Create multiple records data
bulk_data = [
{
"date": datetime.now(timezone.utc),
"inventory_product_id": f"550e8400-e29b-41d4-a716-{i+100:012x}",
"product_name": f"Product {i}",
"quantity_sold": i + 1,
"revenue": Decimal(str((i + 1) * 2.5)),
"source": "bulk_test"
}
for i in range(5)
]
created_count = await repository.bulk_create_sales_data(bulk_data, sample_tenant_id)
assert created_count == 5
# Verify records were created
all_records = await repository.get_by_tenant(sample_tenant_id)
assert len(all_records) == 5
async def test_repository_error_handling(self, test_db_session, sample_tenant_id):
"""Test repository error handling"""
repository = SalesRepository(test_db_session)
# Test getting non-existent record
non_existent = await repository.get_by_id("non-existent-id")
assert non_existent is None
# Test deleting non-existent record
delete_success = await repository.delete("non-existent-id")
assert delete_success is False
async def test_performance_bulk_operations(self, test_db_session, sample_tenant_id, performance_test_data):
"""Test performance of bulk operations"""
repository = SalesRepository(test_db_session)
# Test bulk create performance
import time
start_time = time.time()
created_count = await repository.bulk_create_sales_data(performance_test_data, sample_tenant_id)
end_time = time.time()
execution_time = end_time - start_time
assert created_count == len(performance_test_data)
assert execution_time < 5.0 # Should complete in under 5 seconds
# Test bulk retrieval performance
start_time = time.time()
all_records = await repository.get_by_tenant(sample_tenant_id)
end_time = time.time()
execution_time = end_time - start_time
assert len(all_records) == len(performance_test_data)
assert execution_time < 2.0 # Should complete in under 2 seconds