Add publish events to the training phase
This commit is contained in:
@@ -16,6 +16,8 @@ import pandas as pd
|
|||||||
from app.services.data_client import DataClient
|
from app.services.data_client import DataClient
|
||||||
from app.services.date_alignment_service import DateAlignmentService, DateRange, DataSourceType, AlignedDateRange
|
from app.services.date_alignment_service import DateAlignmentService, DateRange, DataSourceType, AlignedDateRange
|
||||||
|
|
||||||
|
from app.services.messaging import publish_job_progress, publish_job_failed
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
@@ -67,13 +69,16 @@ class TrainingDataOrchestrator:
|
|||||||
|
|
||||||
try:
|
try:
|
||||||
|
|
||||||
|
publish_job_progress(job_id, tenant_id, 5, "Extraer datos de venta")
|
||||||
sales_data = await self.data_client.fetch_sales_data(tenant_id)
|
sales_data = await self.data_client.fetch_sales_data(tenant_id)
|
||||||
|
|
||||||
# Step 1: Extract and validate sales data date range
|
# Step 1: Extract and validate sales data date range
|
||||||
|
publish_job_progress(job_id, tenant_id, 10, "Extraer y validar las fechas de de los datos de venta")
|
||||||
sales_date_range = self._extract_sales_date_range(sales_data)
|
sales_date_range = self._extract_sales_date_range(sales_data)
|
||||||
logger.info(f"Sales data range detected: {sales_date_range.start} to {sales_date_range.end}")
|
logger.info(f"Sales data range detected: {sales_date_range.start} to {sales_date_range.end}")
|
||||||
|
|
||||||
# Step 2: Apply date alignment across all data sources
|
# Step 2: Apply date alignment across all data sources
|
||||||
|
publish_job_progress(job_id, tenant_id, 15, "Aplicar la alineación de fechas en todas las fuentes de datos")
|
||||||
aligned_range = self.date_alignment_service.validate_and_align_dates(
|
aligned_range = self.date_alignment_service.validate_and_align_dates(
|
||||||
user_sales_range=sales_date_range,
|
user_sales_range=sales_date_range,
|
||||||
requested_start=requested_start,
|
requested_start=requested_start,
|
||||||
@@ -85,15 +90,18 @@ class TrainingDataOrchestrator:
|
|||||||
logger.info(f"Applied constraints: {aligned_range.constraints}")
|
logger.info(f"Applied constraints: {aligned_range.constraints}")
|
||||||
|
|
||||||
# Step 3: Filter sales data to aligned date range
|
# Step 3: Filter sales data to aligned date range
|
||||||
|
publish_job_progress(job_id, tenant_id, 20, "Aplicar la alineación de fechas en todas las fuentes de datos")
|
||||||
filtered_sales = self._filter_sales_data(sales_data, aligned_range)
|
filtered_sales = self._filter_sales_data(sales_data, aligned_range)
|
||||||
|
|
||||||
# Step 4: Collect external data sources concurrently
|
# Step 4: Collect external data sources concurrently
|
||||||
logger.info("Collecting external data sources...")
|
logger.info("Collecting external data sources...")
|
||||||
|
publish_job_progress(job_id, tenant_id, 25, "Recopilación de fuentes de datos externas")
|
||||||
weather_data, traffic_data = await self._collect_external_data(
|
weather_data, traffic_data = await self._collect_external_data(
|
||||||
aligned_range, bakery_location, tenant_id
|
aligned_range, bakery_location, tenant_id
|
||||||
)
|
)
|
||||||
|
|
||||||
# Step 5: Validate data quality
|
# Step 5: Validate data quality
|
||||||
|
publish_job_progress(job_id, tenant_id, 30, "Validando la calidad de los datos")
|
||||||
data_quality_results = self._validate_data_sources(
|
data_quality_results = self._validate_data_sources(
|
||||||
filtered_sales, weather_data, traffic_data, aligned_range
|
filtered_sales, weather_data, traffic_data, aligned_range
|
||||||
)
|
)
|
||||||
@@ -120,6 +128,7 @@ class TrainingDataOrchestrator:
|
|||||||
)
|
)
|
||||||
|
|
||||||
# Step 7: Final validation
|
# Step 7: Final validation
|
||||||
|
publish_job_progress(job_id, tenant_id, 35, "Validancion final de los datos")
|
||||||
final_validation = self.validate_training_data_quality(training_dataset)
|
final_validation = self.validate_training_data_quality(training_dataset)
|
||||||
training_dataset.metadata["final_validation"] = final_validation
|
training_dataset.metadata["final_validation"] = final_validation
|
||||||
|
|
||||||
@@ -132,6 +141,7 @@ class TrainingDataOrchestrator:
|
|||||||
return training_dataset
|
return training_dataset
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
publish_job_failed(job_id, tenant_id, str(e))
|
||||||
logger.error(f"Training data preparation failed: {str(e)}")
|
logger.error(f"Training data preparation failed: {str(e)}")
|
||||||
raise ValueError(f"Failed to prepare training data: {str(e)}")
|
raise ValueError(f"Failed to prepare training data: {str(e)}")
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user