215 lines
8.5 KiB
Python
215 lines
8.5 KiB
Python
# services/sales/tests/unit/test_repositories.py
|
|
"""
|
|
Unit tests for Sales Repository
|
|
"""
|
|
|
|
import pytest
|
|
from datetime import datetime, timezone
|
|
from decimal import Decimal
|
|
from uuid import UUID
|
|
|
|
from app.repositories.sales_repository import SalesRepository
|
|
from app.models.sales import SalesData
|
|
from app.schemas.sales import SalesDataCreate, SalesDataUpdate, SalesDataQuery
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
class TestSalesRepository:
|
|
"""Test Sales Repository operations"""
|
|
|
|
async def test_create_sales_record(self, test_db_session, sample_tenant_id, sample_sales_data):
|
|
"""Test creating a sales record"""
|
|
repository = SalesRepository(test_db_session)
|
|
|
|
record = await repository.create_sales_record(sample_sales_data, sample_tenant_id)
|
|
|
|
assert record is not None
|
|
assert record.id is not None
|
|
assert record.tenant_id == sample_tenant_id
|
|
assert record.inventory_product_id == sample_sales_data.inventory_product_id
|
|
assert record.quantity_sold == sample_sales_data.quantity_sold
|
|
assert record.revenue == sample_sales_data.revenue
|
|
|
|
async def test_get_by_id(self, test_db_session, sample_tenant_id, sample_sales_data):
|
|
"""Test getting a sales record by ID"""
|
|
repository = SalesRepository(test_db_session)
|
|
|
|
# Create record first
|
|
created_record = await repository.create_sales_record(sample_sales_data, sample_tenant_id)
|
|
|
|
# Get by ID
|
|
retrieved_record = await repository.get_by_id(created_record.id)
|
|
|
|
assert retrieved_record is not None
|
|
assert retrieved_record.id == created_record.id
|
|
assert retrieved_record.inventory_product_id == created_record.inventory_product_id
|
|
|
|
async def test_get_by_tenant(self, populated_db, sample_tenant_id):
|
|
"""Test getting records by tenant"""
|
|
repository = SalesRepository(populated_db)
|
|
|
|
records = await repository.get_by_tenant(sample_tenant_id)
|
|
|
|
assert len(records) == 3 # From populated_db fixture
|
|
assert all(record.tenant_id == sample_tenant_id for record in records)
|
|
|
|
async def test_get_by_product(self, populated_db, sample_tenant_id):
|
|
"""Test getting records by product"""
|
|
repository = SalesRepository(populated_db)
|
|
|
|
# Get by inventory_product_id instead of product name
|
|
test_product_id = "550e8400-e29b-41d4-a716-446655440001"
|
|
records = await repository.get_by_inventory_product_id(sample_tenant_id, test_product_id)
|
|
|
|
assert len(records) == 1
|
|
assert records[0].inventory_product_id == test_product_id
|
|
|
|
async def test_update_record(self, test_db_session, sample_tenant_id, sample_sales_data):
|
|
"""Test updating a sales record"""
|
|
repository = SalesRepository(test_db_session)
|
|
|
|
# Create record first
|
|
created_record = await repository.create_sales_record(sample_sales_data, sample_tenant_id)
|
|
|
|
# Update record
|
|
update_data = SalesDataUpdate(
|
|
inventory_product_id="550e8400-e29b-41d4-a716-446655440999",
|
|
product_name="Updated Product",
|
|
quantity_sold=10,
|
|
revenue=Decimal("25.00")
|
|
)
|
|
|
|
updated_record = await repository.update(created_record.id, update_data.model_dump(exclude_unset=True))
|
|
|
|
assert updated_record.inventory_product_id == "550e8400-e29b-41d4-a716-446655440999"
|
|
assert updated_record.quantity_sold == 10
|
|
assert updated_record.revenue == Decimal("25.00")
|
|
|
|
async def test_delete_record(self, test_db_session, sample_tenant_id, sample_sales_data):
|
|
"""Test deleting a sales record"""
|
|
repository = SalesRepository(test_db_session)
|
|
|
|
# Create record first
|
|
created_record = await repository.create_sales_record(sample_sales_data, sample_tenant_id)
|
|
|
|
# Delete record
|
|
success = await repository.delete(created_record.id)
|
|
|
|
assert success is True
|
|
|
|
# Verify record is deleted
|
|
deleted_record = await repository.get_by_id(created_record.id)
|
|
assert deleted_record is None
|
|
|
|
async def test_get_analytics(self, populated_db, sample_tenant_id):
|
|
"""Test getting analytics for tenant"""
|
|
repository = SalesRepository(populated_db)
|
|
|
|
analytics = await repository.get_analytics(sample_tenant_id)
|
|
|
|
assert "total_revenue" in analytics
|
|
assert "total_quantity" in analytics
|
|
assert "total_transactions" in analytics
|
|
assert "average_transaction_value" in analytics
|
|
assert analytics["total_transactions"] == 3
|
|
|
|
async def test_get_product_categories(self, populated_db, sample_tenant_id):
|
|
"""Test getting distinct product categories"""
|
|
repository = SalesRepository(populated_db)
|
|
|
|
categories = await repository.get_product_categories(sample_tenant_id)
|
|
|
|
assert isinstance(categories, list)
|
|
# Should be empty since populated_db doesn't set categories
|
|
|
|
async def test_validate_record(self, test_db_session, sample_tenant_id, sample_sales_data):
|
|
"""Test validating a sales record"""
|
|
repository = SalesRepository(test_db_session)
|
|
|
|
# Create record first
|
|
created_record = await repository.create_sales_record(sample_sales_data, sample_tenant_id)
|
|
|
|
# Validate record
|
|
validated_record = await repository.validate_record(created_record.id, "Test validation")
|
|
|
|
assert validated_record.is_validated is True
|
|
assert validated_record.validation_notes == "Test validation"
|
|
|
|
async def test_query_with_filters(self, populated_db, sample_tenant_id):
|
|
"""Test querying with filters"""
|
|
repository = SalesRepository(populated_db)
|
|
|
|
query = SalesDataQuery(
|
|
inventory_product_id="550e8400-e29b-41d4-a716-446655440001",
|
|
limit=10,
|
|
offset=0
|
|
)
|
|
|
|
records = await repository.get_by_tenant(sample_tenant_id, query)
|
|
|
|
assert len(records) == 1
|
|
assert records[0].inventory_product_id == "550e8400-e29b-41d4-a716-446655440001"
|
|
|
|
async def test_bulk_create(self, test_db_session, sample_tenant_id):
|
|
"""Test bulk creating records"""
|
|
repository = SalesRepository(test_db_session)
|
|
|
|
# Create multiple records data
|
|
bulk_data = [
|
|
{
|
|
"date": datetime.now(timezone.utc),
|
|
"inventory_product_id": f"550e8400-e29b-41d4-a716-{i+100:012x}",
|
|
"product_name": f"Product {i}",
|
|
"quantity_sold": i + 1,
|
|
"revenue": Decimal(str((i + 1) * 2.5)),
|
|
"source": "bulk_test"
|
|
}
|
|
for i in range(5)
|
|
]
|
|
|
|
created_count = await repository.bulk_create_sales_data(bulk_data, sample_tenant_id)
|
|
|
|
assert created_count == 5
|
|
|
|
# Verify records were created
|
|
all_records = await repository.get_by_tenant(sample_tenant_id)
|
|
assert len(all_records) == 5
|
|
|
|
async def test_repository_error_handling(self, test_db_session, sample_tenant_id):
|
|
"""Test repository error handling"""
|
|
repository = SalesRepository(test_db_session)
|
|
|
|
# Test getting non-existent record
|
|
non_existent = await repository.get_by_id("non-existent-id")
|
|
assert non_existent is None
|
|
|
|
# Test deleting non-existent record
|
|
delete_success = await repository.delete("non-existent-id")
|
|
assert delete_success is False
|
|
|
|
async def test_performance_bulk_operations(self, test_db_session, sample_tenant_id, performance_test_data):
|
|
"""Test performance of bulk operations"""
|
|
repository = SalesRepository(test_db_session)
|
|
|
|
# Test bulk create performance
|
|
import time
|
|
start_time = time.time()
|
|
|
|
created_count = await repository.bulk_create_sales_data(performance_test_data, sample_tenant_id)
|
|
|
|
end_time = time.time()
|
|
execution_time = end_time - start_time
|
|
|
|
assert created_count == len(performance_test_data)
|
|
assert execution_time < 5.0 # Should complete in under 5 seconds
|
|
|
|
# Test bulk retrieval performance
|
|
start_time = time.time()
|
|
|
|
all_records = await repository.get_by_tenant(sample_tenant_id)
|
|
|
|
end_time = time.time()
|
|
execution_time = end_time - start_time
|
|
|
|
assert len(all_records) == len(performance_test_data)
|
|
assert execution_time < 2.0 # Should complete in under 2 seconds |