# services/sales/tests/unit/test_data_import.py
"""
Unit tests for Data Import Service
"""

import base64
import json
from datetime import datetime, timezone
from decimal import Decimal
from unittest.mock import AsyncMock, patch

import pytest

from app.services.data_import_service import DataImportService, SalesValidationResult, SalesImportResult


@pytest.mark.asyncio
class TestDataImportService:
    """Test Data Import Service functionality.

    Covers three areas:
      * validation of raw import payloads (tenant, format, columns),
      * the end-to-end ``process_import`` pipeline for CSV/JSON/Excel with
        the DB transaction and ``SalesRepository`` mocked out,
      * the smaller parsing/cleaning helpers (``_detect_columns``,
        ``_parse_date``, ``_clean_product_name``, ``_parse_row_data``, ...).

    NOTE(review): the fixtures ``sample_tenant_id``, ``sample_csv_data``,
    ``sample_json_data`` and ``large_csv_data`` are not defined in this
    file — presumably they live in a shared ``conftest.py``; verify there.
    """

    @pytest.fixture
    def import_service(self):
        """Create data import service instance"""
        return DataImportService()

    async def test_validate_csv_import_data_valid(self, import_service, sample_tenant_id, sample_csv_data):
        """Test validation of valid CSV import data"""
        data = {
            "tenant_id": str(sample_tenant_id),
            "data": sample_csv_data,
            "data_format": "csv"
        }

        result = await import_service.validate_import_data(data)

        assert result.is_valid is True
        # assumes the sample_csv_data fixture holds exactly 5 rows — TODO confirm
        assert result.total_records == 5
        assert len(result.errors) == 0
        assert result.summary["status"] == "valid"

    async def test_validate_csv_import_data_missing_tenant(self, import_service, sample_csv_data):
        """Test validation with missing tenant_id"""
        # Deliberately omit "tenant_id" from the payload.
        data = {
            "data": sample_csv_data,
            "data_format": "csv"
        }

        result = await import_service.validate_import_data(data)

        assert result.is_valid is False
        assert any(error["code"] == "MISSING_TENANT_ID" for error in result.errors)

    async def test_validate_csv_import_data_empty_file(self, import_service, sample_tenant_id):
        """Test validation with empty file"""
        data = {
            "tenant_id": str(sample_tenant_id),
            "data": "",
            "data_format": "csv"
        }

        result = await import_service.validate_import_data(data)

        assert result.is_valid is False
        assert any(error["code"] == "EMPTY_FILE" for error in result.errors)

    async def test_validate_csv_import_data_unsupported_format(self, import_service, sample_tenant_id):
        """Test validation with unsupported format"""
        data = {
            "tenant_id": str(sample_tenant_id),
            "data": "some data",
            "data_format": "unsupported"
        }

        result = await import_service.validate_import_data(data)

        assert result.is_valid is False
        assert any(error["code"] == "UNSUPPORTED_FORMAT" for error in result.errors)

    async def test_validate_csv_missing_required_columns(self, import_service, sample_tenant_id):
        """Test validation with missing required columns"""
        # Header row has neither a date nor a product column, so both
        # column-level error codes should be reported.
        invalid_csv = "invalid_column,another_invalid\nvalue1,value2"
        data = {
            "tenant_id": str(sample_tenant_id),
            "data": invalid_csv,
            "data_format": "csv"
        }

        result = await import_service.validate_import_data(data)

        assert result.is_valid is False
        assert any(error["code"] == "MISSING_DATE_COLUMN" for error in result.errors)
        assert any(error["code"] == "MISSING_PRODUCT_COLUMN" for error in result.errors)

    async def test_process_csv_import_success(self, import_service, sample_tenant_id, sample_csv_data):
        """Test successful CSV import processing"""
        with patch('app.services.data_import_service.get_db_transaction') as mock_get_db:
            mock_db = AsyncMock()
            # Wire the async context manager so that
            # `async with get_db_transaction() as db:` yields mock_db.
            mock_get_db.return_value.__aenter__.return_value = mock_db

            mock_repository = AsyncMock()
            mock_repository.create_sales_record.return_value = AsyncMock()

            with patch('app.services.data_import_service.SalesRepository', return_value=mock_repository):
                result = await import_service.process_import(
                    sample_tenant_id,
                    sample_csv_data,
                    "csv",
                    "test.csv"
                )

                # assumes sample_csv_data has 5 importable rows — TODO confirm fixture
                assert result.success is True
                assert result.records_processed == 5
                assert result.records_created == 5
                assert result.records_failed == 0

    async def test_process_json_import_success(self, import_service, sample_tenant_id, sample_json_data):
        """Test successful JSON import processing"""
        with patch('app.services.data_import_service.get_db_transaction') as mock_get_db:
            mock_db = AsyncMock()
            mock_get_db.return_value.__aenter__.return_value = mock_db

            mock_repository = AsyncMock()
            mock_repository.create_sales_record.return_value = AsyncMock()

            with patch('app.services.data_import_service.SalesRepository', return_value=mock_repository):
                result = await import_service.process_import(
                    sample_tenant_id,
                    sample_json_data,
                    "json",
                    "test.json"
                )

                # assumes sample_json_data holds 2 records — TODO confirm fixture
                assert result.success is True
                assert result.records_processed == 2
                assert result.records_created == 2

    async def test_process_excel_import_base64(self, import_service, sample_tenant_id):
        """Test Excel import with base64 encoded data"""
        # Create a simple Excel-like data structure
        excel_data = json.dumps([{
            "date": "2024-01-15",
            "product": "Pan Integral",
            "quantity": 5,
            "revenue": 12.50
        }])

        # Encode as base64 with a data-URI prefix, mimicking a browser upload.
        encoded_data = "data:application/vnd.openxmlformats-officedocument.spreadsheetml.sheet;base64," + \
            base64.b64encode(excel_data.encode()).decode()

        with patch('app.services.data_import_service.get_db_transaction') as mock_get_db:
            mock_db = AsyncMock()
            mock_get_db.return_value.__aenter__.return_value = mock_db

            mock_repository = AsyncMock()
            mock_repository.create_sales_record.return_value = AsyncMock()

            # Mock pandas.read_excel to avoid dependency issues
            with patch('pandas.read_excel') as mock_read_excel:
                import pandas as pd
                mock_df = pd.DataFrame([{
                    "date": "2024-01-15",
                    "product": "Pan Integral",
                    "quantity": 5,
                    "revenue": 12.50
                }])
                mock_read_excel.return_value = mock_df

                with patch('app.services.data_import_service.SalesRepository', return_value=mock_repository):
                    result = await import_service.process_import(
                        sample_tenant_id,
                        encoded_data,
                        "excel",
                        "test.xlsx"
                    )

                    assert result.success is True
                    assert result.records_created == 1

    async def test_detect_columns_mapping(self, import_service):
        """Test column detection and mapping"""
        # Spanish headers should be mapped onto the canonical field names.
        columns = ["fecha", "producto", "cantidad", "ingresos", "tienda"]

        mapping = import_service._detect_columns(columns)

        assert mapping["date"] == "fecha"
        assert mapping["product"] == "producto"
        assert mapping["quantity"] == "cantidad"
        assert mapping["revenue"] == "ingresos"
        assert mapping["location"] == "tienda"

    async def test_parse_date_multiple_formats(self, import_service):
        """Test date parsing with different formats"""
        # Test various date formats
        dates_to_test = [
            "2024-01-15",
            "15/01/2024",
            "01/15/2024",
            "15-01-2024",
            "2024/01/15",
            "2024-01-15 10:30:00"
        ]

        for date_str in dates_to_test:
            result = import_service._parse_date(date_str)
            assert result is not None
            assert isinstance(result, datetime)

    async def test_parse_date_invalid_formats(self, import_service):
        """Test date parsing with invalid formats"""
        # _parse_date is expected to return None (not raise) on bad input,
        # including None itself and out-of-range day/month values.
        invalid_dates = ["invalid", "not-a-date", "", None, "32/13/2024"]

        for date_str in invalid_dates:
            result = import_service._parse_date(date_str)
            assert result is None

    async def test_clean_product_name(self, import_service):
        """Test product name cleaning"""
        # (input, expected): trims whitespace, strips punctuation,
        # title-cases, and falls back to a placeholder for empty/None.
        test_cases = [
            ("  pan de molde  ", "Pan De Molde"),
            ("café con leche!!!", "Café Con Leche"),
            ("té verde orgánico", "Té Verde Orgánico"),
            ("bocadillo de jamón", "Bocadillo De Jamón"),
            ("", "Producto sin nombre"),
            (None, "Producto sin nombre")
        ]

        for input_name, expected in test_cases:
            result = import_service._clean_product_name(input_name)
            assert result == expected

    async def test_parse_row_data_valid(self, import_service):
        """Test parsing valid row data"""
        row = {
            "fecha": "2024-01-15",
            "producto": "Pan Integral",
            "cantidad": "5",
            "ingresos": "12.50",
            "tienda": "STORE_001"
        }

        column_mapping = {
            "date": "fecha",
            "product": "producto",
            "quantity": "cantidad",
            "revenue": "ingresos",
            "location": "tienda"
        }

        result = await import_service._parse_row_data(row, column_mapping, 1)

        assert result["skip"] is False
        assert result["product_name"] == "Pan Integral"
        assert "inventory_product_id" in result  # Should be generated during parsing
        assert result["quantity_sold"] == 5
        assert result["revenue"] == 12.5
        assert result["location_id"] == "STORE_001"

    async def test_parse_row_data_missing_required(self, import_service):
        """Test parsing row data with missing required fields"""
        row = {
            "producto": "Pan Integral",
            "cantidad": "5"
            # Missing date
        }

        column_mapping = {
            "date": "fecha",
            "product": "producto",
            "quantity": "cantidad"
        }

        result = await import_service._parse_row_data(row, column_mapping, 1)

        assert result["skip"] is True
        assert len(result["errors"]) > 0
        assert "Missing date" in result["errors"][0]

    async def test_parse_row_data_invalid_quantity(self, import_service):
        """Test parsing row data with invalid quantity"""
        row = {
            "fecha": "2024-01-15",
            "producto": "Pan Integral",
            "cantidad": "invalid_quantity"
        }

        column_mapping = {
            "date": "fecha",
            "product": "producto",
            "quantity": "cantidad"
        }

        result = await import_service._parse_row_data(row, column_mapping, 1)

        assert result["skip"] is False  # Should not skip, just use default
        assert result["quantity_sold"] == 1  # Default quantity
        assert len(result["warnings"]) > 0

    async def test_structure_messages(self, import_service):
        """Test message structuring"""
        # Plain strings get wrapped as "general_message"; dicts pass through.
        messages = [
            "Simple string message",
            {
                "type": "existing_dict",
                "message": "Already structured",
                "code": "TEST_CODE"
            }
        ]

        result = import_service._structure_messages(messages)

        assert len(result) == 2
        assert result[0]["type"] == "general_message"
        assert result[0]["message"] == "Simple string message"
        assert result[1]["type"] == "existing_dict"

    async def test_generate_suggestions_valid_file(self, import_service):
        """Test suggestion generation for valid files"""
        validation_result = SalesValidationResult(
            is_valid=True,
            total_records=50,
            valid_records=50,
            invalid_records=0,
            errors=[],
            warnings=[],
            summary={}
        )

        suggestions = import_service._generate_suggestions(validation_result, "csv", 0)

        assert "El archivo está listo para procesamiento" in suggestions
        assert "Se procesarán aproximadamente 50 registros" in suggestions

    async def test_generate_suggestions_large_file(self, import_service):
        """Test suggestion generation for large files"""
        # 2000 records is presumably over the "large file" threshold — confirm
        # against the service's _generate_suggestions implementation.
        validation_result = SalesValidationResult(
            is_valid=True,
            total_records=2000,
            valid_records=2000,
            invalid_records=0,
            errors=[],
            warnings=[],
            summary={}
        )

        suggestions = import_service._generate_suggestions(validation_result, "csv", 0)

        assert "Archivo grande: el procesamiento puede tomar varios minutos" in suggestions

    async def test_import_error_handling(self, import_service, sample_tenant_id):
        """Test import error handling"""
        # Test with unsupported format
        with pytest.raises(ValueError, match="Unsupported format"):
            await import_service.process_import(
                sample_tenant_id,
                "some data",
                "unsupported_format"
            )

    async def test_performance_large_import(self, import_service, sample_tenant_id, large_csv_data):
        """Test performance with large CSV import"""
        with patch('app.services.data_import_service.get_db_transaction') as mock_get_db:
            mock_db = AsyncMock()
            mock_get_db.return_value.__aenter__.return_value = mock_db

            mock_repository = AsyncMock()
            mock_repository.create_sales_record.return_value = AsyncMock()

            with patch('app.services.data_import_service.SalesRepository', return_value=mock_repository):
                import time
                start_time = time.time()

                result = await import_service.process_import(
                    sample_tenant_id,
                    large_csv_data,
                    "csv",
                    "large_test.csv"
                )

                end_time = time.time()
                execution_time = end_time - start_time

                # assumes large_csv_data has 1000 rows — TODO confirm fixture.
                # NOTE(review): wall-clock bound below can be flaky on slow CI.
                assert result.success is True
                assert result.records_processed == 1000
                assert execution_time < 10.0  # Should complete in under 10 seconds