# services/sales/tests/conftest.py
"""
Pytest configuration and fixtures for Sales Service tests
"""
|
|
|
|
import pytest
|
|
import asyncio
|
|
from datetime import datetime, timezone
|
|
from decimal import Decimal
|
|
from typing import AsyncGenerator
|
|
from uuid import uuid4, UUID
|
|
|
|
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
|
|
from sqlalchemy.pool import StaticPool
|
|
from fastapi.testclient import TestClient
|
|
|
|
from app.main import app
|
|
from app.core.config import settings
|
|
from app.core.database import Base, get_db
|
|
from app.models.sales import SalesData
|
|
from app.schemas.sales import SalesDataCreate
|
|
|
|
|
|
# Test database configuration
|
|
# SQLAlchemy URL for an in-memory SQLite database via the aiosqlite driver;
# passed to create_async_engine() by the test_engine fixture.
TEST_DATABASE_URL = "sqlite+aiosqlite:///:memory:"
|
|
|
|
|
|
@pytest.fixture(scope="session")
def event_loop():
    """Provide a single asyncio event loop for the whole test session.

    Yields:
        A newly created event loop. It is closed after the session's tests
        are done with it.
    """
    session_loop = asyncio.new_event_loop()
    yield session_loop
    # Teardown: release the loop once the session is over.
    session_loop.close()
|
|
|
|
|
|
@pytest.fixture
async def test_engine():
    """Yield an async engine for ``TEST_DATABASE_URL`` with all tables created.

    The engine is built with ``StaticPool`` and ``check_same_thread=False``.
    Every table registered on ``Base.metadata`` is created before the engine
    is handed to the test, and the engine is disposed on teardown.
    """
    engine_options = {
        "poolclass": StaticPool,
        "connect_args": {"check_same_thread": False},
    }
    db_engine = create_async_engine(TEST_DATABASE_URL, **engine_options)

    # Build the schema inside a transaction before any test touches the DB.
    async with db_engine.begin() as connection:
        await connection.run_sync(Base.metadata.create_all)

    yield db_engine

    await db_engine.dispose()
|
|
|
|
|
|
@pytest.fixture
async def test_db_session(test_engine) -> AsyncGenerator[AsyncSession, None]:
    """Yield an ``AsyncSession`` bound to the test engine.

    The session is created with ``expire_on_commit=False`` and is closed when
    the surrounding ``async with`` block exits on teardown.
    """
    session_factory = async_sessionmaker(
        bind=test_engine,
        class_=AsyncSession,
        expire_on_commit=False,
    )

    async with session_factory() as db_session:
        yield db_session
|
|
|
|
|
|
@pytest.fixture
def test_client():
    """Return a ``TestClient`` wrapping the service's FastAPI ``app``.

    The client is returned directly; it is not entered as a context manager.
    """
    client = TestClient(app)
    return client
|
|
|
|
|
|
@pytest.fixture
async def override_get_db(test_db_session):
    """Route the app's ``get_db`` dependency to the test session.

    While the fixture is active, ``app.dependency_overrides[get_db]`` points
    at a generator that yields ``test_db_session``.

    On teardown only the ``get_db`` entry is touched: it is restored to the
    override that was registered before this fixture ran, or removed if there
    was none. Overrides installed by other fixtures are left in place, which
    a blanket ``dependency_overrides.clear()`` would not guarantee.
    """
    async def _override_get_db():
        yield test_db_session

    overrides = app.dependency_overrides
    # Sentinel distinguishes "no previous override" from an override of None.
    unset = object()
    previous = overrides.get(get_db, unset)

    overrides[get_db] = _override_get_db
    yield

    if previous is unset:
        overrides.pop(get_db, None)
    else:
        overrides[get_db] = previous
|
|
|
|
|
|
# Test data fixtures
|
|
@pytest.fixture
def sample_tenant_id() -> UUID:
    """Return a freshly generated random UUID to use as a tenant ID."""
    tenant_id = uuid4()
    return tenant_id
|
|
|
|
|
|
@pytest.fixture
def sample_sales_data(sample_tenant_id: UUID) -> SalesDataCreate:
    """Return one fully populated ``SalesDataCreate`` payload.

    The sale is dated with the current UTC time. ``sample_tenant_id`` is
    requested by the fixture but is not part of the payload.
    """
    payload = {
        "date": datetime.now(timezone.utc),
        "inventory_product_id": "550e8400-e29b-41d4-a716-446655440000",
        "product_name": "Pan Integral",
        "product_category": "Panadería",
        "product_sku": "PAN001",
        "quantity_sold": 5,
        "unit_price": Decimal("2.50"),
        "revenue": Decimal("12.50"),
        "cost_of_goods": Decimal("6.25"),
        "discount_applied": Decimal("0"),
        "location_id": "STORE_001",
        "sales_channel": "in_store",
        "source": "manual",
        "notes": "Test sale",
        "weather_condition": "sunny",
        "is_holiday": False,
        "is_weekend": False,
    }
    return SalesDataCreate(**payload)
|
|
|
|
|
|
@pytest.fixture
def sample_sales_records(sample_tenant_id: UUID) -> list[dict]:
    """Return three sales records as plain dicts.

    All records share ``sample_tenant_id`` and a single timestamp taken once
    (current UTC time) when the fixture runs.
    """
    sold_at = datetime.now(timezone.utc)

    # Columns: inventory_product_id, product_name, quantity_sold, revenue,
    # location_id, source.
    rows = (
        (
            "550e8400-e29b-41d4-a716-446655440001",
            "Croissant",
            3,
            Decimal("7.50"),
            "STORE_001",
            "manual",
        ),
        (
            "550e8400-e29b-41d4-a716-446655440002",
            "Café Americano",
            2,
            Decimal("5.00"),
            "STORE_001",
            "pos",
        ),
        (
            "550e8400-e29b-41d4-a716-446655440003",
            "Bocadillo Jamón",
            1,
            Decimal("4.50"),
            "STORE_002",
            "manual",
        ),
    )

    return [
        {
            "tenant_id": sample_tenant_id,
            "date": sold_at,
            "inventory_product_id": product_id,
            "product_name": product_name,
            "quantity_sold": quantity,
            "revenue": revenue,
            "location_id": location,
            "source": source,
        }
        for product_id, product_name, quantity, revenue, location, source in rows
    ]
|
|
|
|
|
|
@pytest.fixture
def sample_csv_data() -> str:
    """Return a small CSV document: a header row plus five sales rows.

    Lines are separated by ``\\n`` and there is no trailing newline.
    """
    csv_lines = (
        "date,product,quantity,revenue,location",
        "2024-01-15,Pan Integral,5,12.50,STORE_001",
        "2024-01-15,Croissant,3,7.50,STORE_001",
        "2024-01-15,Café Americano,2,5.00,STORE_002",
        "2024-01-16,Pan de Molde,8,16.00,STORE_001",
        "2024-01-16,Magdalenas,6,9.00,STORE_002",
    )
    return "\n".join(csv_lines)
|
|
|
|
|
|
@pytest.fixture
def sample_json_data() -> str:
    """Return a JSON array (as text) holding two sales objects.

    Each object has the keys ``date``, ``product``, ``quantity``, ``revenue``
    and ``location``.
    """
    json_lines = (
        '[',
        '{',
        '"date": "2024-01-15",',
        '"product": "Pan Integral",',
        '"quantity": 5,',
        '"revenue": 12.50,',
        '"location": "STORE_001"',
        '},',
        '{',
        '"date": "2024-01-15",',
        '"product": "Croissant",',
        '"quantity": 3,',
        '"revenue": 7.50,',
        '"location": "STORE_001"',
        '}',
        ']',
    )
    return "\n".join(json_lines)
|
|
|
|
|
|
@pytest.fixture
async def populated_db(test_db_session: AsyncSession, sample_sales_records: list[dict]):
    """Yield the test session after inserting the sample sales records.

    One ``SalesData`` row is built from each dict in ``sample_sales_records``;
    all rows are committed before the session is handed to the test.
    """
    # The generator keeps the original build-then-add order, one row at a time.
    test_db_session.add_all(
        SalesData(**fields) for fields in sample_sales_records
    )

    await test_db_session.commit()
    yield test_db_session
|
|
|
|
|
|
# Mock fixtures for external dependencies
|
|
@pytest.fixture
def mock_messaging():
    """Return a stand-in messaging object that records events in memory.

    The returned object exposes ``publish_sales_created`` and
    ``publish_data_imported`` coroutines. Each appends an
    ``(event_name, data)`` tuple to ``published_events`` and returns ``True``.
    """
    class MockMessaging:
        """Collects published events instead of sending them anywhere."""

        def __init__(self):
            # (event_name, data) tuples, in the order they were published.
            self.published_events = []

        def _record(self, event_name, data):
            """Store one event and report success."""
            self.published_events.append((event_name, data))
            return True

        async def publish_sales_created(self, data):
            return self._record("sales_created", data)

        async def publish_data_imported(self, data):
            return self._record("data_imported", data)

    messaging = MockMessaging()
    return messaging
|
|
|
|
|
|
# Performance testing fixtures
|
|
@pytest.fixture
def large_csv_data() -> str:
    """Return a CSV document with a header row and 1000 generated rows.

    Days cycle through 01-30 of January 2024, products through
    ``Producto_0``..``Producto_9`` and locations through
    ``STORE_001``..``STORE_003``. Quantity and revenue are constant. There is
    no trailing newline.
    """
    header_line = "date,product,quantity,revenue,location\n"
    body = "\n".join(
        f"2024-01-{(i % 30) + 1:02d},Producto_{i % 10},1,2.50,STORE_{i % 3 + 1:03d}"
        for i in range(1000)  # 1000 records
    )
    return header_line + body
|
|
|
|
|
|
@pytest.fixture
def performance_test_data(sample_tenant_id: UUID) -> list[dict]:
    """Return 500 generated sales-record dicts for performance tests.

    All records share ``sample_tenant_id`` and one timestamp taken once
    (current UTC time). Product names cycle through 20 values, locations
    through ``STORE_001``..``STORE_005``, and quantities through 1-10, with
    revenue equal to quantity times 2.5.
    """
    created_at = datetime.now(timezone.utc)

    return [
        {
            "tenant_id": sample_tenant_id,
            "date": created_at,
            "inventory_product_id": f"550e8400-e29b-41d4-a716-{i:012x}",
            "product_name": f"Test Product {i % 20}",
            "quantity_sold": (i % 10) + 1,
            "revenue": Decimal(str(((i % 10) + 1) * 2.5)),
            "location_id": f"STORE_{(i % 5) + 1:03d}",
            "source": "test",
        }
        for i in range(500)  # 500 records
    ]