# services/sales/tests/unit/test_data_import.py
"""
Unit tests for Data Import Service
"""
import pytest
import json
import base64
from decimal import Decimal
from datetime import datetime, timezone
from unittest.mock import AsyncMock, patch

from app.services.data_import_service import DataImportService, SalesValidationResult, SalesImportResult


@pytest.mark.asyncio
class TestDataImportService:
    """Test Data Import Service functionality"""

    @pytest.fixture
    def import_service(self):
        """Provide a fresh DataImportService for each test."""
        return DataImportService()

    # ------------------------------------------------------------------
    # Validation
    # ------------------------------------------------------------------

    async def test_validate_csv_import_data_valid(self, import_service, sample_tenant_id, sample_csv_data):
        """A well-formed CSV payload passes validation with no errors."""
        payload = {
            "tenant_id": str(sample_tenant_id),
            "data": sample_csv_data,
            "data_format": "csv",
        }

        outcome = await import_service.validate_import_data(payload)

        assert outcome.is_valid is True
        assert outcome.total_records == 5
        assert len(outcome.errors) == 0
        assert outcome.summary["status"] == "valid"

    async def test_validate_csv_import_data_missing_tenant(self, import_service, sample_csv_data):
        """Validation fails and reports MISSING_TENANT_ID when tenant_id is absent."""
        payload = {
            "data": sample_csv_data,
            "data_format": "csv",
        }

        outcome = await import_service.validate_import_data(payload)

        assert outcome.is_valid is False
        assert any(e["code"] == "MISSING_TENANT_ID" for e in outcome.errors)

    async def test_validate_csv_import_data_empty_file(self, import_service, sample_tenant_id):
        """An empty data string is rejected with an EMPTY_FILE error."""
        payload = {
            "tenant_id": str(sample_tenant_id),
            "data": "",
            "data_format": "csv",
        }

        outcome = await import_service.validate_import_data(payload)

        assert outcome.is_valid is False
        assert any(e["code"] == "EMPTY_FILE" for e in outcome.errors)

    async def test_validate_csv_import_data_unsupported_format(self, import_service, sample_tenant_id):
        """An unknown data_format is rejected with UNSUPPORTED_FORMAT."""
        payload = {
            "tenant_id": str(sample_tenant_id),
            "data": "some data",
            "data_format": "unsupported",
        }

        outcome = await import_service.validate_import_data(payload)

        assert outcome.is_valid is False
        assert any(e["code"] == "UNSUPPORTED_FORMAT" for e in outcome.errors)

    async def test_validate_csv_missing_required_columns(self, import_service, sample_tenant_id):
        """CSV without date/product columns yields both missing-column errors."""
        bad_csv = "invalid_column,another_invalid\nvalue1,value2"
        payload = {
            "tenant_id": str(sample_tenant_id),
            "data": bad_csv,
            "data_format": "csv",
        }

        outcome = await import_service.validate_import_data(payload)

        assert outcome.is_valid is False
        assert any(e["code"] == "MISSING_DATE_COLUMN" for e in outcome.errors)
        assert any(e["code"] == "MISSING_PRODUCT_COLUMN" for e in outcome.errors)

    # ------------------------------------------------------------------
    # Import processing
    # ------------------------------------------------------------------

    async def test_process_csv_import_success(self, import_service, sample_tenant_id, sample_csv_data):
        """A valid CSV file is fully imported: 5 processed, 5 created, 0 failed."""
        with patch('app.services.data_import_service.get_db_transaction') as get_db_mock:
            db_conn = AsyncMock()
            get_db_mock.return_value.__aenter__.return_value = db_conn

            repo = AsyncMock()
            repo.create_sales_record.return_value = AsyncMock()

            with patch('app.services.data_import_service.SalesRepository', return_value=repo):
                outcome = await import_service.process_import(
                    sample_tenant_id, sample_csv_data, "csv", "test.csv"
                )

                assert outcome.success is True
                assert outcome.records_processed == 5
                assert outcome.records_created == 5
                assert outcome.records_failed == 0

    async def test_process_json_import_success(self, import_service, sample_tenant_id, sample_json_data):
        """A valid JSON file imports both of its records."""
        with patch('app.services.data_import_service.get_db_transaction') as get_db_mock:
            db_conn = AsyncMock()
            get_db_mock.return_value.__aenter__.return_value = db_conn

            repo = AsyncMock()
            repo.create_sales_record.return_value = AsyncMock()

            with patch('app.services.data_import_service.SalesRepository', return_value=repo):
                outcome = await import_service.process_import(
                    sample_tenant_id, sample_json_data, "json", "test.json"
                )

                assert outcome.success is True
                assert outcome.records_processed == 2
                assert outcome.records_created == 2

    async def test_process_excel_import_base64(self, import_service, sample_tenant_id):
        """Base64-encoded Excel data (data-URI style) is decoded and imported."""
        # Create a simple Excel-like data structure
        excel_data = json.dumps([{
            "date": "2024-01-15",
            "product": "Pan Integral",
            "quantity": 5,
            "revenue": 12.50
        }])

        # Encode as base64
        encoded_data = "data:application/vnd.openxmlformats-officedocument.spreadsheetml.sheet;base64," + \
            base64.b64encode(excel_data.encode()).decode()

        with patch('app.services.data_import_service.get_db_transaction') as get_db_mock:
            db_conn = AsyncMock()
            get_db_mock.return_value.__aenter__.return_value = db_conn

            repo = AsyncMock()
            repo.create_sales_record.return_value = AsyncMock()

            # Mock pandas.read_excel to avoid dependency issues
            with patch('pandas.read_excel') as read_excel_mock:
                import pandas as pd
                frame = pd.DataFrame([{
                    "date": "2024-01-15",
                    "product": "Pan Integral",
                    "quantity": 5,
                    "revenue": 12.50
                }])
                read_excel_mock.return_value = frame

                with patch('app.services.data_import_service.SalesRepository', return_value=repo):
                    outcome = await import_service.process_import(
                        sample_tenant_id, encoded_data, "excel", "test.xlsx"
                    )

                    assert outcome.success is True
                    assert outcome.records_created == 1

    # ------------------------------------------------------------------
    # Parsing helpers
    # ------------------------------------------------------------------

    async def test_detect_columns_mapping(self, import_service):
        """Spanish column headers map onto the canonical field names."""
        headers = ["fecha", "producto", "cantidad", "ingresos", "tienda"]

        mapping = import_service._detect_columns(headers)

        assert mapping["date"] == "fecha"
        assert mapping["product"] == "producto"
        assert mapping["quantity"] == "cantidad"
        assert mapping["revenue"] == "ingresos"
        assert mapping["location"] == "tienda"

    async def test_parse_date_multiple_formats(self, import_service):
        """Every supported date format parses to a datetime."""
        samples = (
            "2024-01-15",
            "15/01/2024",
            "01/15/2024",
            "15-01-2024",
            "2024/01/15",
            "2024-01-15 10:30:00",
        )

        for raw in samples:
            parsed = import_service._parse_date(raw)
            assert parsed is not None
            assert isinstance(parsed, datetime)

    async def test_parse_date_invalid_formats(self, import_service):
        """Unparseable inputs (including None and out-of-range dates) yield None."""
        for raw in ("invalid", "not-a-date", "", None, "32/13/2024"):
            assert import_service._parse_date(raw) is None

    async def test_clean_product_name(self, import_service):
        """Names are trimmed, title-cased, stripped of noise; blanks get a default."""
        cases = (
            ("  pan de molde  ", "Pan De Molde"),
            ("café con leche!!!", "Café Con Leche"),
            ("té verde orgánico", "Té Verde Orgánico"),
            ("bocadillo de jamón", "Bocadillo De Jamón"),
            ("", "Producto sin nombre"),
            (None, "Producto sin nombre"),
        )

        for raw_name, expected in cases:
            assert import_service._clean_product_name(raw_name) == expected

    async def test_parse_row_data_valid(self, import_service):
        """A complete row parses into the normalized record fields."""
        row = {
            "fecha": "2024-01-15",
            "producto": "Pan Integral",
            "cantidad": "5",
            "ingresos": "12.50",
            "tienda": "STORE_001",
        }
        mapping = {
            "date": "fecha",
            "product": "producto",
            "quantity": "cantidad",
            "revenue": "ingresos",
            "location": "tienda",
        }

        parsed = await import_service._parse_row_data(row, mapping, 1)

        assert parsed["skip"] is False
        assert parsed["product_name"] == "Pan Integral"
        assert "inventory_product_id" in parsed  # Should be generated during parsing
        assert parsed["quantity_sold"] == 5
        assert parsed["revenue"] == 12.5
        assert parsed["location_id"] == "STORE_001"

    async def test_parse_row_data_missing_required(self, import_service):
        """A row lacking its date is skipped and the error mentions it."""
        row = {
            "producto": "Pan Integral",
            "cantidad": "5",
            # Missing date
        }
        mapping = {
            "date": "fecha",
            "product": "producto",
            "quantity": "cantidad",
        }

        parsed = await import_service._parse_row_data(row, mapping, 1)

        assert parsed["skip"] is True
        assert len(parsed["errors"]) > 0
        assert "Missing date" in parsed["errors"][0]

    async def test_parse_row_data_invalid_quantity(self, import_service):
        """A bad quantity falls back to the default of 1 with a warning, not a skip."""
        row = {
            "fecha": "2024-01-15",
            "producto": "Pan Integral",
            "cantidad": "invalid_quantity",
        }
        mapping = {
            "date": "fecha",
            "product": "producto",
            "quantity": "cantidad",
        }

        parsed = await import_service._parse_row_data(row, mapping, 1)

        assert parsed["skip"] is False  # Should not skip, just use default
        assert parsed["quantity_sold"] == 1  # Default quantity
        assert len(parsed["warnings"]) > 0

    # ------------------------------------------------------------------
    # Messages and suggestions
    # ------------------------------------------------------------------

    async def test_structure_messages(self, import_service):
        """Plain strings are wrapped as general messages; dicts pass through."""
        raw_messages = [
            "Simple string message",
            {
                "type": "existing_dict",
                "message": "Already structured",
                "code": "TEST_CODE"
            },
        ]

        structured = import_service._structure_messages(raw_messages)

        assert len(structured) == 2
        assert structured[0]["type"] == "general_message"
        assert structured[0]["message"] == "Simple string message"
        assert structured[1]["type"] == "existing_dict"

    async def test_generate_suggestions_valid_file(self, import_service):
        """A clean validation result yields ready-to-process suggestions."""
        validation = SalesValidationResult(
            is_valid=True,
            total_records=50,
            valid_records=50,
            invalid_records=0,
            errors=[],
            warnings=[],
            summary={},
        )

        suggestions = import_service._generate_suggestions(validation, "csv", 0)

        assert "El archivo está listo para procesamiento" in suggestions
        assert "Se procesarán aproximadamente 50 registros" in suggestions

    async def test_generate_suggestions_large_file(self, import_service):
        """A large record count triggers the slow-processing warning suggestion."""
        validation = SalesValidationResult(
            is_valid=True,
            total_records=2000,
            valid_records=2000,
            invalid_records=0,
            errors=[],
            warnings=[],
            summary={},
        )

        suggestions = import_service._generate_suggestions(validation, "csv", 0)

        assert "Archivo grande: el procesamiento puede tomar varios minutos" in suggestions

    # ------------------------------------------------------------------
    # Errors and performance
    # ------------------------------------------------------------------

    async def test_import_error_handling(self, import_service, sample_tenant_id):
        """process_import raises ValueError for an unsupported format."""
        # Test with unsupported format
        with pytest.raises(ValueError, match="Unsupported format"):
            await import_service.process_import(
                sample_tenant_id, "some data", "unsupported_format"
            )

    async def test_performance_large_import(self, import_service, sample_tenant_id, large_csv_data):
        """A 1000-row CSV imports successfully within the time budget."""
        with patch('app.services.data_import_service.get_db_transaction') as get_db_mock:
            db_conn = AsyncMock()
            get_db_mock.return_value.__aenter__.return_value = db_conn

            repo = AsyncMock()
            repo.create_sales_record.return_value = AsyncMock()

            with patch('app.services.data_import_service.SalesRepository', return_value=repo):
                import time
                t0 = time.time()

                outcome = await import_service.process_import(
                    sample_tenant_id, large_csv_data, "csv", "large_test.csv"
                )

                elapsed = time.time() - t0

                assert outcome.success is True
                assert outcome.records_processed == 1000
                assert elapsed < 10.0  # Should complete in under 10 seconds