# ================================================================
# services/data/app/services/data_import_service.py
# ================================================================
"""Data import service for various formats"""

import csv
import io
import json
import base64
import re

import openpyxl
import pandas as pd
from typing import Dict, Any, List, Optional
from datetime import datetime
from sqlalchemy.ext.asyncio import AsyncSession
import structlog

from app.services.sales_service import SalesService
from app.schemas.sales import SalesDataCreate

logger = structlog.get_logger()


class DataImportService:
    """
    Service for importing sales data from various formats.
    Supports CSV, Excel, JSON, and direct data entry.
    """

    # Common column mappings for different languages/formats
    COLUMN_MAPPINGS = {
        # Date columns
        'date': ['date', 'fecha', 'datum', 'data', 'dia'],
        'datetime': ['datetime', 'fecha_hora', 'timestamp'],

        # Product columns
        'product': ['product', 'producto', 'item', 'articulo', 'nombre', 'name'],
        'product_name': ['product_name', 'nombre_producto', 'item_name'],

        # Quantity columns
        'quantity': ['quantity', 'cantidad', 'qty', 'units', 'unidades'],
        'quantity_sold': ['quantity_sold', 'cantidad_vendida', 'sold'],

        # Revenue columns
        'revenue': ['revenue', 'ingresos', 'sales', 'ventas', 'total', 'importe'],
        'price': ['price', 'precio', 'cost', 'coste'],

        # Location columns
        'location': ['location', 'ubicacion', 'tienda', 'store', 'punto_venta'],
        'location_id': ['location_id', 'store_id', 'tienda_id'],
    }

    # Date formats to try
    DATE_FORMATS = [
        '%Y-%m-%d',           # 2024-01-15
        '%d/%m/%Y',           # 15/01/2024
        '%m/%d/%Y',           # 01/15/2024
        '%d-%m-%Y',           # 15-01-2024
        '%m-%d-%Y',           # 01-15-2024
        '%d.%m.%Y',           # 15.01.2024
        '%Y/%m/%d',           # 2024/01/15
        '%d/%m/%y',           # 15/01/24
        '%m/%d/%y',           # 01/15/24
        '%Y-%m-%d %H:%M:%S',  # 2024-01-15 14:30:00
        '%d/%m/%Y %H:%M',     # 15/01/2024 14:30
    ]
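
    # Illustrative example (not part of the import flow): a CSV exported with
    # Spanish headers such as
    #
    #     fecha,producto,cantidad,ingresos
    #     15/01/2024,Pan de Barra,12,18.00
    #
    # would be matched against COLUMN_MAPPINGS by _detect_columns below
    # (e.g. 'fecha' -> 'date', 'producto' -> 'product'), and '15/01/2024'
    # would be parsed via the '%d/%m/%Y' entry in DATE_FORMATS. The sample
    # values are hypothetical.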

    @staticmethod
    async def process_upload(tenant_id: str, content: str, file_format: str, db: AsyncSession, filename: Optional[str] = None) -> Dict[str, Any]:
        """Process uploaded data and return the complete response structure."""
        start_time = datetime.utcnow()

        try:
            logger.info("Starting data import",
                        filename=filename,
                        format=file_format,
                        tenant_id=tenant_id)

            # Process the data based on format
            if file_format.lower() == 'csv':
                result = await DataImportService._process_csv_data(tenant_id, content, db, filename)
            elif file_format.lower() == 'json':
                result = await DataImportService._process_json(tenant_id, content, db, filename)
            elif file_format.lower() in ['excel', 'xlsx']:
                result = await DataImportService._process_excel(tenant_id, content, db, filename)
            else:
                raise ValueError(f"Unsupported format: {file_format}")

            # Calculate processing time
            end_time = datetime.utcnow()
            processing_time = (end_time - start_time).total_seconds()

            # Convert errors list to structured format if needed
            structured_errors = []
            for error in result.get("errors", []):
                if isinstance(error, str):
                    structured_errors.append({
                        "row": None,
                        "field": None,
                        "message": error,
                        "type": "general_error"
                    })
                else:
                    structured_errors.append(error)

            # Convert warnings list to structured format if needed
            structured_warnings = []
            for warning in result.get("warnings", []):
                if isinstance(warning, str):
                    structured_warnings.append({
                        "row": None,
                        "field": None,
                        "message": warning,
                        "type": "general_warning"
                    })
                else:
                    structured_warnings.append(warning)

            # Calculate derived values
            total_rows = result.get("total_rows", 0)
            records_created = result.get("records_created", 0)
            records_failed = total_rows - records_created - result.get("skipped", 0)

            # Complete response structure matching the SalesImportResult schema
            complete_response = {
                "success": result.get("success", False),
                "records_processed": total_rows,
                "records_created": records_created,
                "records_updated": 0,  # records are only created, never updated
                "records_failed": records_failed,
                "errors": structured_errors,
                "warnings": structured_warnings,
                "processing_time_seconds": processing_time,

                # Keep existing fields for backward compatibility
                "total_rows": total_rows,
                "skipped": result.get("skipped", 0),
                "success_rate": result.get("success_rate", 0.0),
                "source": file_format,
                "filename": filename,
                "error_count": len(structured_errors)
            }

            logger.info("Data processing completed",
                        records_created=records_created,
                        success_rate=complete_response["success_rate"],
                        processing_time=processing_time)

            return complete_response

        except Exception as e:
            end_time = datetime.utcnow()
            processing_time = (end_time - start_time).total_seconds()

            error_message = f"Import failed: {str(e)}"
            logger.error("Data import failed", error=error_message, tenant_id=tenant_id)

            # Return an error response with the complete structure
            return {
                "success": False,
                "records_processed": 0,
                "records_created": 0,
                "records_updated": 0,
                "records_failed": 0,
                "errors": [{
                    "row": None,
                    "field": None,
                    "message": error_message,
                    "type": "import_error"
                }],
                "warnings": [],
                "processing_time_seconds": processing_time,

                # Backward compatibility fields
                "total_rows": 0,
                "skipped": 0,
                "success_rate": 0.0,
                "source": file_format,
                "filename": filename,
                "error_count": 1
            }
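
    # For reference, a successful process_upload call is expected to resolve to
    # a dict shaped roughly like the following (values are illustrative, not
    # real output):
    #
    #     {
    #         "success": True,
    #         "records_processed": 120,
    #         "records_created": 118,
    #         "records_updated": 0,
    #         "records_failed": 0,
    #         "errors": [],
    #         "warnings": [],
    #         "processing_time_seconds": 0.8,
    #         "total_rows": 120,
    #         "skipped": 2,
    #         "success_rate": 98.3,
    #         "source": "csv",
    #         "filename": "ventas.csv",
    #         "error_count": 0
    #     }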

    @staticmethod
    async def _process_csv_data(tenant_id: str, csv_content: str, db: AsyncSession, filename: Optional[str] = None) -> Dict[str, Any]:
        """Process CSV data with improved error handling and structure"""
        try:
            # Parse CSV
            reader = csv.DictReader(io.StringIO(csv_content))
            rows = list(reader)

            if not rows:
                return {
                    "success": False,
                    "total_rows": 0,
                    "records_created": 0,
                    "skipped": 0,
                    "success_rate": 0.0,
                    "errors": ["CSV file is empty"],
                    "warnings": []
                }

            # Column mapping
            column_mapping = DataImportService._get_column_mapping(list(rows[0].keys()))

            records_created = 0
            errors = []
            warnings = []
            skipped = 0

            logger.info(f"Processing {len(rows)} records from CSV")

            for index, row in enumerate(rows):
                try:
                    # Extract and validate date
                    date_str = str(row.get(column_mapping.get('date', ''), '')).strip()
                    if not date_str or date_str.lower() in ['nan', 'null', 'none', '']:
                        errors.append(f"Fila {index + 1}: Fecha faltante")
                        skipped += 1
                        continue

                    parsed_date = DataImportService._parse_date(date_str)
                    if not parsed_date:
                        errors.append(f"Fila {index + 1}: Formato de fecha inválido: {date_str}")
                        skipped += 1
                        continue

                    # Extract and validate product name
                    product_name = str(row.get(column_mapping.get('product', ''), '')).strip()
                    if not product_name or product_name.lower() in ['nan', 'null', 'none', '']:
                        errors.append(f"Fila {index + 1}: Nombre de producto faltante")
                        skipped += 1
                        continue

                    # Clean product name
                    product_name = DataImportService._clean_product_name(product_name)

                    # Extract and validate quantity
                    quantity_raw = row.get(column_mapping.get('quantity', 'cantidad'), 1)
                    try:
                        quantity = int(float(str(quantity_raw).replace(',', '.')))
                        if quantity <= 0:
                            warnings.append(f"Fila {index + 1}: Cantidad inválida ({quantity}), usando 1")
                            quantity = 1
                    except (ValueError, TypeError):
                        warnings.append(f"Fila {index + 1}: Cantidad inválida ({quantity_raw}), usando 1")
                        quantity = 1

                    # Extract revenue (optional)
                    revenue_raw = row.get(column_mapping.get('revenue', 'ingresos'), None)
                    revenue = None
                    if revenue_raw:
                        try:
                            revenue = float(str(revenue_raw).replace(',', '.'))
                        except (ValueError, TypeError):
                            revenue = quantity * 1.5  # fallback estimate when revenue cannot be parsed
                    else:
                        revenue = quantity * 1.5  # fallback estimate when revenue is missing

                    # Extract location (optional)
                    location_id = row.get(column_mapping.get('location', 'ubicacion'), None)

                    # Create sales record
                    sales_data = SalesDataCreate(
                        tenant_id=tenant_id,
                        date=parsed_date,
                        product_name=product_name,
                        quantity_sold=quantity,
                        revenue=revenue,
                        location_id=location_id,
                        source="csv"
                    )

                    await SalesService.create_sales_record(sales_data, db)
                    records_created += 1

                except Exception as e:
                    error_msg = f"Fila {index + 1}: {str(e)}"
                    errors.append(error_msg)
                    skipped += 1
                    logger.warning("Record processing failed", error=error_msg)

            success_rate = (records_created / len(rows)) * 100 if rows else 0

            return {
                "success": records_created > 0,
                "total_rows": len(rows),
                "records_created": records_created,
                "skipped": skipped,
                "success_rate": success_rate,
                "errors": errors,
                "warnings": warnings
            }

        except Exception as e:
            logger.error("CSV processing failed", error=str(e))
            return {
                "success": False,
                "total_rows": 0,
                "records_created": 0,
                "skipped": 0,
                "success_rate": 0.0,
                "errors": [f"CSV processing error: {str(e)}"],
                "warnings": []
            }
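
    # A minimal CSV that this parser should accept, assuming the default
    # Spanish column names (the sample rows are made up for illustration):
    #
    #     fecha,producto,cantidad,ingresos,ubicacion
    #     2024-01-15,Croissant,30,45.00,tienda_centro
    #     2024-01-15,Pan de Barra,12,18.00,tienda_centro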

    @staticmethod
    async def _process_excel(tenant_id: str, excel_content: str, db: AsyncSession, filename: Optional[str] = None) -> Dict[str, Any]:
        """Process Excel file"""
        try:
            # Decode base64 content
            if excel_content.startswith('data:'):
                excel_bytes = base64.b64decode(excel_content.split(',')[1])
            else:
                excel_bytes = base64.b64decode(excel_content)

            # Read Excel file - try the first sheet
            try:
                df = pd.read_excel(io.BytesIO(excel_bytes), sheet_name=0)
            except Exception:
                # If pandas fails, try openpyxl directly
                workbook = openpyxl.load_workbook(io.BytesIO(excel_bytes))
                sheet = workbook.active

                # Convert to DataFrame
                data = []
                headers = None
                for row in sheet.iter_rows(values_only=True):
                    if headers is None:
                        headers = [str(cell).strip().lower() if cell else f"col_{i}" for i, cell in enumerate(row)]
                    else:
                        data.append(row)

                df = pd.DataFrame(data, columns=headers)

            # Clean column names
            df.columns = df.columns.str.strip().str.lower()

            # Remove empty rows
            df = df.dropna(how='all')

            # Map columns
            column_mapping = DataImportService._detect_columns(df.columns.tolist())

            if not column_mapping.get('date') or not column_mapping.get('product'):
                return {
                    "success": False,
                    "error": f"Columnas requeridas no encontradas en Excel. Detectadas: {list(df.columns)}"
                }

            return await DataImportService._process_dataframe(
                tenant_id, df, column_mapping, db, "excel", filename
            )

        except Exception as e:
            logger.error("Excel processing failed", error=str(e))
            return {"success": False, "error": f"Error procesando Excel: {str(e)}"}
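
    # The Excel path expects the workbook content as a base64 string
    # (optionally wrapped in a "data:" URL). A caller could prepare such a
    # payload roughly like this sketch, where the file path is hypothetical:
    #
    #     import base64
    #     with open("ventas.xlsx", "rb") as fh:
    #         excel_content = base64.b64encode(fh.read()).decode("ascii")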

    @staticmethod
    async def _process_json(tenant_id: str, json_content: str, db: AsyncSession, filename: Optional[str] = None) -> Dict[str, Any]:
        """Process JSON file"""
        try:
            # Decode base64 data URLs before parsing
            if json_content.startswith('data:'):
                json_content = base64.b64decode(json_content.split(',')[1]).decode('utf-8')

            data = json.loads(json_content)

            # Handle different JSON structures
            if isinstance(data, dict):
                if 'data' in data:
                    records = data['data']
                elif 'records' in data:
                    records = data['records']
                elif 'sales' in data:
                    records = data['sales']
                else:
                    records = [data]  # Single record
            elif isinstance(data, list):
                records = data
            else:
                return {"success": False, "error": "Formato JSON no válido"}

            # Convert to DataFrame for consistent processing
            df = pd.DataFrame(records)
            df.columns = df.columns.str.strip().str.lower()

            # Map columns
            column_mapping = DataImportService._detect_columns(df.columns.tolist())

            if not column_mapping.get('date') or not column_mapping.get('product'):
                return {
                    "success": False,
                    "error": f"Columnas requeridas no encontradas en JSON. Detectadas: {list(df.columns)}"
                }

            return await DataImportService._process_dataframe(
                tenant_id, df, column_mapping, db, "json", filename
            )

        except json.JSONDecodeError as e:
            return {"success": False, "error": f"JSON inválido: {str(e)}"}
        except Exception as e:
            logger.error("JSON processing failed", error=str(e))
            return {"success": False, "error": f"Error procesando JSON: {str(e)}"}

    @staticmethod
    async def _process_pos_data(tenant_id: str, pos_content: str, db: AsyncSession, filename: Optional[str] = None) -> Dict[str, Any]:
        """Process POS (Point of Sale) system data"""
        try:
            # POS data often comes in specific formats.
            # This is a generic parser that can be customized for specific POS systems.
            if pos_content.startswith('data:'):
                pos_content = base64.b64decode(pos_content.split(',')[1]).decode('utf-8')

            lines = pos_content.strip().split('\n')
            records = []

            for line_num, line in enumerate(lines, 1):
                try:
                    # Skip empty lines, comments, and total lines
                    if not line.strip() or line.startswith('#') or 'TOTAL' in line.upper():
                        continue

                    # Try different delimiters
                    for delimiter in ['\t', ';', '|', ',']:
                        if delimiter in line:
                            parts = line.split(delimiter)
                            if len(parts) >= 3:  # At least date, product, quantity
                                records.append({
                                    'date': parts[0].strip(),
                                    'product': parts[1].strip(),
                                    'quantity': parts[2].strip(),
                                    'revenue': parts[3].strip() if len(parts) > 3 else None,
                                    'line_number': line_num
                                })
                            break

                except Exception as e:
                    logger.warning(f"Skipping POS line {line_num}: {e}")
                    continue

            if not records:
                return {"success": False, "error": "No se encontraron datos válidos en el archivo POS"}

            # Convert to DataFrame
            df = pd.DataFrame(records)

            # Standard column mapping for POS
            column_mapping = {
                'date': 'date',
                'product': 'product',
                'quantity': 'quantity',
                'revenue': 'revenue'
            }

            return await DataImportService._process_dataframe(
                tenant_id, df, column_mapping, db, "pos", filename
            )

        except Exception as e:
            logger.error("POS processing failed", error=str(e))
            return {"success": False, "error": f"Error procesando datos POS: {str(e)}"}
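
    # Example of the kind of delimited POS export this parser is aimed at
    # (the lines are invented for illustration; any of tab, ';', '|' or ','
    # can act as the delimiter, and '#' comments or TOTAL lines are skipped):
    #
    #     2024-01-15;Pan de Barra;12;18.00
    #     2024-01-15;Croissant;30;45.00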

    @staticmethod
    async def _process_dataframe(tenant_id: str,
                                 df: pd.DataFrame,
                                 column_mapping: Dict[str, str],
                                 db: AsyncSession,
                                 source: str,
                                 filename: Optional[str] = None) -> Dict[str, Any]:
        """Process DataFrame with mapped columns"""
        try:
            records_created = 0
            errors = []
            warnings = []
            skipped = 0

            logger.info(f"Processing {len(df)} records from {source}")

            for index, row in df.iterrows():
                try:
                    # Extract and validate date
                    date_str = str(row.get(column_mapping['date'], '')).strip()
                    if not date_str or date_str.lower() in ['nan', 'null', 'none', '']:
                        errors.append(f"Fila {index + 1}: Fecha faltante")
                        skipped += 1
                        continue

                    date = DataImportService._parse_date(date_str)
                    if not date:
                        errors.append(f"Fila {index + 1}: Formato de fecha inválido: {date_str}")
                        skipped += 1
                        continue

                    # Extract and validate product name
                    product_name = str(row.get(column_mapping['product'], '')).strip()
                    if not product_name or product_name.lower() in ['nan', 'null', 'none', '']:
                        errors.append(f"Fila {index + 1}: Nombre de producto faltante")
                        skipped += 1
                        continue

                    # Clean product name
                    product_name = DataImportService._clean_product_name(product_name)

                    # Extract and validate quantity
                    quantity_raw = row.get(column_mapping.get('quantity', 'quantity'), 0)
                    try:
                        quantity = int(float(str(quantity_raw).replace(',', '.')))
                        if quantity <= 0:
                            warnings.append(f"Fila {index + 1}: Cantidad inválida ({quantity}), usando 1")
                            quantity = 1
                    except (ValueError, TypeError):
                        warnings.append(f"Fila {index + 1}: Cantidad inválida ({quantity_raw}), usando 1")
                        quantity = 1

                    # Extract revenue (optional)
                    revenue = None
                    if 'revenue' in column_mapping and column_mapping['revenue'] in row:
                        revenue_raw = row.get(column_mapping['revenue'])
                        if revenue_raw and str(revenue_raw).lower() not in ['nan', 'null', 'none', '']:
                            try:
                                revenue = float(str(revenue_raw).replace(',', '.').replace('€', '').replace('$', '').strip())
                                if revenue < 0:
                                    revenue = None
                                    warnings.append(f"Fila {index + 1}: Ingreso negativo ignorado")
                            except (ValueError, TypeError):
                                warnings.append(f"Fila {index + 1}: Formato de ingreso inválido: {revenue_raw}")

                    # Extract location (optional)
                    location_id = None
                    if 'location' in column_mapping and column_mapping['location'] in row:
                        location_raw = row.get(column_mapping['location'])
                        if location_raw and str(location_raw).lower() not in ['nan', 'null', 'none', '']:
                            location_id = str(location_raw).strip()

                    # Create sales record
                    sales_data = SalesDataCreate(
                        tenant_id=tenant_id,
                        date=date,
                        product_name=product_name,
                        quantity_sold=quantity,
                        revenue=revenue,
                        location_id=location_id,
                        source=source,
                        raw_data=json.dumps({
                            **row.to_dict(),
                            "original_row": index + 1,
                            "filename": filename
                        })
                    )

                    await SalesService.create_sales_record(sales_data, db)
                    records_created += 1

                    # Log progress for large imports
                    if records_created % 100 == 0:
                        logger.info(f"Processed {records_created} records...")

                except Exception as e:
                    error_msg = f"Fila {index + 1}: {str(e)}"
                    errors.append(error_msg)
                    logger.warning("Record processing failed", error=error_msg)
                    continue

            # Calculate success rate
            success_rate = (records_created / len(df)) * 100 if len(df) > 0 else 0

            result = {
                "success": True,
                "records_created": records_created,
                "total_rows": len(df),
                "skipped": skipped,
                "success_rate": round(success_rate, 1),
                "errors": errors[:10],      # Limit to first 10 errors
                "warnings": warnings[:10],  # Limit to first 10 warnings
                "source": source,
                "filename": filename
            }

            if errors:
                result["error_count"] = len(errors)
                if len(errors) > 10:
                    result["errors"].append(f"... y {len(errors) - 10} errores más")

            if warnings:
                result["warning_count"] = len(warnings)
                if len(warnings) > 10:
                    result["warnings"].append(f"... y {len(warnings) - 10} advertencias más")

            logger.info("Data processing completed",
                        records_created=records_created,
                        total_rows=len(df),
                        success_rate=success_rate)

            return result

        except Exception as e:
            logger.error("DataFrame processing failed", error=str(e))
            return {
                "success": False,
                "error": f"Error procesando datos: {str(e)}",
                "records_created": 0
            }

    @staticmethod
    def _detect_columns(columns: List[str]) -> Dict[str, str]:
        """Detect column mappings using fuzzy matching"""
        mapping = {}
        columns_lower = [col.lower() for col in columns]

        for standard_name, possible_names in DataImportService.COLUMN_MAPPINGS.items():
            for col in columns_lower:
                for possible in possible_names:
                    if possible in col or col in possible:
                        mapping[standard_name] = columns[columns_lower.index(col)]
                        break
                if standard_name in mapping:
                    break

        # Map common aliases
        if 'product' not in mapping and 'product_name' in mapping:
            mapping['product'] = mapping['product_name']
        if 'quantity' not in mapping and 'quantity_sold' in mapping:
            mapping['quantity'] = mapping['quantity_sold']
        if 'location' not in mapping and 'location_id' in mapping:
            mapping['location'] = mapping['location_id']

        return mapping
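
    # Example (illustrative): given headers ['Fecha', 'Producto', 'Unidades'],
    # the returned mapping would include
    #     {'date': 'Fecha', 'product': 'Producto', 'quantity': 'Unidades'}
    # Matching is substring-based on lowercased names, so closely related keys
    # (e.g. 'datetime') may be populated as well.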

    @staticmethod
    def _parse_date(date_str: str) -> Optional[datetime]:
        """Parse date string with multiple format attempts"""
        if not date_str or str(date_str).lower() in ['nan', 'null', 'none']:
            return None

        # Clean date string
        date_str = str(date_str).strip()

        # Try pandas first (handles most formats automatically)
        try:
            return pd.to_datetime(date_str, dayfirst=True)
        except Exception:
            pass

        # Try specific formats
        for fmt in DataImportService.DATE_FORMATS:
            try:
                return datetime.strptime(date_str, fmt)
            except ValueError:
                continue

        # Try extracting numbers and common patterns
        try:
            # Look for patterns like dd/mm/yyyy or dd-mm-yyyy
            date_pattern = re.search(r'(\d{1,2})[/\-.](\d{1,2})[/\-.](\d{2,4})', date_str)
            if date_pattern:
                day, month, year = date_pattern.groups()

                # Convert 2-digit year to 4-digit
                year = int(year)
                if year < 50:
                    year += 2000
                elif year < 100:
                    year += 1900

                return datetime(year, int(month), int(day))
        except Exception:
            pass

        logger.warning(f"Could not parse date: {date_str}")
        return None
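
    # Examples (illustrative): '15/01/2024', '2024-01-15' and '15.01.24'
    # should all resolve to datetime(2024, 1, 15). Day-first interpretation is
    # preferred, so '01/02/2024' is read as 1 February 2024.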

    @staticmethod
    def _clean_product_name(product_name: str) -> str:
        """Clean and standardize product names"""
        if not product_name:
            return "Producto sin nombre"

        # Remove extra whitespace
        cleaned = re.sub(r'\s+', ' ', str(product_name).strip())

        # Remove special characters but keep Spanish characters
        cleaned = re.sub(r'[^\w\s\-áéíóúñçüÁÉÍÓÚÑÇÜ]', '', cleaned)

        # Capitalize first letter of each word
        cleaned = cleaned.title()

        # Common product name corrections for Spanish bakeries
        replacements = {
            'Pan De': 'Pan de',
            'Café Con': 'Café con',
            'Te ': 'Té ',
            'Bocadillo De': 'Bocadillo de',
        }

        for old, new in replacements.items():
            cleaned = cleaned.replace(old, new)

        return cleaned if cleaned else "Producto sin nombre"
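
    # Examples (illustrative): '  pan de  barra!! ' -> 'Pan de Barra', and
    # 'cafe con leche' -> 'Cafe Con Leche' (title-cased; the accent-aware
    # replacements only fire when the accented spelling is already present).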

    @staticmethod
    async def validate_import_data(data: Dict[str, Any]) -> Dict[str, Any]:
        """
        Validate import data before processing.
        Returns a response matching the SalesValidationResult schema exactly.
        """
        logger.info("Starting import data validation", tenant_id=data.get("tenant_id"))

        # Initialize validation result with all required fields from the schema
        validation_result = {
            "is_valid": True,
            "total_records": 0,
            "valid_records": 0,
            "invalid_records": 0,
            "errors": [],    # List[Dict[str, Any]]
            "warnings": [],  # List[Dict[str, Any]]
            "summary": {}    # Dict[str, Any]
        }

        error_list = []
        warning_list = []

        try:
            # Basic validation checks
            if not data.get("tenant_id"):
                error_list.append("tenant_id es requerido")
                validation_result["is_valid"] = False

            if not data.get("data"):
                error_list.append("Datos de archivo faltantes")
                validation_result["is_valid"] = False

                # Early return for missing data
                validation_result["errors"] = [
                    {"type": "missing_data", "message": msg, "field": "data", "row": None}
                    for msg in error_list
                ]
                validation_result["summary"] = {
                    "status": "failed",
                    "reason": "no_data_provided",
                    "file_format": data.get("data_format", "unknown"),
                    "suggestions": ["Selecciona un archivo válido para importar"]
                }
                logger.warning("Validation failed: no data provided")
                return validation_result

            # Validate file format
            format_type = data.get("data_format", "").lower()
            supported_formats = ["csv", "excel", "xlsx", "xls", "json", "pos"]

            if format_type not in supported_formats:
                error_list.append(f"Formato no soportado: {format_type}")
                validation_result["is_valid"] = False

            # Validate data size
            data_content = data.get("data", "")
            data_size = len(data_content)

            if data_size == 0:
                error_list.append("El archivo está vacío")
                validation_result["is_valid"] = False
            elif data_size > 10 * 1024 * 1024:  # 10MB limit
                error_list.append("Archivo demasiado grande (máximo 10MB)")
                validation_result["is_valid"] = False
            elif data_size > 1024 * 1024:  # 1MB warning
                warning_list.append("Archivo grande detectado. El procesamiento puede tomar más tiempo.")

            # Try to parse and analyze the actual content
            if format_type == "csv" and data_content and validation_result["is_valid"]:
                try:
                    # Parse CSV and analyze content
                    reader = csv.DictReader(io.StringIO(data_content))
                    rows = list(reader)

                    validation_result["total_records"] = len(rows)

                    if not rows:
                        error_list.append("El archivo CSV no contiene datos")
                        validation_result["is_valid"] = False
                    else:
                        # Analyze CSV structure
                        headers = list(rows[0].keys()) if rows else []
                        logger.debug(f"CSV headers found: {headers}")

                        # Check for required columns (flexible mapping)
                        has_date = any(col.lower() in ['fecha', 'date', 'día', 'day'] for col in headers)
                        has_product = any(col.lower() in ['producto', 'product', 'product_name', 'item'] for col in headers)
                        has_quantity = any(col.lower() in ['cantidad', 'quantity', 'qty', 'units'] for col in headers)

                        missing_columns = []
                        if not has_date:
                            missing_columns.append("fecha/date")
                        if not has_product:
                            missing_columns.append("producto/product")
                        if not has_quantity:
                            warning_list.append("Columna de cantidad no encontrada, se usará 1 por defecto")

                        if missing_columns:
                            error_list.append(f"Columnas requeridas faltantes: {', '.join(missing_columns)}")
                            validation_result["is_valid"] = False

                        # Sample data validation (check first few rows)
                        sample_errors = 0
                        for row in rows[:5]:
                            if not any(row.get(col) for col in headers if 'fecha' in col.lower() or 'date' in col.lower()):
                                sample_errors += 1
                            if not any(row.get(col) for col in headers if 'producto' in col.lower() or 'product' in col.lower()):
                                sample_errors += 1

                        if sample_errors > 0:
                            warning_list.append(f"Se detectaron {sample_errors} filas con datos faltantes en la muestra")

                        # Calculate estimated valid/invalid records
                        if validation_result["is_valid"]:
                            estimated_invalid = max(0, int(validation_result["total_records"] * 0.1))  # Assume 10% might have issues
                            validation_result["valid_records"] = validation_result["total_records"] - estimated_invalid
                            validation_result["invalid_records"] = estimated_invalid
                        else:
                            validation_result["valid_records"] = 0
                            validation_result["invalid_records"] = validation_result["total_records"]

                except Exception as csv_error:
                    logger.warning(f"CSV analysis failed: {str(csv_error)}")
                    warning_list.append(f"No se pudo analizar completamente el CSV: {str(csv_error)}")
                    # Don't fail validation just because of analysis issues

            # Convert accumulated string messages to the structured dict format required by the schema
            validation_result["errors"] = [
                {
                    "type": "validation_error",
                    "message": msg,
                    "field": None,
                    "row": None,
                    "code": "VALIDATION_ERROR"
                }
                for msg in error_list
            ]

            validation_result["warnings"] = [
                {
                    "type": "validation_warning",
                    "message": msg,
                    "field": None,
                    "row": None,
                    "code": "VALIDATION_WARNING"
                }
                for msg in warning_list
            ]

            # Build the summary dict
            validation_result["summary"] = {
                "status": "valid" if validation_result["is_valid"] else "invalid",
                "file_format": format_type,
                "file_size_bytes": data_size,
                "file_size_mb": round(data_size / (1024 * 1024), 2),
                "estimated_processing_time_seconds": max(1, validation_result["total_records"] // 100),
                "validation_timestamp": datetime.utcnow().isoformat(),
                "suggestions": []
            }

            # Add contextual suggestions
            if validation_result["is_valid"]:
                validation_result["summary"]["suggestions"] = [
                    "El archivo está listo para procesamiento",
                    f"Se procesarán aproximadamente {validation_result['total_records']} registros"
                ]
                if validation_result["total_records"] > 1000:
                    validation_result["summary"]["suggestions"].append("Archivo grande: el procesamiento puede tomar varios minutos")
                if len(warning_list) > 0:
                    validation_result["summary"]["suggestions"].append("Revisa las advertencias antes de continuar")
            else:
                validation_result["summary"]["suggestions"] = [
                    "Corrige los errores antes de continuar",
                    "Verifica que el archivo tenga el formato correcto"
                ]
                if format_type not in supported_formats:
                    validation_result["summary"]["suggestions"].append("Usa formato CSV o Excel")
                if validation_result["total_records"] == 0:
                    validation_result["summary"]["suggestions"].append("Asegúrate de que el archivo contenga datos")

            logger.info("Import validation completed",
                        is_valid=validation_result["is_valid"],
                        total_records=validation_result["total_records"],
                        valid_records=validation_result["valid_records"],
                        invalid_records=validation_result["invalid_records"],
                        error_count=len(validation_result["errors"]),
                        warning_count=len(validation_result["warnings"]))

            return validation_result

        except Exception as e:
            logger.error(f"Validation process failed: {str(e)}")

            # Return a properly structured error response
            return {
                "is_valid": False,
                "total_records": 0,
                "valid_records": 0,
                "invalid_records": 0,
                "errors": [
                    {
                        "type": "system_error",
                        "message": f"Error en el proceso de validación: {str(e)}",
                        "field": None,
                        "row": None,
                        "code": "SYSTEM_ERROR"
                    }
                ],
                "warnings": [],
                "summary": {
                    "status": "error",
                    "file_format": data.get("data_format", "unknown"),
                    "error_type": "system_error",
                    "suggestions": [
                        "Intenta de nuevo con un archivo diferente",
                        "Contacta soporte si el problema persiste"
                    ]
                }
            }
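
    # A typical request payload for this validator (the field values are
    # illustrative) would look like:
    #
    #     {
    #         "tenant_id": "tenant-123",
    #         "data_format": "csv",
    #         "data": "fecha,producto,cantidad\n2024-01-15,Croissant,30\n"
    #     }
    #
    # The keys 'tenant_id', 'data_format' and 'data' are the ones read above;
    # anything else is ignored.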

    @staticmethod
    def _get_column_mapping(columns: List[str]) -> Dict[str, str]:
        """Get column mapping - alias for _detect_columns"""
        return DataImportService._detect_columns(columns)