# ================================================================
# services/data/app/services/data_import_service.py
# ================================================================
"""Data import service for various formats"""
import csv
import io
import json
import base64
import openpyxl
import pandas as pd
from typing import Dict, Any, List, Optional, Union
from datetime import datetime, timedelta
from sqlalchemy.ext.asyncio import AsyncSession
import structlog
import re
from pathlib import Path
from app.services.sales_service import SalesService
from app.schemas.sales import SalesDataCreate
logger = structlog.get_logger()
class DataImportService:
"""
Service for importing sales data from various formats.
Supports CSV, Excel, JSON, and direct data entry.
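
Illustrative usage (a sketch; `db` is assumed to be an AsyncSession and `csv_text`
a CSV payload, both hypothetical names):

    result = await DataImportService.process_upload(
        tenant_id="tenant-123",
        content=csv_text,
        file_format="csv",
        db=db,
        filename="ventas.csv",
    )
    print(result["records_created"], result["success_rate"])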
"""
# Common column mappings for different languages/formats
COLUMN_MAPPINGS = {
# Date columns
'date': ['date', 'fecha', 'datum', 'data', 'dia'],
'datetime': ['datetime', 'fecha_hora', 'timestamp'],
# Product columns
'product': ['product', 'producto', 'item', 'articulo', 'nombre', 'name'],
'product_name': ['product_name', 'nombre_producto', 'item_name'],
# Quantity columns
'quantity': ['quantity', 'cantidad', 'qty', 'units', 'unidades'],
'quantity_sold': ['quantity_sold', 'cantidad_vendida', 'sold'],
# Revenue columns
'revenue': ['revenue', 'ingresos', 'sales', 'ventas', 'total', 'importe'],
'price': ['price', 'precio', 'cost', 'coste'],
# Location columns
'location': ['location', 'ubicacion', 'tienda', 'store', 'punto_venta'],
'location_id': ['location_id', 'store_id', 'tienda_id'],
}
# Date formats to try
DATE_FORMATS = [
'%Y-%m-%d', # 2024-01-15
'%d/%m/%Y', # 15/01/2024
'%m/%d/%Y', # 01/15/2024
'%d-%m-%Y', # 15-01-2024
'%m-%d-%Y', # 01-15-2024
'%d.%m.%Y', # 15.01.2024
'%Y/%m/%d', # 2024/01/15
'%d/%m/%y', # 15/01/24
'%m/%d/%y', # 01/15/24
'%Y-%m-%d %H:%M:%S', # 2024-01-15 14:30:00
'%d/%m/%Y %H:%M', # 15/01/2024 14:30
]
@staticmethod
async def process_upload(tenant_id: str, content: str, file_format: str, db: AsyncSession, filename: Optional[str] = None) -> Dict[str, Any]:
"""Process uploaded data and return complete response structure"""
start_time = datetime.utcnow()
try:
logger.info("Starting data import",
filename=filename,
format=file_format,
tenant_id=tenant_id)
# Process the data based on format
if file_format.lower() == 'csv':
result = await DataImportService._process_csv_data(tenant_id, content, db, filename)
elif file_format.lower() == 'json':
result = await DataImportService._process_json(tenant_id, content, db, filename)
elif file_format.lower() in ['excel', 'xlsx']:
result = await DataImportService._process_excel(tenant_id, content, db, filename)
elif file_format.lower() == 'pos':
result = await DataImportService._process_pos_data(tenant_id, content, db, filename)
else:
raise ValueError(f"Unsupported format: {file_format}")
# Calculate processing time
end_time = datetime.utcnow()
processing_time = (end_time - start_time).total_seconds()
# Convert errors list to structured format if needed
structured_errors = []
for error in result.get("errors", []):
if isinstance(error, str):
structured_errors.append({
"row": None,
"field": None,
"message": error,
"type": "general_error"
})
else:
structured_errors.append(error)
# Convert warnings list to structured format if needed
structured_warnings = []
for warning in result.get("warnings", []):
if isinstance(warning, str):
structured_warnings.append({
"row": None,
"field": None,
"message": warning,
"type": "general_warning"
})
else:
structured_warnings.append(warning)
# Calculate derived values
total_rows = result.get("total_rows", 0)
records_created = result.get("records_created", 0)
records_failed = total_rows - records_created - result.get("skipped", 0)
# Return complete response structure matching SalesImportResult schema
complete_response = {
"success": result.get("success", False),
"records_processed": total_rows,
"records_created": records_created,
"records_updated": 0, # imports only create records, never update existing ones
"records_failed": records_failed,
"errors": structured_errors,
"warnings": structured_warnings,
"processing_time_seconds": processing_time,
# Fields kept for backward compatibility
"total_rows": total_rows,
"skipped": result.get("skipped", 0),
"success_rate": result.get("success_rate", 0.0),
"source": file_format,
"filename": filename,
"error_count": len(structured_errors)
}
logger.info("Data processing completed",
records_created=records_created,
success_rate=complete_response["success_rate"],
processing_time=processing_time)
return complete_response
except Exception as e:
end_time = datetime.utcnow()
processing_time = (end_time - start_time).total_seconds()
error_message = f"Import failed: {str(e)}"
logger.error("Data import failed", error=error_message, tenant_id=tenant_id)
# Return error response with complete structure
return {
"success": False,
"records_processed": 0,
"records_created": 0,
"records_updated": 0,
"records_failed": 0,
"errors": [{
"row": None,
"field": None,
"message": error_message,
"type": "import_error"
}],
"warnings": [],
"processing_time_seconds": processing_time,
# Backward compatibility fields
"total_rows": 0,
"skipped": 0,
"success_rate": 0.0,
"source": file_format,
"filename": filename,
"error_count": 1
}
@staticmethod
async def _process_csv_data(tenant_id: str, csv_content: str, db: AsyncSession, filename: Optional[str] = None) -> Dict[str, Any]:
"""Process CSV data with improved error handling and structure"""
try:
# Parse CSV
reader = csv.DictReader(io.StringIO(csv_content))
rows = list(reader)
if not rows:
return {
"success": False,
"total_rows": 0,
"records_created": 0,
"skipped": 0,
"success_rate": 0.0,
"errors": ["CSV file is empty"],
"warnings": []
}
# Column mapping
column_mapping = DataImportService._get_column_mapping(list(rows[0].keys()))
records_created = 0
errors = []
warnings = []
skipped = 0
logger.info(f"Processing {len(rows)} records from CSV")
for index, row in enumerate(rows):
try:
# Extract and validate date
date_str = str(row.get(column_mapping.get('date', ''), '')).strip()
if not date_str or date_str.lower() in ['nan', 'null', 'none', '']:
errors.append(f"Fila {index + 1}: Fecha faltante")
skipped += 1
continue
parsed_date = DataImportService._parse_date(date_str)
if not parsed_date:
errors.append(f"Fila {index + 1}: Formato de fecha inválido: {date_str}")
skipped += 1
continue
# Extract and validate product name
product_name = str(row.get(column_mapping.get('product', ''), '')).strip()
if not product_name or product_name.lower() in ['nan', 'null', 'none', '']:
errors.append(f"Fila {index + 1}: Nombre de producto faltante")
skipped += 1
continue
# Clean product name
product_name = DataImportService._clean_product_name(product_name)
# Extract and validate quantity
quantity_raw = row.get(column_mapping.get('quantity', 'cantidad'), 1)
try:
quantity = int(float(str(quantity_raw).replace(',', '.')))
if quantity <= 0:
warnings.append(f"Fila {index + 1}: Cantidad inválida ({quantity}), usando 1")
quantity = 1
except (ValueError, TypeError):
warnings.append(f"Fila {index + 1}: Cantidad inválida ({quantity_raw}), usando 1")
quantity = 1
# Extract revenue (optional)
revenue_raw = row.get(column_mapping.get('revenue', 'ingresos'), None)
revenue = None
if revenue_raw:
try:
revenue = float(str(revenue_raw).replace(',', '.'))
except (ValueError, TypeError):
revenue = quantity * 1.5 # fallback estimate: assume ~1.50 revenue per unit
else:
revenue = quantity * 1.5 # fallback estimate: assume ~1.50 revenue per unit
# Extract location (optional)
location_id = row.get(column_mapping.get('location', 'ubicacion'), None)
# Create sales record
sales_data = SalesDataCreate(
tenant_id=tenant_id,
date=parsed_date,
product_name=product_name,
quantity_sold=quantity,
revenue=revenue,
location_id=location_id,
source="csv"
)
await SalesService.create_sales_record(sales_data, db)
records_created += 1
except Exception as e:
error_msg = f"Fila {index + 1}: {str(e)}"
errors.append(error_msg)
skipped += 1
logger.warning("Record processing failed", error=error_msg)
success_rate = (records_created / len(rows)) * 100 if rows else 0
return {
"success": records_created > 0,
"total_rows": len(rows),
"records_created": records_created,
"skipped": skipped,
"success_rate": success_rate,
"errors": errors,
"warnings": warnings
}
except Exception as e:
logger.error("CSV processing failed", error=str(e))
return {
"success": False,
"total_rows": 0,
"records_created": 0,
"skipped": 0,
"success_rate": 0.0,
"errors": [f"CSV processing error: {str(e)}"],
"warnings": []
}
@staticmethod
async def _process_excel(tenant_id: str, excel_content: str, db: AsyncSession, filename: Optional[str] = None) -> Dict[str, Any]:
"""Process Excel file"""
try:
# Decode base64 content
if excel_content.startswith('data:'):
excel_bytes = base64.b64decode(excel_content.split(',')[1])
else:
excel_bytes = base64.b64decode(excel_content)
# Read Excel file - try first sheet
try:
df = pd.read_excel(io.BytesIO(excel_bytes), sheet_name=0)
except Exception as e:
# If pandas fails, try openpyxl directly
workbook = openpyxl.load_workbook(io.BytesIO(excel_bytes))
sheet = workbook.active
# Convert to DataFrame
data = []
headers = None
for row in sheet.iter_rows(values_only=True):
if headers is None:
headers = [str(cell).strip().lower() if cell else f"col_{i}" for i, cell in enumerate(row)]
else:
data.append(row)
df = pd.DataFrame(data, columns=headers)
# Clean column names
df.columns = df.columns.str.strip().str.lower()
# Remove empty rows
df = df.dropna(how='all')
# Map columns
column_mapping = DataImportService._detect_columns(df.columns.tolist())
if not column_mapping.get('date') or not column_mapping.get('product'):
return {
"success": False,
"error": f"Columnas requeridas no encontradas en Excel. Detectadas: {list(df.columns)}"
}
return await DataImportService._process_dataframe(
tenant_id, df, column_mapping, db, "excel", filename
)
except Exception as e:
logger.error("Excel processing failed", error=str(e))
return {"success": False, "error": f"Error procesando Excel: {str(e)}"}
@staticmethod
async def _process_json(tenant_id: str, json_content: str, db: AsyncSession, filename: Optional[str] = None) -> Dict[str, Any]:
"""Process JSON file"""
try:
# Parse JSON
if json_content.startswith('data:'):
json_content = base64.b64decode(json_content.split(',')[1]).decode('utf-8')
data = json.loads(json_content)
# Handle different JSON structures
if isinstance(data, dict):
if 'data' in data:
records = data['data']
elif 'records' in data:
records = data['records']
elif 'sales' in data:
records = data['sales']
else:
records = [data] # Single record
elif isinstance(data, list):
records = data
else:
return {"success": False, "error": "Formato JSON no válido"}
# Convert to DataFrame for consistent processing
df = pd.DataFrame(records)
df.columns = df.columns.str.strip().str.lower()
# Map columns
column_mapping = DataImportService._detect_columns(df.columns.tolist())
if not column_mapping.get('date') or not column_mapping.get('product'):
return {
"success": False,
"error": f"Columnas requeridas no encontradas en JSON. Detectadas: {list(df.columns)}"
}
return await DataImportService._process_dataframe(
tenant_id, df, column_mapping, db, "json", filename
)
except json.JSONDecodeError as e:
return {"success": False, "error": f"JSON inválido: {str(e)}"}
except Exception as e:
logger.error("JSON processing failed", error=str(e))
return {"success": False, "error": f"Error procesando JSON: {str(e)}"}
@staticmethod
async def _process_pos_data(tenant_id: str, pos_content: str, db: AsyncSession, filename: Optional[str] = None) -> Dict[str, Any]:
"""Process POS (Point of Sale) system data"""
try:
# POS data often comes in specific formats
# This is a generic parser that can be customized for specific POS systems
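# Illustrative input line (tab-, semicolon-, pipe- or comma-separated), e.g.
#   15/01/2024;Pan Integral;25;37.50
# is split below into {'date': '15/01/2024', 'product': 'Pan Integral',
# 'quantity': '25', 'revenue': '37.50'}.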
if pos_content.startswith('data:'):
pos_content = base64.b64decode(pos_content.split(',')[1]).decode('utf-8')
lines = pos_content.strip().split('\n')
records = []
for line_num, line in enumerate(lines, 1):
try:
# Skip empty lines and headers
if not line.strip() or line.startswith('#') or 'TOTAL' in line.upper():
continue
# Try different delimiters
for delimiter in ['\t', ';', '|', ',']:
if delimiter in line:
parts = line.split(delimiter)
if len(parts) >= 3: # At least date, product, quantity
records.append({
'date': parts[0].strip(),
'product': parts[1].strip(),
'quantity': parts[2].strip(),
'revenue': parts[3].strip() if len(parts) > 3 else None,
'line_number': line_num
})
break
except Exception as e:
logger.warning(f"Skipping POS line {line_num}: {e}")
continue
if not records:
return {"success": False, "error": "No se encontraron datos válidos en el archivo POS"}
# Convert to DataFrame
df = pd.DataFrame(records)
# Standard column mapping for POS
column_mapping = {
'date': 'date',
'product': 'product',
'quantity': 'quantity',
'revenue': 'revenue'
}
return await DataImportService._process_dataframe(
tenant_id, df, column_mapping, db, "pos", filename
)
except Exception as e:
logger.error("POS processing failed", error=str(e))
return {"success": False, "error": f"Error procesando datos POS: {str(e)}"}
@staticmethod
async def _process_dataframe(tenant_id: str,
df: pd.DataFrame,
column_mapping: Dict[str, str],
db: AsyncSession,
source: str,
filename: Optional[str] = None) -> Dict[str, Any]:
"""Process DataFrame with mapped columns"""
try:
records_created = 0
errors = []
warnings = []
skipped = 0
logger.info(f"Processing {len(df)} records from {source}")
for index, row in df.iterrows():
try:
# Extract and validate date
date_str = str(row.get(column_mapping['date'], '')).strip()
if not date_str or date_str.lower() in ['nan', 'null', 'none', '']:
errors.append(f"Fila {index + 1}: Fecha faltante")
skipped += 1
continue
date = DataImportService._parse_date(date_str)
if not date:
errors.append(f"Fila {index + 1}: Formato de fecha inválido: {date_str}")
skipped += 1
continue
# Extract and validate product name
product_name = str(row.get(column_mapping['product'], '')).strip()
if not product_name or product_name.lower() in ['nan', 'null', 'none', '']:
errors.append(f"Fila {index + 1}: Nombre de producto faltante")
skipped += 1
continue
# Clean product name
product_name = DataImportService._clean_product_name(product_name)
# Extract and validate quantity
quantity_raw = row.get(column_mapping.get('quantity', 'quantity'), 0)
try:
quantity = int(float(str(quantity_raw).replace(',', '.')))
if quantity <= 0:
warnings.append(f"Fila {index + 1}: Cantidad inválida ({quantity}), usando 1")
quantity = 1
except (ValueError, TypeError):
warnings.append(f"Fila {index + 1}: Cantidad inválida ({quantity_raw}), usando 1")
quantity = 1
# Extract revenue (optional)
revenue = None
if 'revenue' in column_mapping and column_mapping['revenue'] in row:
revenue_raw = row.get(column_mapping['revenue'])
if revenue_raw and str(revenue_raw).lower() not in ['nan', 'null', 'none', '']:
try:
revenue = float(str(revenue_raw).replace(',', '.').replace('€', '').replace('$', '').strip())
if revenue < 0:
revenue = None
warnings.append(f"Fila {index + 1}: Ingreso negativo ignorado")
except (ValueError, TypeError):
warnings.append(f"Fila {index + 1}: Formato de ingreso inválido: {revenue_raw}")
# Extract location (optional)
location_id = None
if 'location' in column_mapping and column_mapping['location'] in row:
location_raw = row.get(column_mapping['location'])
if location_raw and str(location_raw).lower() not in ['nan', 'null', 'none', '']:
location_id = str(location_raw).strip()
# Create sales record
sales_data = SalesDataCreate(
tenant_id=tenant_id,
date=date,
product_name=product_name,
quantity_sold=quantity,
revenue=revenue,
location_id=location_id,
source=source,
raw_data=json.dumps({
**row.to_dict(),
"original_row": index + 1,
"filename": filename
})
)
await SalesService.create_sales_record(sales_data, db)
records_created += 1
# Log progress for large imports
if records_created % 100 == 0:
logger.info(f"Processed {records_created} records...")
except Exception as e:
error_msg = f"Fila {index + 1}: {str(e)}"
errors.append(error_msg)
logger.warning("Record processing failed", error=error_msg)
continue
# Calculate success rate
total_processed = records_created + skipped
success_rate = (records_created / len(df)) * 100 if len(df) > 0 else 0
result = {
"success": True,
"records_created": records_created,
"total_rows": len(df),
"skipped": skipped,
"success_rate": round(success_rate, 1),
"errors": errors[:10], # Limit to first 10 errors
"warnings": warnings[:10], # Limit to first 10 warnings
"source": source,
"filename": filename
}
if errors:
result["error_count"] = len(errors)
if len(errors) > 10:
result["errors"].append(f"... y {len(errors) - 10} errores más")
if warnings:
result["warning_count"] = len(warnings)
if len(warnings) > 10:
result["warnings"].append(f"... y {len(warnings) - 10} advertencias más")
logger.info("Data processing completed",
records_created=records_created,
total_rows=len(df),
success_rate=success_rate)
return result
except Exception as e:
logger.error("DataFrame processing failed", error=str(e))
return {
"success": False,
"error": f"Error procesando datos: {str(e)}",
"records_created": 0
}
@staticmethod
def _detect_columns(columns: List[str]) -> Dict[str, str]:
"""Detect column mappings using fuzzy matching"""
mapping = {}
columns_lower = [col.lower() for col in columns]
for standard_name, possible_names in DataImportService.COLUMN_MAPPINGS.items():
for col in columns_lower:
for possible in possible_names:
if possible in col or col in possible:
mapping[standard_name] = columns[columns_lower.index(col)]
break
if standard_name in mapping:
break
# Map common aliases
if 'product' not in mapping and 'product_name' in mapping:
mapping['product'] = mapping['product_name']
if 'quantity' not in mapping and 'quantity_sold' in mapping:
mapping['quantity'] = mapping['quantity_sold']
if 'location' not in mapping and 'location_id' in mapping:
mapping['location'] = mapping['location_id']
return mapping
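# Illustrative: for headers ['Fecha', 'Producto Vendido', 'Cantidad', 'Importe'] the
# matching above yields {'date': 'Fecha', 'product': 'Producto Vendido',
# 'quantity': 'Cantidad', 'revenue': 'Importe'} (plus any other keys that happen to
# match), since comparison is case-insensitive and substring-based in both directions.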
@staticmethod
def _parse_date(date_str: str) -> Optional[datetime]:
"""Parse date string with multiple format attempts"""
if not date_str or str(date_str).lower() in ['nan', 'null', 'none']:
return None
# Clean date string
date_str = str(date_str).strip()
# Try pandas first (handles most formats automatically)
try:
return pd.to_datetime(date_str, dayfirst=True)
except Exception:
pass
# Try specific formats
for fmt in DataImportService.DATE_FORMATS:
try:
return datetime.strptime(date_str, fmt)
except ValueError:
continue
# Try extracting numbers and common patterns
try:
# Look for patterns like dd/mm/yyyy or dd-mm-yyyy
date_pattern = re.search(r'(\d{1,2})[/\-.](\d{1,2})[/\-.](\d{2,4})', date_str)
if date_pattern:
day, month, year = date_pattern.groups()
# Convert 2-digit year to 4-digit
year = int(year)
if year < 50:
year += 2000
elif year < 100:
year += 1900
return datetime(year, int(month), int(day))
except (ValueError, TypeError):
pass
logger.warning(f"Could not parse date: {date_str}")
return None
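# Illustrative behaviour: '2024-01-15', '15/01/2024' and '15.01.2024' all resolve to
# 15 January 2024; an ambiguous value like '01/02/2024' is read day-first
# (1 February 2024) because of dayfirst=True and the day-month-year regex fallback.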
@staticmethod
def _clean_product_name(product_name: str) -> str:
"""Clean and standardize product names"""
if not product_name:
return "Producto sin nombre"
# Remove extra whitespace
cleaned = re.sub(r'\s+', ' ', str(product_name).strip())
# Remove special characters but keep Spanish characters
cleaned = re.sub(r'[^\w\s\-áéíóúñçüÁÉÍÓÚÑÇÜ]', '', cleaned)
# Capitalize first letter of each word
cleaned = cleaned.title()
# Common product name corrections for Spanish bakeries
replacements = {
'Pan De': 'Pan de',
'Café Con': 'Café con',
'Te ': 'Té ',
'Bocadillo De': 'Bocadillo de',
}
for old, new in replacements.items():
cleaned = cleaned.replace(old, new)
return cleaned if cleaned else "Producto sin nombre"
@staticmethod
async def validate_import_data(data: Dict[str, Any]) -> Dict[str, Any]:
"""Validate import data before processing"""
validation_result = {
"valid": True,
"errors": [],
"warnings": [],
"suggestions": []
}
# Check required fields
if not data.get("tenant_id"):
validation_result["errors"].append("tenant_id es requerido")
validation_result["valid"] = False
if not data.get("data"):
validation_result["errors"].append("Datos faltantes")
validation_result["valid"] = False
# Check file format
format_type = data.get("data_format", "").lower()
if format_type not in ["csv", "excel", "xlsx", "xls", "json", "pos"]:
validation_result["errors"].append(f"Formato no soportado: {format_type}")
validation_result["valid"] = False
# Check data size (prevent very large uploads)
data_content = data.get("data", "")
if len(data_content) > 10 * 1024 * 1024: # 10MB limit
validation_result["errors"].append("Archivo demasiado grande (máximo 10MB)")
validation_result["valid"] = False
# Suggestions for better imports
if len(data_content) > 1024 * 1024: # 1MB
validation_result["suggestions"].append("Archivo grande detectado. Considere dividir en archivos más pequeños para mejor rendimiento.")
return validation_result
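# Illustrative: a payload such as {"tenant_id": "t1", "data": "<csv text>",
# "data_format": "csv"} (names here are hypothetical) passes with
# {"valid": True, "errors": [], ...}; a payload over the 10MB limit or with an
# unknown data_format gets "valid": False and an explanatory entry in "errors".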
@staticmethod
async def get_import_template(format_type: str = "csv") -> Dict[str, Any]:
"""Generate import template for specified format"""
try:
# Sample data for template
sample_data = [
{
"fecha": "15/01/2024",
"producto": "Pan Integral",
"cantidad": 25,
"ingresos": 37.50,
"ubicacion": "madrid_centro"
},
{
"fecha": "15/01/2024",
"producto": "Croissant",
"cantidad": 15,
"ingresos": 22.50,
"ubicacion": "madrid_centro"
},
{
"fecha": "15/01/2024",
"producto": "Café con Leche",
"cantidad": 42,
"ingresos": 84.00,
"ubicacion": "madrid_centro"
}
]
if format_type.lower() == "csv":
# Generate CSV template
output = io.StringIO()
df = pd.DataFrame(sample_data)
df.to_csv(output, index=False)
return {
"template": output.getvalue(),
"content_type": "text/csv",
"filename": "plantilla_ventas.csv"
}
elif format_type.lower() == "json":
return {
"template": json.dumps(sample_data, indent=2, ensure_ascii=False),
"content_type": "application/json",
"filename": "plantilla_ventas.json"
}
elif format_type.lower() in ["excel", "xlsx"]:
# Generate Excel template
output = io.BytesIO()
df = pd.DataFrame(sample_data)
df.to_excel(output, index=False, sheet_name="Ventas")
return {
"template": base64.b64encode(output.getvalue()).decode(),
"content_type": "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
"filename": "plantilla_ventas.xlsx"
}
else:
return {
"error": f"Formato de plantilla no soportado: {format_type}"
}
except Exception as e:
logger.error("Template generation failed", error=str(e))
return {
"error": f"Error generando plantilla: {str(e)}"
}
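# Illustrative: await DataImportService.get_import_template("csv") returns the sample
# rows above rendered as CSV text together with "content_type": "text/csv" and the
# suggested filename; the Excel variant is returned base64-encoded so the bytes can
# travel inside a JSON response.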
@staticmethod
def _get_column_mapping(columns: List[str]) -> Dict[str, str]:
"""Get column mapping - alias for _detect_columns"""
return DataImportService._detect_columns(columns)