diff --git a/services/data/app/api/sales.py b/services/data/app/api/sales.py
index 1750fd63..38a5fd3f 100644
--- a/services/data/app/api/sales.py
+++ b/services/data/app/api/sales.py
@@ -16,7 +16,7 @@ from app.core.database import get_db
 from app.core.auth import get_current_user, AuthInfo
 from app.services.sales_service import SalesService
 from app.services.data_import_service import DataImportService
-from app.services.messaging import data_publisher
+from app.services.messaging import publish_sales_created, publish_data_imported
 from app.schemas.sales import (
     SalesDataCreate,
     SalesDataResponse,
@@ -44,7 +44,7 @@ async def create_sales_record(
 
     # Publish event (with error handling)
     try:
-        await data_publisher.publish_sales_created({
+        await publish_sales_created({
             "tenant_id": str(sales_data.tenant_id),
             "product_name": sales_data.product_name,
             "quantity_sold": sales_data.quantity_sold,
@@ -149,7 +149,7 @@ async def import_sales_json(
     if result["success"]:
         # Publish event (with error handling)
         try:
-            await data_publisher.publish_data_imported({
+            await publish_data_imported({
                 "tenant_id": str(import_data.tenant_id),
                 "type": "json_import",
                 "records_created": result["records_created"],
diff --git a/services/data/app/api/weather.py b/services/data/app/api/weather.py
index 1d2c610d..724f0659 100644
--- a/services/data/app/api/weather.py
+++ b/services/data/app/api/weather.py
@@ -12,7 +12,7 @@ import structlog
 from app.core.database import get_db
 from app.core.auth import get_current_user, AuthInfo
 from app.services.weather_service import WeatherService
-from app.services.messaging import data_publisher
+from app.services.messaging import publish_weather_updated
 from app.schemas.external import (
     WeatherDataResponse,
     WeatherForecastResponse,
@@ -71,7 +71,7 @@ async def get_weather_forecast(
 
     # Publish event (with error handling)
     try:
-        await data_publisher.publish_weather_updated({
+        await publish_weather_updated({
             "type": "forecast_requested",
             "latitude": latitude,
             "longitude": longitude,
@@ -118,7 +118,7 @@ async def get_historical_weather(
 
     # Publish event (with error handling)
     try:
-        await data_publisher.publish_weather_updated({
+        await publish_weather_updated({
             "type": "historical_requested",
             "latitude": latitude,
             "longitude": longitude,
diff --git a/services/data/app/main.py b/services/data/app/main.py
index 98863062..270058df 100644
--- a/services/data/app/main.py
+++ b/services/data/app/main.py
@@ -59,7 +59,6 @@ async def lifespan(app: FastAPI):
     metrics_collector.register_counter("traffic_api_calls_total", "Traffic API calls")
     metrics_collector.register_counter("import_jobs_total", "Data import jobs")
     metrics_collector.register_counter("template_downloads_total", "Template downloads")
-    metrics_collector.register_counter("errors_total", "Total errors")
     metrics_collector.register_histogram("sales_create_duration_seconds", "Sales record creation duration")
     metrics_collector.register_histogram("sales_list_duration_seconds", "Sales record list duration")
     metrics_collector.register_histogram("import_duration_seconds", "Data import duration")
diff --git a/services/data/app/schemas/sales.py b/services/data/app/schemas/sales.py
index 060a437e..60cf8fa0 100644
--- a/services/data/app/schemas/sales.py
+++ b/services/data/app/schemas/sales.py
@@ -93,21 +93,26 @@ class SalesValidationResult(BaseModel):
     class Config:
         from_attributes = True
-    
+
 
 class SalesImportResult(BaseModel):
-    """Schema for sales import result"""
+    """Schema for the complete sales import result"""
     success: bool
-    records_processed: int
+    records_processed: int  # total_rows
     records_created: int
-    records_updated: int
-    records_failed: int
-    errors: List[Dict[str, Any]]
-    warnings: List[Dict[str, Any]]
+    records_updated: int = 0  # Defaults to 0 when updates are not tracked
+    records_failed: int  # error_count, or calculated from the totals
+    errors: List[Dict[str, Any]]  # Structured error objects
+    warnings: List[Dict[str, Any]]  # Structured warning objects
     processing_time_seconds: float
+    # Optional additional fields
+    source: Optional[str] = None
+    filename: Optional[str] = None
+    success_rate: Optional[float] = None
+
     class Config:
         from_attributes = True
-    
+
 
 class SalesAggregation(BaseModel):
     """Schema for sales aggregation results"""
     period: str  # "daily", "weekly", "monthly"
diff --git a/services/data/app/services/data_import_service.py b/services/data/app/services/data_import_service.py
index 17e3c69f..1b8b6ca8 100644
--- a/services/data/app/services/data_import_service.py
+++ b/services/data/app/services/data_import_service.py
@@ -65,83 +65,246 @@ class DataImportService:
         '%d/%m/%Y %H:%M',  # 15/01/2024 14:30
     ]
 
+
     @staticmethod
-    async def process_upload(tenant_id: str,
-                             file_content: str,
-                             file_format: str,
-                             db: AsyncSession,
-                             filename: Optional[str] = None) -> Dict[str, Any]:
-        """Process uploaded data file"""
+    async def process_upload(tenant_id: str, content: str, file_format: str, db: AsyncSession, filename: Optional[str] = None) -> Dict[str, Any]:
+        """Process uploaded data and return the complete response structure"""
+        start_time = datetime.utcnow()
+
         try:
             logger.info("Starting data import",
-                        tenant_id=tenant_id,
-                        format=file_format,
-                        filename=filename)
+                        filename=filename,
+                        format=file_format,
+                        tenant_id=tenant_id)
 
+            # Process the data based on format
             if file_format.lower() == 'csv':
-                return await DataImportService._process_csv(tenant_id, file_content, db, filename)
-            elif file_format.lower() in ['xlsx', 'excel', 'xls']:
-                return await DataImportService._process_excel(tenant_id, file_content, db, filename)
+                result = await DataImportService._process_csv_data(tenant_id, content, db, filename)
             elif file_format.lower() == 'json':
-                return await DataImportService._process_json(tenant_id, file_content, db, filename)
-            elif file_format.lower() == 'pos':
-                return await DataImportService._process_pos_data(tenant_id, file_content, db, filename)
+                result = await DataImportService._process_json(tenant_id, content, db, filename)
+            elif file_format.lower() in ['excel', 'xlsx']:
+                result = await DataImportService._process_excel(tenant_id, content, db, filename)
             else:
-                return {
-                    "success": False,
-                    "error": f"Formato no soportado: {file_format}. Formatos válidos: CSV, Excel, JSON, POS"
-                }
+                raise ValueError(f"Unsupported format: {file_format}")
+
+            # Calculate processing time
+            end_time = datetime.utcnow()
+            processing_time = (end_time - start_time).total_seconds()
+
+            # Convert errors list to structured format if needed
+            structured_errors = []
+            for error in result.get("errors", []):
+                if isinstance(error, str):
+                    structured_errors.append({
+                        "row": None,
+                        "field": None,
+                        "message": error,
+                        "type": "general_error"
+                    })
+                else:
+                    structured_errors.append(error)
+
+            # Convert warnings list to structured format if needed
+            structured_warnings = []
+            for warning in result.get("warnings", []):
+                if isinstance(warning, str):
+                    structured_warnings.append({
+                        "row": None,
+                        "field": None,
+                        "message": warning,
+                        "type": "general_warning"
+                    })
+                else:
+                    structured_warnings.append(warning)
+
+            # Calculate derived values; the per-format processors count every
+            # failed row in "skipped", so report those rows as failed records
+            total_rows = result.get("total_rows", 0)
+            records_created = result.get("records_created", 0)
+            records_failed = result.get("skipped", 0)
+
+            # Return complete response structure matching SalesImportResult schema
+            complete_response = {
+                "success": result.get("success", False),
+                "records_processed": total_rows,
+                "records_created": records_created,
+                "records_updated": 0,  # imports only create records, never update
+                "records_failed": records_failed,
+                "errors": structured_errors,
+                "warnings": structured_warnings,
+                "processing_time_seconds": processing_time,
+
+                # Keep existing fields for backward compatibility
+                "total_rows": total_rows,
+                "skipped": result.get("skipped", 0),
+                "success_rate": result.get("success_rate", 0.0),
+                "source": file_format,
+                "filename": filename,
+                "error_count": len(structured_errors)
+            }
+
+            logger.info("Data processing completed",
+                        records_created=records_created,
+                        success_rate=complete_response["success_rate"],
+                        processing_time=processing_time)
+
+            return complete_response
+
         except Exception as e:
-            logger.error("Data import failed", error=str(e), tenant_id=tenant_id)
+            end_time = datetime.utcnow()
+            processing_time = (end_time - start_time).total_seconds()
+
+            error_message = f"Import failed: {str(e)}"
+            logger.error("Data import failed", error=error_message, tenant_id=tenant_id)
+
+            # Return error response with complete structure
             return {
                 "success": False,
-                "error": f"Error en la importación: {str(e)}"
+                "records_processed": 0,
+                "records_created": 0,
+                "records_updated": 0,
+                "records_failed": 0,
+                "errors": [{
+                    "row": None,
+                    "field": None,
+                    "message": error_message,
+                    "type": "import_error"
+                }],
+                "warnings": [],
+                "processing_time_seconds": processing_time,
+
+                # Backward compatibility fields
+                "total_rows": 0,
+                "skipped": 0,
+                "success_rate": 0.0,
+                "source": file_format,
+                "filename": filename,
+                "error_count": 1
             }
-    
+
     @staticmethod
-    async def _process_csv(tenant_id: str, csv_content: str, db: AsyncSession, filename: Optional[str] = None) -> Dict[str, Any]:
-        """Process CSV file with intelligent column mapping"""
+    async def _process_csv_data(tenant_id: str, csv_content: str, db: AsyncSession, filename: Optional[str] = None) -> Dict[str, Any]:
+        """Process CSV data with improved error handling and structure"""
         try:
-            # Handle base64 encoded content
-            if csv_content.startswith('data:'):
-                csv_content = base64.b64decode(csv_content.split(',')[1]).decode('utf-8')
+            # Parse CSV
+            reader = csv.DictReader(io.StringIO(csv_content))
+            rows = list(reader)
 
-            # Try different encodings if UTF-8 fails
-            encodings = ['utf-8', 'latin-1', 'cp1252', 'iso-8859-1']
-            df = None
-
-            for encoding in encodings:
-                try:
-                    csv_buffer = io.StringIO(csv_content)
-                    df = pd.read_csv(csv_buffer, encoding=encoding)
-                    break
-                except UnicodeDecodeError:
-                    continue
-
-            if df is None:
-                return {"success": False, "error": "No se pudo leer el archivo CSV con ninguna codificación"}
-
-            # Clean column names
-            df.columns = df.columns.str.strip().str.lower()
-
-            # Map columns to standard names
-            column_mapping = DataImportService._detect_columns(df.columns.tolist())
-
-            if not column_mapping.get('date') or not column_mapping.get('product'):
+            if not rows:
                 return {
                     "success": False,
-                    "error": f"Columnas requeridas no encontradas. Detectadas: {list(df.columns)}. Se requieren: fecha y producto"
+                    "total_rows": 0,
+                    "records_created": 0,
+                    "skipped": 0,
+                    "success_rate": 0.0,
+                    "errors": ["CSV file is empty"],
+                    "warnings": []
                 }
 
-            # Process records
-            return await DataImportService._process_dataframe(
-                tenant_id, df, column_mapping, db, "csv", filename
-            )
+            # Column mapping
+            column_mapping = DataImportService._get_column_mapping(list(rows[0].keys()))
+
+            records_created = 0
+            errors = []
+            warnings = []
+            skipped = 0
+
+            logger.info(f"Processing {len(rows)} records from CSV")
+
+            for index, row in enumerate(rows):
+                try:
+                    # Extract and validate date
+                    date_str = str(row.get(column_mapping.get('date', ''), '')).strip()
+                    if not date_str or date_str.lower() in ['nan', 'null', 'none']:
+                        errors.append(f"Fila {index + 1}: Fecha faltante")
+                        skipped += 1
+                        continue
+
+                    parsed_date = DataImportService._parse_date(date_str)
+                    if not parsed_date:
+                        errors.append(f"Fila {index + 1}: Formato de fecha inválido: {date_str}")
+                        skipped += 1
+                        continue
+
+                    # Extract and validate product name
+                    product_name = str(row.get(column_mapping.get('product', ''), '')).strip()
+                    if not product_name or product_name.lower() in ['nan', 'null', 'none']:
+                        errors.append(f"Fila {index + 1}: Nombre de producto faltante")
+                        skipped += 1
+                        continue
+
+                    # Clean product name
+                    product_name = DataImportService._clean_product_name(product_name)
+
+                    # Extract and validate quantity
+                    quantity_raw = row.get(column_mapping.get('quantity', 'cantidad'), 1)
+                    try:
+                        quantity = int(float(str(quantity_raw).replace(',', '.')))
+                        if quantity <= 0:
+                            warnings.append(f"Fila {index + 1}: Cantidad inválida ({quantity}), usando 1")
+                            quantity = 1
+                    except (ValueError, TypeError):
+                        warnings.append(f"Fila {index + 1}: Cantidad inválida ({quantity_raw}), usando 1")
+                        quantity = 1
+
+                    # Extract revenue (optional)
+                    revenue_raw = row.get(column_mapping.get('revenue', 'ingresos'), None)
+                    if revenue_raw:
+                        try:
+                            revenue = float(str(revenue_raw).replace(',', '.'))
+                        except (ValueError, TypeError):
+                            revenue = quantity * 1.5  # Default calculation
+                    else:
+                        revenue = quantity * 1.5  # Default calculation
+
+                    # Extract location (optional)
+                    location_id = row.get(column_mapping.get('location', 'ubicacion'), None)
+
+                    # Create sales record
+                    sales_data = SalesDataCreate(
+                        tenant_id=tenant_id,
+                        date=parsed_date,
+                        product_name=product_name,
+                        quantity_sold=quantity,
+                        revenue=revenue,
+                        location_id=location_id,
+                        source="csv"
+                    )
+
+                    await SalesService.create_sales_record(sales_data, db)
+                    records_created += 1
+
+                except Exception as e:
+                    error_msg = f"Fila {index + 1}: {str(e)}"
+                    errors.append(error_msg)
+                    skipped += 1
+                    logger.warning("Record processing failed", error=error_msg)
+
+            success_rate = (records_created / len(rows)) * 100 if rows else 0
+
+            return {
+                "success": records_created > 0,
+                "total_rows": len(rows),
+                "records_created": records_created,
+                "skipped": skipped,
+                "success_rate": success_rate,
+                "errors": errors,
+                "warnings": warnings
+            }
 
         except Exception as e:
             logger.error("CSV processing failed", error=str(e))
-            return {"success": False, "error": f"Error procesando CSV: {str(e)}"}
-    
+            return {
+                "success": False,
+                "total_rows": 0,
+                "records_created": 0,
+                "skipped": 0,
+                "success_rate": 0.0,
+                "errors": [f"CSV processing error: {str(e)}"],
+                "warnings": []
+            }
+
     @staticmethod
     async def _process_excel(tenant_id: str, excel_content: str, db: AsyncSession, filename: Optional[str] = None) -> Dict[str, Any]:
         """Process Excel file"""
@@ -645,4 +808,24 @@ class DataImportService:
             logger.error("Template generation failed", error=str(e))
             return {
                 "error": f"Error generando plantilla: {str(e)}"
-            }
\ No newline at end of file
+            }
+
+    @staticmethod
+    def _get_column_mapping(columns: List[str]) -> Dict[str, str]:
+        """Get column mapping - alias for _detect_columns"""
+        return DataImportService._detect_columns(columns)
+
+    @staticmethod
+    def _clean_product_name(product_name: str) -> str:
+        """Clean and normalize a product name"""
+        if not product_name:
+            return ""
+
+        # Basic cleaning
+        cleaned = str(product_name).strip().lower()
+
+        # Collapse extra whitespace
+        import re
+        cleaned = re.sub(r'\s+', ' ', cleaned)
+
+        return cleaned
\ No newline at end of file
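
This diff assumes that app/services/messaging now exports module-level coroutines (publish_sales_created, publish_data_imported, publish_weather_updated) in place of the old data_publisher object; the messaging module itself is not part of the diff. A minimal sketch of that interface, under that assumption — the _publish helper and the event topic names are illustrative, not the service's actual implementation, and the broker call is stubbed with a log line:

# services/data/app/services/messaging.py (sketch only, not part of this diff)
from typing import Any, Dict

import structlog

logger = structlog.get_logger()


async def _publish(event_type: str, payload: Dict[str, Any]) -> None:
    # A real implementation would hand the event to the project's broker
    # client here, e.g. await broker.publish(event_type, payload); logging
    # stands in for that call in this sketch.
    logger.info("event_published", event_type=event_type, payload=payload)


async def publish_sales_created(payload: Dict[str, Any]) -> None:
    await _publish("sales.created", payload)


async def publish_data_imported(payload: Dict[str, Any]) -> None:
    await _publish("data.imported", payload)


async def publish_weather_updated(payload: Dict[str, Any]) -> None:
    await _publish("weather.updated", payload)

Module-level functions keep the call sites in the API handlers free of any publisher lifecycle concerns, which is consistent with how the handlers wrap each publish in its own try block.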