From 2d1ce2d523b231b9cfa26df6705e49c63a49fc8c Mon Sep 17 00:00:00 2001 From: Urtzi Alfaro Date: Tue, 29 Jul 2025 21:43:26 +0200 Subject: [PATCH] Add publish events to the training phase - fix --- .../app/services/training_orchestrator.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/services/training/app/services/training_orchestrator.py b/services/training/app/services/training_orchestrator.py index db67a430..4aa69558 100644 --- a/services/training/app/services/training_orchestrator.py +++ b/services/training/app/services/training_orchestrator.py @@ -69,16 +69,16 @@ class TrainingDataOrchestrator: try: - publish_job_progress(job_id, tenant_id, 5, "Extraer datos de venta") + #publish_job_progress(job_id, tenant_id, 5, "Extraer datos de venta") sales_data = await self.data_client.fetch_sales_data(tenant_id) # Step 1: Extract and validate sales data date range - publish_job_progress(job_id, tenant_id, 10, "Extraer y validar las fechas de de los datos de venta") + #publish_job_progress(job_id, tenant_id, 10, "Extraer y validar las fechas de de los datos de venta") sales_date_range = self._extract_sales_date_range(sales_data) logger.info(f"Sales data range detected: {sales_date_range.start} to {sales_date_range.end}") # Step 2: Apply date alignment across all data sources - publish_job_progress(job_id, tenant_id, 15, "Aplicar la alineación de fechas en todas las fuentes de datos") + #publish_job_progress(job_id, tenant_id, 15, "Aplicar la alineación de fechas en todas las fuentes de datos") aligned_range = self.date_alignment_service.validate_and_align_dates( user_sales_range=sales_date_range, requested_start=requested_start, @@ -90,18 +90,18 @@ class TrainingDataOrchestrator: logger.info(f"Applied constraints: {aligned_range.constraints}") # Step 3: Filter sales data to aligned date range - publish_job_progress(job_id, tenant_id, 20, "Aplicar la alineación de fechas en todas las fuentes de datos") + #publish_job_progress(job_id, tenant_id, 20, "Aplicar la alineación de fechas en todas las fuentes de datos") filtered_sales = self._filter_sales_data(sales_data, aligned_range) # Step 4: Collect external data sources concurrently logger.info("Collecting external data sources...") - publish_job_progress(job_id, tenant_id, 25, "Recopilación de fuentes de datos externas") + #publish_job_progress(job_id, tenant_id, 25, "Recopilación de fuentes de datos externas") weather_data, traffic_data = await self._collect_external_data( aligned_range, bakery_location, tenant_id ) # Step 5: Validate data quality - publish_job_progress(job_id, tenant_id, 30, "Validando la calidad de los datos") + #publish_job_progress(job_id, tenant_id, 30, "Validando la calidad de los datos") data_quality_results = self._validate_data_sources( filtered_sales, weather_data, traffic_data, aligned_range ) @@ -128,7 +128,7 @@ class TrainingDataOrchestrator: ) # Step 7: Final validation - publish_job_progress(job_id, tenant_id, 35, "Validancion final de los datos") + #publish_job_progress(job_id, tenant_id, 35, "Validancion final de los datos") final_validation = self.validate_training_data_quality(training_dataset) training_dataset.metadata["final_validation"] = final_validation @@ -141,7 +141,7 @@ class TrainingDataOrchestrator: return training_dataset except Exception as e: - publish_job_failed(job_id, tenant_id, str(e)) + #publish_job_failed(job_id, tenant_id, str(e)) logger.error(f"Training data preparation failed: {str(e)}") raise ValueError(f"Failed to prepare training data: {str(e)}")