Files
bakery-ia/services/sales/app/services/data_import_service.py
Urtzi Alfaro cafd316c4b Fix issues 3
2025-08-17 13:35:05 +02:00

1221 lines
57 KiB
Python

# services/sales/app/services/data_import_service.py
"""
Data Import Service
Service for importing sales data using repository pattern and enhanced error handling
"""
import csv
import io
import json
import base64
import pandas as pd
from typing import Dict, Any, List, Optional, Union
from datetime import datetime, timezone
from uuid import UUID
import structlog
import re
import asyncio
from app.repositories.sales_repository import SalesRepository
from app.models.sales import SalesData
from app.schemas.sales import SalesDataCreate
from app.core.database import get_db_transaction
from app.services.inventory_client import InventoryServiceClient
logger = structlog.get_logger()
# Import result schemas (dataclass definitions)
from dataclasses import dataclass
from typing import List, Dict, Any
@dataclass
class SalesValidationResult:
    """Outcome of validating an import payload before anything is written."""

    # True only when validation collected no errors.
    is_valid: bool
    # Number of data rows found in the payload (filled for CSV payloads).
    total_records: int
    # Estimated number of rows expected to import cleanly.
    valid_records: int
    # Estimated number of rows expected to be rejected.
    invalid_records: int
    # Structured error entries (type, message, field, row, code).
    errors: List[Dict[str, Any]]
    # Structured warning entries, same shape as ``errors``.
    warnings: List[Dict[str, Any]]
    # Free-form summary: status, file format, size, suggestions, ...
    summary: Dict[str, Any]
@dataclass
class SalesImportResult:
    """Outcome of an import run as reported back to the caller."""

    # Whether the import is considered successful.
    success: bool
    # Number of input rows that were read.
    records_processed: int
    # Number of sales records that were created.
    records_created: int
    # Number of records updated; process_import always reports 0 here.
    records_updated: int
    # Number of rows that did not result in a created record.
    records_failed: int
    # Structured error entries.
    errors: List[Dict[str, Any]]
    # Structured warning entries.
    warnings: List[Dict[str, Any]]
    # Wall-clock duration of the import in seconds.
    processing_time_seconds: float
class DataImportService:
    """Import sales data (CSV, JSON, Excel) through the sales repository.

    Rows are validated strictly: rows with missing or unparseable dates,
    missing product names, non-positive quantities or implausible amounts are
    rejected rather than imported with defaults.
    """
    # PRODUCTION VALIDATION CONFIGURATION
    # When True, the upper-bound plausibility checks below are enforced.
    STRICT_VALIDATION = True # Set to False for lenient validation, True for production quality
    MAX_QUANTITY_PER_DAY = 10000 # Maximum reasonable quantity per product per day
    MAX_REVENUE_PER_ITEM = 100000 # Maximum reasonable revenue per line item
    MAX_UNIT_PRICE = 10000 # Maximum reasonable price per unit for bakery items
    # Common column mappings for different languages/formats.
    # Keys are the standard field names; values are the lower-case header
    # aliases that _detect_columns compares against the file's headers.
    COLUMN_MAPPINGS = {
        'date': ['date', 'fecha', 'datum', 'data', 'dia'],
        'datetime': ['datetime', 'fecha_hora', 'timestamp'],
        'product': ['product', 'producto', 'item', 'articulo', 'nombre', 'name'],
        'product_name': ['product_name', 'nombre_producto', 'item_name'],
        'quantity': ['quantity', 'cantidad', 'qty', 'units', 'unidades'],
        'quantity_sold': ['quantity_sold', 'cantidad_vendida', 'sold'],
        'revenue': ['revenue', 'ingresos', 'sales', 'ventas', 'total', 'importe'],
        'price': ['price', 'precio', 'cost', 'coste'],
        'location': ['location', 'ubicacion', 'tienda', 'store', 'punto_venta'],
        'location_id': ['location_id', 'store_id', 'tienda_id'],
    }
    # strptime patterns known to the importer; _parse_date tries a short
    # priority list first and falls back to the remaining entries here.
    DATE_FORMATS = [
        '%Y-%m-%d', '%d/%m/%Y', '%m/%d/%Y', '%d-%m-%Y', '%m-%d-%Y',
        '%d.%m.%Y', '%Y/%m/%d', '%d/%m/%y', '%m/%d/%y',
        '%Y-%m-%d %H:%M:%S', '%d/%m/%Y %H:%M',
    ]
def __init__(self):
    """Create the inventory client and the per-import product lookup state."""
    self.inventory_client = InventoryServiceClient()
    # Resolved products for the current import: product_name -> inventory_product_id
    self.product_cache = dict()
    # Product names for which every resolution attempt has already failed
    self.failed_products = set()
async def validate_import_data(self, data: Dict[str, Any]) -> SalesValidationResult:
    """Validate an import payload before any record is written.

    Checks that ``tenant_id`` and ``data`` are present, that ``data_format``
    is supported and that the payload size is within limits. For CSV
    payloads it also verifies the required columns and parses up to the
    first ten rows to estimate the share of invalid records; any failing
    sample row makes the payload invalid.

    Args:
        data: Payload with the keys ``tenant_id``, ``data`` and
            ``data_format``.

    Returns:
        A SalesValidationResult. Unexpected failures are reported as a
        ``SYSTEM_ERROR`` entry instead of being raised.
    """

    def issue(kind: str, message: str, field: Optional[str], code: str) -> Dict[str, Any]:
        # All entries share this shape; "row" is None because these checks
        # are file-level rather than tied to a single row.
        return {
            "type": kind,
            "message": message,
            "field": field,
            "row": None,
            "code": code,
        }

    try:
        logger.info("Starting enhanced import data validation", tenant_id=data.get("tenant_id"))
        validation_result = SalesValidationResult(
            is_valid=True,
            total_records=0,
            valid_records=0,
            invalid_records=0,
            errors=[],
            warnings=[],
            summary={}
        )
        errors = []
        warnings = []

        # Basic validation checks
        if not data.get("tenant_id"):
            errors.append(issue("missing_field", "tenant_id es requerido", "tenant_id", "MISSING_TENANT_ID"))

        if not data.get("data"):
            # Nothing else can be checked without content: report and stop.
            errors.append(issue("missing_data", "Datos de archivo faltantes", "data", "NO_DATA_PROVIDED"))
            validation_result.is_valid = False
            validation_result.errors = errors
            validation_result.summary = {
                "status": "failed",
                "reason": "no_data_provided",
                "file_format": data.get("data_format", "unknown"),
                "suggestions": ["Selecciona un archivo válido para importar"]
            }
            return validation_result

        # Validate file format. A missing or None data_format is treated as
        # an unsupported (empty) format instead of crashing on .lower().
        format_type = str(data.get("data_format") or "").lower()
        supported_formats = ["csv", "excel", "xlsx", "xls", "json", "pos"]
        if format_type not in supported_formats:
            errors.append(issue(
                "unsupported_format",
                f"Formato no soportado: {format_type}",
                "data_format",
                "UNSUPPORTED_FORMAT"
            ))

        # Validate data size
        data_content = data.get("data", "")
        data_size = len(data_content)
        if data_size == 0:
            errors.append(issue("empty_file", "El archivo está vacío", "data", "EMPTY_FILE"))
        elif data_size > 10 * 1024 * 1024:  # 10MB limit
            errors.append(issue(
                "file_too_large",
                "Archivo demasiado grande (máximo 10MB)",
                "data",
                "FILE_TOO_LARGE"
            ))
        elif data_size > 1024 * 1024:  # 1MB warning
            warnings.append(issue(
                "large_file",
                "Archivo grande detectado. El procesamiento puede tomar más tiempo.",
                "data",
                "LARGE_FILE_WARNING"
            ))

        # Analyze CSV content only when no earlier check has failed
        if format_type == "csv" and data_content and not errors:
            try:
                rows = list(csv.DictReader(io.StringIO(data_content)))
                validation_result.total_records = len(rows)
                if not rows:
                    errors.append(issue(
                        "empty_content",
                        "El archivo CSV no contiene datos",
                        "data",
                        "NO_CONTENT"
                    ))
                else:
                    headers = list(rows[0].keys())
                    column_mapping = self._detect_columns(headers)

                    # Date and product columns are mandatory; quantity is optional
                    if not column_mapping.get('date'):
                        errors.append(issue(
                            "missing_column",
                            "Columna de fecha no encontrada",
                            "date",
                            "MISSING_DATE_COLUMN"
                        ))
                    if not column_mapping.get('product'):
                        errors.append(issue(
                            "missing_column",
                            "Columna de producto no encontrada",
                            "product",
                            "MISSING_PRODUCT_COLUMN"
                        ))
                    if not column_mapping.get('quantity'):
                        warnings.append(issue(
                            "missing_column",
                            "Columna de cantidad no encontrada, se usará 1 por defecto",
                            "quantity",
                            "MISSING_QUANTITY_COLUMN"
                        ))

                    if not errors:
                        # Estimate data quality from the first rows only
                        sample_size = min(10, len(rows))
                        quality_issues = 0
                        for i, row in enumerate(rows[:sample_size]):
                            parsed_data = await self._parse_row_data(row, column_mapping, i + 1)
                            if parsed_data.get("skip") or parsed_data.get("errors"):
                                quality_issues += 1
                        estimated_error_rate = (quality_issues / sample_size) * 100 if sample_size > 0 else 0
                        estimated_invalid = int(validation_result.total_records * estimated_error_rate / 100)
                        validation_result.valid_records = validation_result.total_records - estimated_invalid
                        validation_result.invalid_records = estimated_invalid

                        # STRICT: any failing sample row fails the validation
                        if estimated_error_rate > 0:
                            errors.append(issue(
                                "data_quality_error",
                                f"Falló la validación de calidad: {estimated_error_rate:.0f}% de los datos tienen errores críticos",
                                "data",
                                "DATA_QUALITY_FAILED"
                            ))
                            # Add one entry describing the severity band
                            if estimated_error_rate > 50:
                                errors.append(issue(
                                    "data_quality_critical",
                                    "Calidad de datos crítica: más del 50% de los registros tienen errores",
                                    "data",
                                    "DATA_QUALITY_CRITICAL"
                                ))
                            elif estimated_error_rate > 20:
                                errors.append(issue(
                                    "data_quality_high",
                                    f"Alta tasa de errores detectada: {estimated_error_rate:.0f}% de los datos requieren corrección",
                                    "data",
                                    "DATA_QUALITY_HIGH_ERROR_RATE"
                                ))
                            else:
                                errors.append(issue(
                                    "data_quality_detected",
                                    f"Se detectaron errores de validación en {estimated_error_rate:.0f}% de los datos",
                                    "data",
                                    "DATA_QUALITY_ERRORS_FOUND"
                                ))
                    else:
                        # Required columns are missing: no row can be imported
                        validation_result.valid_records = 0
                        validation_result.invalid_records = validation_result.total_records
            except Exception as csv_error:
                logger.warning("Enhanced CSV analysis failed", error=str(csv_error))
                warnings.append(issue(
                    "analysis_warning",
                    f"No se pudo analizar completamente el CSV: {str(csv_error)}",
                    "data",
                    "CSV_ANALYSIS_WARNING"
                ))

        # Set validation result
        validation_result.is_valid = len(errors) == 0
        validation_result.errors = errors
        validation_result.warnings = warnings

        # Column detection for the summary is best-effort: a header that
        # cannot be read must not turn the whole validation into SYSTEM_ERROR.
        detected_columns = []
        if format_type == "csv" and data_content:
            try:
                header = csv.DictReader(io.StringIO(data_content)).fieldnames or []
                detected_columns = list(self._detect_columns(list(header)).keys())
            except Exception as header_error:
                logger.warning("Could not detect CSV columns for summary", error=str(header_error))

        validation_result.summary = {
            "status": "valid" if validation_result.is_valid else "invalid",
            "file_format": format_type,
            "file_size_bytes": data_size,
            "file_size_mb": round(data_size / (1024 * 1024), 2),
            "estimated_processing_time_seconds": max(1, validation_result.total_records // 100),
            # Same naive-UTC ISO text as before, without the deprecated utcnow()
            "validation_timestamp": datetime.now(timezone.utc).replace(tzinfo=None).isoformat(),
            "detected_columns": detected_columns,
            "suggestions": self._generate_suggestions(validation_result, format_type, len(warnings))
        }
        logger.info("Enhanced import validation completed",
                    is_valid=validation_result.is_valid,
                    total_records=validation_result.total_records,
                    error_count=len(errors),
                    warning_count=len(warnings))
        return validation_result
    except Exception as e:
        logger.error("Enhanced validation process failed", error=str(e))
        return SalesValidationResult(
            is_valid=False,
            total_records=0,
            valid_records=0,
            invalid_records=0,
            errors=[issue(
                "system_error",
                f"Error en el proceso de validación: {str(e)}",
                None,
                "SYSTEM_ERROR"
            )],
            warnings=[],
            summary={
                "status": "error",
                "file_format": data.get("data_format", "unknown"),
                "error_type": "system_error",
                "suggestions": [
                    "Intenta de nuevo con un archivo diferente",
                    "Contacta soporte si el problema persiste"
                ]
            }
        )
async def process_import(
    self,
    tenant_id: str,
    content: str,
    file_format: str,
    filename: Optional[str] = None
) -> SalesImportResult:
    """Import sales data of the given format inside one database transaction.

    Args:
        tenant_id: Tenant the records belong to.
        content: File content; plain text for CSV/JSON, base64 (optionally a
            ``data:`` URI) for Excel.
        file_format: One of ``csv``, ``json``, ``excel``, ``xlsx`` or ``xls``
            (case-insensitive).
        filename: Original file name, used for logging only.

    Returns:
        A SalesImportResult. Failures are reported through ``success=False``
        and an ``IMPORT_FAILURE`` error entry instead of being raised.
    """
    # Timezone-aware clock; datetime.utcnow() is deprecated since Python 3.12.
    start_time = datetime.now(timezone.utc)
    try:
        # Clear cache for new import session
        self._clear_import_cache()
        logger.info("Starting enhanced data import",
                    filename=filename,
                    format=file_format,
                    tenant_id=tenant_id)
        normalized_format = file_format.lower()
        async with get_db_transaction() as db:
            repository = SalesRepository(db)
            # Process data based on format
            if normalized_format == 'csv':
                result = await self._process_csv_data(tenant_id, content, repository, filename)
            elif normalized_format == 'json':
                result = await self._process_json_data(tenant_id, content, repository, filename)
            elif normalized_format in ('excel', 'xlsx', 'xls'):
                # 'xls' is accepted by validate_import_data, so it is routed to
                # the Excel reader as well instead of being rejected here.
                result = await self._process_excel_data(tenant_id, content, repository, filename)
            else:
                raise ValueError(f"Unsupported format: {file_format}")

        # Calculate processing time
        processing_time = (datetime.now(timezone.utc) - start_time).total_seconds()

        # Build enhanced final result
        total_rows = result.get("total_rows", 0)
        records_created = result.get("records_created", 0)
        final_result = SalesImportResult(
            success=result.get("success", False),
            records_processed=total_rows,
            records_created=records_created,
            records_updated=0,  # We don't update, only create
            records_failed=total_rows - records_created,
            errors=self._structure_messages(result.get("errors", [])),
            warnings=self._structure_messages(result.get("warnings", [])),
            processing_time_seconds=processing_time
        )
        logger.info("Enhanced data import completed successfully",
                    records_created=final_result.records_created,
                    processing_time=processing_time)
        return final_result
    except Exception as e:
        processing_time = (datetime.now(timezone.utc) - start_time).total_seconds()
        logger.error("Enhanced data import failed", error=str(e), tenant_id=tenant_id)
        return SalesImportResult(
            success=False,
            records_processed=0,
            records_created=0,
            records_updated=0,
            records_failed=0,
            errors=[{
                "type": "import_error",
                "message": f"Import failed: {str(e)}",
                "field": None,
                "row": None,
                "code": "IMPORT_FAILURE"
            }],
            warnings=[],
            processing_time_seconds=processing_time
        )
async def _process_csv_data(
self,
tenant_id: str,
csv_content: str,
repository: SalesRepository,
filename: Optional[str] = None
) -> Dict[str, Any]:
"""Enhanced CSV processing with batch product resolution for better reliability"""
try:
reader = csv.DictReader(io.StringIO(csv_content))
rows = list(reader)
if not rows:
return {
"success": False,
"total_rows": 0,
"records_created": 0,
"errors": ["CSV file is empty"],
"warnings": []
}
# Enhanced column mapping
column_mapping = self._detect_columns(list(rows[0].keys()))
# Pre-process to extract unique products for batch creation
unique_products = set()
parsed_rows = []
logger.info(f"Pre-processing {len(rows)} records to identify unique products")
for index, row in enumerate(rows):
try:
# Enhanced data parsing and validation
parsed_data = await self._parse_row_data(row, column_mapping, index + 1)
if not parsed_data.get("skip"):
unique_products.add((
parsed_data["product_name"],
parsed_data.get("product_category", "general")
))
parsed_rows.append((index, parsed_data))
except Exception as e:
logger.warning(f"Failed to parse row {index + 1}: {e}")
continue
logger.info(f"Found {len(unique_products)} unique products, attempting batch resolution")
# Try to resolve/create all unique products in batch
await self._batch_resolve_products(unique_products, tenant_id)
# Now process the actual sales records
records_created = 0
errors = []
warnings = []
logger.info(f"Processing {len(parsed_rows)} validated records for sales creation")
for index, parsed_data in parsed_rows:
try:
# Resolve product name to inventory_product_id (should be cached now)
inventory_product_id = await self._resolve_product_to_inventory_id(
parsed_data["product_name"],
parsed_data.get("product_category"),
tenant_id
)
if not inventory_product_id:
error_msg = f"Row {index + 1}: Could not resolve product '{parsed_data['product_name']}' to inventory ID"
errors.append(error_msg)
logger.warning("Product resolution failed", error=error_msg)
continue
# Create sales record with enhanced data
sales_data = SalesDataCreate(
tenant_id=tenant_id,
date=parsed_data["date"],
inventory_product_id=inventory_product_id,
quantity_sold=parsed_data["quantity_sold"],
unit_price=parsed_data.get("unit_price"),
revenue=parsed_data.get("revenue"),
location_id=parsed_data.get("location_id"),
source="csv"
)
created_record = await repository.create_sales_record(sales_data, tenant_id)
records_created += 1
# Enhanced progress logging
if records_created % 100 == 0:
logger.info(f"Enhanced processing: {records_created}/{len(rows)} records completed...")
except Exception as e:
error_msg = f"Row {index + 1}: {str(e)}"
errors.append(error_msg)
logger.warning("Enhanced record processing failed", error=error_msg)
success_rate = (records_created / len(rows)) * 100 if rows else 0
return {
"success": records_created > 0,
"total_rows": len(rows),
"records_created": records_created,
"success_rate": success_rate,
"errors": errors,
"warnings": warnings
}
except Exception as e:
logger.error("Enhanced CSV processing failed", error=str(e))
raise
async def _process_json_data(
self,
tenant_id: str,
json_content: str,
repository: SalesRepository,
filename: Optional[str] = None
) -> Dict[str, Any]:
"""Enhanced JSON processing with pandas integration"""
try:
# Parse JSON with base64 support
if json_content.startswith('data:'):
json_content = base64.b64decode(json_content.split(',')[1]).decode('utf-8')
data = json.loads(json_content)
# Handle different JSON structures
if isinstance(data, dict):
if 'data' in data:
records = data['data']
elif 'records' in data:
records = data['records']
elif 'sales' in data:
records = data['sales']
else:
records = [data] # Single record
elif isinstance(data, list):
records = data
else:
raise ValueError("Invalid JSON format")
# Convert to DataFrame for enhanced processing
if records:
df = pd.DataFrame(records)
df.columns = df.columns.str.strip().str.lower()
return await self._process_dataframe(tenant_id, df, repository, "json", filename)
else:
return {
"success": False,
"total_rows": 0,
"records_created": 0,
"errors": ["No records found in JSON"],
"warnings": []
}
except json.JSONDecodeError as e:
raise ValueError(f"Invalid JSON: {str(e)}")
except Exception as e:
logger.error("Enhanced JSON processing failed", error=str(e))
raise
async def _process_excel_data(
    self,
    tenant_id: str,
    excel_content: str,
    repository: SalesRepository,
    filename: Optional[str] = None
) -> Dict[str, Any]:
    """Decode a base64 Excel workbook and import its first sheet.

    Args:
        tenant_id: Tenant the records belong to.
        excel_content: Base64 text of the workbook, optionally as a ``data:``
            URI.
        repository: Repository used to create the sales records.
        filename: Original file name, passed through.

    Returns:
        The result dict of ``_process_dataframe``.

    Raises:
        Exception: Any decoding or reading error is logged and re-raised.
    """
    try:
        # Decode base64 content
        if excel_content.startswith('data:'):
            # Split on the first comma only: it separates header and payload
            excel_bytes = base64.b64decode(excel_content.split(',', 1)[1])
        else:
            excel_bytes = base64.b64decode(excel_content)

        # Read the first sheet with pandas
        df = pd.read_excel(io.BytesIO(excel_bytes), sheet_name=0)

        # Header cells can be numbers or dates; str() keeps column
        # normalisation from failing on (or blanking) such labels.
        df.columns = [str(col).strip().lower() for col in df.columns]

        # Remove empty rows
        df = df.dropna(how='all')
        return await self._process_dataframe(tenant_id, df, repository, "excel", filename)
    except Exception as e:
        logger.error("Enhanced Excel processing failed", error=str(e))
        raise
async def _process_dataframe(
    self,
    tenant_id: str,
    df: pd.DataFrame,
    repository: SalesRepository,
    source: str,
    filename: Optional[str] = None
) -> Dict[str, Any]:
    """Create one sales record per valid DataFrame row.

    Args:
        tenant_id: Tenant the records belong to.
        df: Frame whose column labels were already normalised by the caller.
        repository: Repository used to create the sales records.
        source: Label stored as the record source (e.g. ``json``, ``excel``).
        filename: Original file name (unused here).

    Returns:
        Dict with ``success``, ``total_rows``, ``records_created``,
        ``success_rate`` and at most ten ``errors`` / ``warnings``.

    Raises:
        ValueError: If the date or product column cannot be detected.
    """
    try:
        detected = self._detect_columns(df.columns.tolist())
        absent = [name for name in ("date", "product") if not detected.get(name)]
        if absent:
            raise ValueError(f"Required columns missing: {', '.join(absent)}")

        created_count = 0
        error_log = []
        warning_log = []
        logger.info(f"Enhanced processing of {len(df)} records from {source}")

        for index, row in df.iterrows():
            try:
                # Plain dict with pandas NaN values replaced by None
                row_dict = {
                    col: (None if pd.isna(row[col]) else row[col])
                    for col in df.columns
                }
                parsed = await self._parse_row_data(row_dict, detected, index + 1)
                if parsed.get("skip"):
                    error_log.extend(parsed.get("errors", []))
                    warning_log.extend(parsed.get("warnings", []))
                    continue

                # Map the product name to its inventory id
                product_id = await self._resolve_product_to_inventory_id(
                    parsed["product_name"],
                    parsed.get("product_category"),
                    tenant_id
                )
                if not product_id:
                    message = f"Row {index + 1}: Could not resolve product '{parsed['product_name']}' to inventory ID"
                    error_log.append(message)
                    logger.warning("Product resolution failed", error=message)
                    continue

                record = SalesDataCreate(
                    tenant_id=tenant_id,
                    date=parsed["date"],
                    inventory_product_id=product_id,
                    quantity_sold=parsed["quantity_sold"],
                    unit_price=parsed.get("unit_price"),
                    revenue=parsed.get("revenue"),
                    location_id=parsed.get("location_id"),
                    source=source
                )
                await repository.create_sales_record(record, tenant_id)
                created_count += 1

                # Periodic progress report for large frames
                if created_count % 100 == 0:
                    logger.info(f"Enhanced DataFrame processing: {created_count}/{len(df)} records completed...")
            except Exception as exc:
                message = f"Row {index + 1}: {str(exc)}"
                error_log.append(message)
                logger.warning("Enhanced record processing failed", error=message)

        row_total = len(df)
        if row_total > 0:
            success_rate = (created_count / row_total) * 100
        else:
            success_rate = 0
        return {
            "success": created_count > 0,
            "total_rows": row_total,
            "records_created": created_count,
            "success_rate": success_rate,
            "errors": error_log[:10],  # Only the first ten are reported
            "warnings": warning_log[:10]
        }
    except ValueError:
        raise
    except Exception as e:
        logger.error("Enhanced DataFrame processing failed", error=str(e))
        raise
async def _parse_row_data(
self,
row: Dict[str, Any],
column_mapping: Dict[str, str],
row_number: int
) -> Dict[str, Any]:
"""Enhanced row data parsing with better validation"""
errors = []
warnings = []
try:
# Enhanced date extraction and validation
date_str = str(row.get(column_mapping.get('date', ''), '')).strip()
if not date_str or date_str.lower() in ['nan', 'null', 'none', '']:
errors.append(f"Row {row_number}: Missing date")
return {"skip": True, "errors": errors, "warnings": warnings}
parsed_date = self._parse_date(date_str)
if not parsed_date:
errors.append(f"Row {row_number}: Invalid date format: {date_str}")
return {"skip": True, "errors": errors, "warnings": warnings}
# Enhanced product name extraction and cleaning
product_name = str(row.get(column_mapping.get('product', ''), '')).strip()
if not product_name or product_name.lower() in ['nan', 'null', 'none', '']:
errors.append(f"Row {row_number}: Missing product name")
return {"skip": True, "errors": errors, "warnings": warnings}
product_name = self._clean_product_name(product_name)
# STRICT quantity validation for production data quality
quantity_raw = row.get(column_mapping.get('quantity', 'quantity'), 1)
try:
if pd.isna(quantity_raw):
# Allow default quantity of 1 for missing values
quantity = 1
else:
quantity = int(float(str(quantity_raw).replace(',', '.')))
if quantity <= 0:
# STRICT: Treat invalid quantities as ERRORS, not warnings
errors.append(f"Row {row_number}: Invalid quantity ({quantity}) - quantities must be positive")
return {"skip": True, "errors": errors, "warnings": warnings}
elif self.STRICT_VALIDATION and quantity > self.MAX_QUANTITY_PER_DAY:
# STRICT: Check for unrealistic quantities
errors.append(f"Row {row_number}: Unrealistic quantity ({quantity}) - exceeds maximum expected daily sales ({self.MAX_QUANTITY_PER_DAY})")
return {"skip": True, "errors": errors, "warnings": warnings}
except (ValueError, TypeError):
# STRICT: Treat non-numeric quantities as ERRORS
errors.append(f"Row {row_number}: Invalid quantity format ({quantity_raw}) - must be a positive number")
return {"skip": True, "errors": errors, "warnings": warnings}
# Enhanced revenue extraction
revenue = None
unit_price = None
if 'revenue' in column_mapping and column_mapping['revenue'] in row:
revenue_raw = row.get(column_mapping['revenue'])
if revenue_raw and not pd.isna(revenue_raw) and str(revenue_raw).lower() not in ['nan', 'null', 'none', '']:
try:
revenue = float(str(revenue_raw).replace(',', '.').replace('', '').replace('$', '').strip())
if revenue < 0:
# STRICT: Treat negative revenue as ERROR, not warning
errors.append(f"Row {row_number}: Negative revenue ({revenue}) - revenue must be positive or zero")
return {"skip": True, "errors": errors, "warnings": warnings}
else:
# STRICT: Check for unrealistic revenue values
if self.STRICT_VALIDATION and revenue > self.MAX_REVENUE_PER_ITEM:
errors.append(f"Row {row_number}: Unrealistic revenue ({revenue}) - exceeds maximum expected value ({self.MAX_REVENUE_PER_ITEM})")
return {"skip": True, "errors": errors, "warnings": warnings}
# Calculate unit price if we have both revenue and quantity
unit_price = revenue / quantity if quantity > 0 else None
# STRICT: Validate unit price reasonableness
if unit_price and unit_price > 10000: # More than €10,000 per unit seems unrealistic for bakery
errors.append(f"Row {row_number}: Unrealistic unit price ({unit_price:.2f}) - check quantity and revenue values")
return {"skip": True, "errors": errors, "warnings": warnings}
except (ValueError, TypeError):
# STRICT: Treat invalid revenue format as ERROR
errors.append(f"Row {row_number}: Invalid revenue format ({revenue_raw}) - must be a valid number")
return {"skip": True, "errors": errors, "warnings": warnings}
# Enhanced location extraction
location_id = None
if 'location' in column_mapping and column_mapping['location'] in row:
location_raw = row.get(column_mapping['location'])
if location_raw and not pd.isna(location_raw) and str(location_raw).lower() not in ['nan', 'null', 'none', '']:
location_id = str(location_raw).strip()
# Enhanced product category extraction
product_category = None
if 'category' in column_mapping and column_mapping['category'] in row:
category_raw = row.get(column_mapping['category'])
if category_raw and not pd.isna(category_raw) and str(category_raw).lower() not in ['nan', 'null', 'none', '']:
product_category = str(category_raw).strip()
return {
"skip": False,
"date": parsed_date,
"product_name": product_name,
"product_category": product_category,
"quantity_sold": quantity,
"unit_price": unit_price,
"revenue": revenue,
"location_id": location_id,
"errors": errors,
"warnings": warnings
}
except Exception as e:
errors.append(f"Row {row_number}: Enhanced parsing error: {str(e)}")
return {"skip": True, "errors": errors, "warnings": warnings}
def _detect_columns(self, columns: List[str]) -> Dict[str, str]:
"""Enhanced column detection with fuzzy matching"""
mapping = {}
columns_lower = [col.lower().strip() for col in columns]
for standard_name, possible_names in self.COLUMN_MAPPINGS.items():
best_match = None
best_score = 0
for col_idx, col in enumerate(columns_lower):
for possible in possible_names:
# Exact match (highest priority)
if possible == col:
best_match = columns[col_idx]
best_score = 100
break
# Contains match
elif possible in col or col in possible:
score = len(possible) / len(col) * 90
if score > best_score:
best_match = columns[col_idx]
best_score = score
if best_score == 100: # Found exact match
break
if best_match and best_score > 70: # Threshold for matches
mapping[standard_name] = best_match
# Enhanced alias mapping
if 'product' not in mapping and 'product_name' in mapping:
mapping['product'] = mapping['product_name']
if 'quantity' not in mapping and 'quantity_sold' in mapping:
mapping['quantity'] = mapping['quantity_sold']
if 'location' not in mapping and 'location_id' in mapping:
mapping['location'] = mapping['location_id']
return mapping
def _parse_date(self, date_str: str) -> Optional[datetime]:
"""Enhanced date parsing with explicit format handling for CSV dates"""
if not date_str or str(date_str).lower() in ['nan', 'null', 'none']:
return None
date_str = str(date_str).strip()
# For CSV format like "2024/10/01", try specific formats first to avoid ambiguity
# Priority order: YYYY/MM/DD (most likely for machine-generated data)
priority_formats = [
'%Y/%m/%d', # 2024/10/01 (October 1, 2024) - most likely for CSV exports
'%Y-%m-%d', # 2024-10-01
'%d/%m/%Y', # 01/10/2024 (European format)
'%m/%d/%Y', # 10/01/2024 (US format)
]
# Try priority formats first
for fmt in priority_formats:
try:
parsed_dt = datetime.strptime(date_str, fmt)
if parsed_dt.tzinfo is None:
parsed_dt = parsed_dt.replace(tzinfo=timezone.utc)
logger.debug(f"Successfully parsed date '{date_str}' using format '{fmt}' -> {parsed_dt}")
return parsed_dt
except ValueError:
continue
# Try pandas as fallback with explicit format inference
try:
# For YYYY/MM/DD format, disable dayfirst to prevent misinterpretation
if '/' in date_str and len(date_str.split('/')[0]) == 4:
# Looks like YYYY/MM/DD format, so don't use dayfirst
parsed_dt = pd.to_datetime(date_str, dayfirst=False)
else:
# For other formats, use dayfirst=True for European-style dates
parsed_dt = pd.to_datetime(date_str, dayfirst=True)
if hasattr(parsed_dt, 'to_pydatetime'):
parsed_dt = parsed_dt.to_pydatetime()
if parsed_dt.tzinfo is None:
parsed_dt = parsed_dt.replace(tzinfo=timezone.utc)
logger.debug(f"Successfully parsed date '{date_str}' using pandas -> {parsed_dt}")
return parsed_dt
except Exception as e:
logger.debug(f"Pandas date parsing failed for '{date_str}': {e}")
pass
# Try remaining formats as last fallback
for fmt in self.DATE_FORMATS:
if fmt not in priority_formats: # Skip already tried formats
try:
parsed_dt = datetime.strptime(date_str, fmt)
if parsed_dt.tzinfo is None:
parsed_dt = parsed_dt.replace(tzinfo=timezone.utc)
logger.debug(f"Successfully parsed date '{date_str}' using fallback format '{fmt}' -> {parsed_dt}")
return parsed_dt
except ValueError:
continue
logger.warning(f"Could not parse date: {date_str}")
return None
def _clean_product_name(self, product_name: str) -> str:
"""Enhanced product name cleaning and standardization"""
if not product_name:
return "Producto sin nombre"
# Remove extra whitespace
cleaned = re.sub(r'\s+', ' ', str(product_name).strip())
# Remove special characters but keep Spanish characters
cleaned = re.sub(r'[^\w\s\-áéíóúñçüÁÉÍÓÚÑÇÜ]', '', cleaned)
# Capitalize first letter of each word
cleaned = cleaned.title()
# Enhanced corrections for Spanish bakeries
replacements = {
'Pan De': 'Pan de',
'Café Con': 'Café con',
'Te ': '',
'Bocadillo De': 'Bocadillo de',
'Dulce De': 'Dulce de',
'Tarta De': 'Tarta de',
}
for old, new in replacements.items():
cleaned = cleaned.replace(old, new)
return cleaned if cleaned else "Producto sin nombre"
def _clear_import_cache(self):
    """Empty the product lookup state so a new import starts from scratch."""
    # Both containers are emptied in place, so existing references stay valid.
    for store in (self.product_cache, self.failed_products):
        store.clear()
    logger.info("Import cache cleared for new session")
async def _resolve_product_to_inventory_id(self, product_name: str, product_category: Optional[str], tenant_id: UUID) -> Optional[UUID]:
    """Resolve a product name to an inventory product id, creating it if needed.

    Looks the name up in the per-import cache, then searches the inventory
    service and, if nothing usable is found, creates a new product there.
    The search/create pair is attempted up to five times with increasing
    delays, followed by one final creation attempt with minimal data.

    Args:
        product_name: Cleaned product name; also used as the cache key.
        product_category: Category for a newly created product; 'general'
            is used when it is empty.
        tenant_id: Tenant owning the product. Passed as-is to the search
            call and as ``str(tenant_id)`` to the create call.

    Returns:
        The inventory product id, or None when every attempt failed (the
        name is then remembered in ``failed_products``).
    """
    # Check cache first
    if product_name in self.product_cache:
        logger.debug("Product resolved from cache", product_name=product_name, tenant_id=tenant_id)
        return self.product_cache[product_name]
    # Skip if this product already failed to resolve after all attempts
    if product_name in self.failed_products:
        logger.debug("Skipping previously failed product", product_name=product_name, tenant_id=tenant_id)
        return None
    max_retries = 5 # Number of search/create rounds
    base_delay = 2.0 # Seconds; doubled for each of the early retries
    fallback_retry_delay = 10.0 # Fixed delay used from the fourth round on
    for attempt in range(max_retries):
        try:
            # Wait before every retry: 2s, 4s, then 10s for later rounds
            if attempt > 0:
                if attempt >= 3:
                    delay = fallback_retry_delay # Fixed delay for later attempts
                else:
                    delay = base_delay * (2 ** (attempt - 1)) # Exponential backoff
                logger.info(f"Retrying product resolution after {delay}s delay",
                            product_name=product_name, attempt=attempt, tenant_id=tenant_id)
                await asyncio.sleep(delay)
            # First try to search for existing product by name
            try:
                products = await self.inventory_client.search_products(product_name, tenant_id)
                if products:
                    # Use the first search hit's ID
                    product_id = products[0].get('id')
                    if product_id:
                        uuid_id = UUID(str(product_id))
                        self.product_cache[product_name] = uuid_id # Cache for future use
                        logger.info("Resolved product to existing inventory ID",
                                    product_name=product_name, product_id=product_id, tenant_id=tenant_id)
                        return uuid_id
            except Exception as search_error:
                # Any search failure (including an unparseable id) falls
                # through to the creation attempt below.
                logger.warning("Product search failed, trying direct creation",
                               product_name=product_name, error=str(search_error), tenant_id=tenant_id)
                # Add delay before creation attempt to avoid hitting rate limits
                await asyncio.sleep(1.0)
            # If not found or search failed, create a new ingredient/product in inventory
            ingredient_data = {
                'name': product_name,
                'type': 'finished_product', # Assuming sales are of finished products
                'unit': 'unit', # Default unit
                'current_stock': 0, # No stock initially
                'reorder_point': 0,
                'cost_per_unit': 0,
                'category': product_category or 'general'
            }
            try:
                created_product = await self.inventory_client.create_ingredient(ingredient_data, str(tenant_id))
                if created_product and created_product.get('id'):
                    product_id = created_product['id']
                    uuid_id = UUID(str(product_id))
                    self.product_cache[product_name] = uuid_id # Cache for future use
                    logger.info("Created new inventory product for sales data",
                                product_name=product_name, product_id=product_id, tenant_id=tenant_id)
                    return uuid_id
            except Exception as creation_error:
                logger.warning("Product creation failed",
                               product_name=product_name, error=str(creation_error), tenant_id=tenant_id)
            # Neither search nor creation produced an id; the loop moves on
            # to the next attempt.
            logger.warning("Failed to resolve or create product in inventory",
                           product_name=product_name, tenant_id=tenant_id, attempt=attempt)
        except Exception as e:
            # Search and creation errors are handled by the inner blocks, so
            # this handler only sees errors raised outside of them (e.g. by
            # the delay or the logging calls above).
            error_str = str(e)
            if "429" in error_str or "rate limit" in error_str.lower() or "too many requests" in error_str.lower():
                logger.warning("Rate limit or service overload detected, retrying with longer delay",
                               product_name=product_name, attempt=attempt, error=error_str, tenant_id=tenant_id)
                if attempt < max_retries - 1:
                    continue # Retry with exponential backoff
            elif "503" in error_str or "502" in error_str or "service unavailable" in error_str.lower():
                logger.warning("Service unavailable, retrying with backoff",
                               product_name=product_name, attempt=attempt, error=error_str, tenant_id=tenant_id)
                if attempt < max_retries - 1:
                    continue # Retry for service unavailable errors
            elif "timeout" in error_str.lower() or "connection" in error_str.lower():
                logger.warning("Network issue detected, retrying",
                               product_name=product_name, attempt=attempt, error=error_str, tenant_id=tenant_id)
                if attempt < max_retries - 1:
                    continue # Retry for network issues
            else:
                logger.error("Non-retryable error resolving product to inventory ID",
                             error=error_str, product_name=product_name, tenant_id=tenant_id)
                if attempt < max_retries - 1:
                    # Still retry even for other errors, in case it's transient
                    continue
                else:
                    break # Don't retry on final attempt
    # All rounds failed: the product is not marked as failed yet because one
    # more creation attempt with minimal data follows.
    logger.error("Failed to resolve product after all retries, attempting fallback",
                 product_name=product_name, tenant_id=tenant_id)
    # FALLBACK: Try to create a product with minimal data
    try:
        # Same payload as above without 'reorder_point' and 'category'
        fallback_data = {
            'name': product_name,
            'type': 'finished_product',
            'unit': 'unit',
            'current_stock': 0,
            'cost_per_unit': 0
        }
        logger.info("Attempting fallback product creation with minimal data",
                    product_name=product_name, tenant_id=tenant_id)
        created_product = await self.inventory_client.create_ingredient(fallback_data, str(tenant_id))
        if created_product and created_product.get('id'):
            product_id = created_product['id']
            uuid_id = UUID(str(product_id))
            self.product_cache[product_name] = uuid_id
            logger.info("SUCCESS: Fallback product creation succeeded",
                        product_name=product_name, product_id=product_id, tenant_id=tenant_id)
            return uuid_id
    except Exception as fallback_error:
        logger.error("Fallback product creation also failed",
                     product_name=product_name, error=str(fallback_error), tenant_id=tenant_id)
    # Only mark as permanently failed after all attempts including fallback
    self.failed_products.add(product_name)
    logger.error("CRITICAL: Permanently failed to resolve product - this will result in missing training data",
                 product_name=product_name, tenant_id=tenant_id)
    return None
async def _batch_resolve_products(self, unique_products: set, tenant_id: str) -> None:
"""Batch resolve/create products to reduce API calls and improve success rate"""
if not unique_products:
return
logger.info(f"Starting batch product resolution for {len(unique_products)} unique products")
# Convert set to list for easier handling
products_list = list(unique_products)
batch_size = 5 # Process in smaller batches to avoid overwhelming the inventory service
for i in range(0, len(products_list), batch_size):
batch = products_list[i:i + batch_size]
logger.info(f"Processing batch {i//batch_size + 1}/{(len(products_list) + batch_size - 1)//batch_size}")
# Process each product in the batch with retry logic
for product_name, product_category in batch:
try:
# Skip if already in cache or failed list
if product_name in self.product_cache or product_name in self.failed_products:
continue
# Try to resolve the product
await self._resolve_product_to_inventory_id(product_name, product_category, tenant_id)
# Add small delay between products to be gentle on the API
await asyncio.sleep(0.5)
except Exception as e:
logger.warning(f"Failed to batch process product {product_name}: {e}")
continue
# Add delay between batches
if i + batch_size < len(products_list):
logger.info("Waiting between batches to avoid rate limiting...")
await asyncio.sleep(2.0)
successful_resolutions = len([p for p, _ in products_list if p in self.product_cache])
failed_resolutions = len([p for p, _ in products_list if p in self.failed_products])
logger.info(f"Batch product resolution completed: {successful_resolutions} successful, {failed_resolutions} failed")
if failed_resolutions > 0:
logger.warning(f"ATTENTION: {failed_resolutions} products failed to resolve - these will be missing from training data")
return
def _structure_messages(self, messages: List[Union[str, Dict]]) -> List[Dict[str, Any]]:
"""Convert string messages to structured format"""
structured = []
for msg in messages:
if isinstance(msg, str):
structured.append({
"type": "general_message",
"message": msg,
"field": None,
"row": None,
"code": "GENERAL_MESSAGE"
})
else:
structured.append(msg)
return structured
def _generate_suggestions(
self,
validation_result: SalesValidationResult,
format_type: str,
warning_count: int
) -> List[str]:
"""Generate enhanced contextual suggestions"""
suggestions = []
if validation_result.is_valid:
suggestions.append("El archivo está listo para procesamiento")
suggestions.append(f"Se procesarán aproximadamente {validation_result.total_records} registros")
if validation_result.total_records > 1000:
suggestions.append("Archivo grande: el procesamiento puede tomar varios minutos")
suggestions.append("Considera dividir archivos muy grandes para mejor rendimiento")
if warning_count > 0:
suggestions.append("Revisa las advertencias antes de continuar")
suggestions.append("Los datos con advertencias se procesarán con valores por defecto")
# Format-specific suggestions
if format_type == "csv":
suggestions.append("Asegúrate de que las fechas estén en formato DD/MM/YYYY")
suggestions.append("Verifica que los números usen punto decimal (no coma)")
elif format_type in ["excel", "xlsx"]:
suggestions.append("Solo se procesará la primera hoja del archivo Excel")
suggestions.append("Evita celdas combinadas y fórmulas complejas")
else:
suggestions.append("Corrige los errores antes de continuar")
suggestions.append("Verifica que el archivo tenga el formato correcto")
if format_type not in ["csv", "excel", "xlsx", "json"]:
suggestions.append("Usa formato CSV o Excel para mejores resultados")
suggestions.append("El formato JSON es para usuarios avanzados")
if validation_result.total_records == 0:
suggestions.append("Asegúrate de que el archivo contenga datos")
suggestions.append("Verifica que el archivo no esté corrupto")
# Missing column suggestions
error_codes = [error.get("code", "") for error in validation_result.errors if isinstance(error, dict)]
if "MISSING_DATE_COLUMN" in error_codes:
suggestions.append("Incluye una columna de fecha (fecha, date, dia)")
if "MISSING_PRODUCT_COLUMN" in error_codes:
suggestions.append("Incluye una columna de producto (producto, product, item)")
return suggestions
# Main DataImportService class with enhanced functionality