Fix new Frontend 13

This commit is contained in:
Urtzi Alfaro
2025-08-04 18:58:12 +02:00
parent 35b02ca364
commit 0ba543a19a
5 changed files with 97 additions and 50 deletions

View File

@@ -20,12 +20,7 @@ from app.core.config import settings
from sqlalchemy.ext.asyncio import AsyncSession
from app.services.messaging import (
publish_job_progress,
publish_data_validation_started,
publish_data_validation_completed,
publish_job_step_completed
)
from app.services.messaging import TrainingStatusPublisher
logger = logging.getLogger(__name__)
@@ -59,6 +54,8 @@ class BakeryMLTrainer:
logger.info(f"Starting ML training pipeline {job_id} for tenant {tenant_id}")
self.status_publisher = TrainingStatusPublisher(job_id, tenant_id)
try:
# Convert sales data to DataFrame
sales_df = pd.DataFrame(training_dataset.sales_data)
@@ -72,12 +69,18 @@ class BakeryMLTrainer:
products = sales_df['product_name'].unique().tolist()
logger.info(f"Training models for {len(products)} products: {products}")
self.status_publisher.products_total = len(products)
# Process data for each product
logger.info("Processing data for all products...")
processed_data = await self._process_all_products(
sales_df, weather_df, traffic_df, products
)
await publish_job_progress(job_id, tenant_id, 20, "feature_engineering", estimated_time_remaining_minutes=7)
await self.status_publisher.progress_update(
progress=20,
step="feature_engineering",
step_details="Processing features for all products"
)
# Train models for each processed product
logger.info("Training models for all products...")
@@ -87,7 +90,11 @@ class BakeryMLTrainer:
# Calculate overall training summary
summary = self._calculate_training_summary(training_results)
await publish_job_progress(job_id, tenant_id, 90, "model_validation", estimated_time_remaining_minutes=1)
await self.status_publisher.progress_update(
progress=90,
step="model_validation",
step_details="Validating model performance"
)
result = {
"job_id": job_id,
@@ -399,16 +406,11 @@ class BakeryMLTrainer:
job_id: str) -> Dict[str, Any]:
"""Train models for all processed products using Prophet manager"""
training_results = {}
i = 0
total_products = len(processed_data)
base_progress = 45
max_progress = 85 # or whatever your target end progress is
products_total = 0
i = 0
max_progress = 85
start_time = time.time()
processing_times = [] # Store individual processing times
for product_name, product_data in processed_data.items():
product_start_time = time.time()
try:
@@ -424,7 +426,6 @@ class BakeryMLTrainer:
'message': f'Need at least {settings.MIN_TRAINING_DATA_DAYS} data points, got {len(product_data)}'
}
logger.warning(f"Skipping {product_name}: insufficient data ({len(product_data)} < {settings.MIN_TRAINING_DATA_DAYS})")
processing_times.append(time.time() - product_start_time)
continue
# Train the model using Prophet manager
@@ -444,6 +445,20 @@ class BakeryMLTrainer:
logger.info(f"Successfully trained model for {product_name}")
completed_products = i + 1
progress = base_progress + int((completed_products / total_products) * (max_progress - base_progress))
if self.status_publisher:
# Update products completed for accurate tracking
self.status_publisher.products_completed = completed_products
await self.status_publisher.product_completed(
progress=progress,
step="model_training",
current_product=product_name,
step_details=f"Completed training for {product_name}"
)
except Exception as e:
logger.error(f"Failed to train model for {product_name}: {str(e)}")
training_results[product_name] = {
@@ -452,29 +467,18 @@ class BakeryMLTrainer:
'data_points': len(product_data) if product_data is not None else 0,
'failed_at': datetime.now().isoformat()
}
completed_products = i + 1
if self.status_publisher:
self.status_publisher.products_completed = completed_products
await self.status_publisher.progress_update(
progress=progress,
step="model_training",
current_product=product_name,
step_details=f"Failed training for {product_name}: {str(e)}"
)
# Record processing time for this product
product_processing_time = time.time() - product_start_time
processing_times.append(product_processing_time)
i += 1
current_progress = base_progress + int((i / total_products) * (max_progress - base_progress))
# Calculate estimated time remaining
estimated_time_remaining_minutes = self.calculate_estimated_time_remaining(
processing_times, i, total_products
)
await publish_job_progress(
job_id,
tenant_id,
current_progress,
"model_training",
product_name,
products_total,
total_products,
estimated_time_remaining_minutes=estimated_time_remaining_minutes
)
return training_results

View File

@@ -518,27 +518,59 @@ class TrainingStatusPublisher:
current_product: Optional[str] = None,
step_details: Optional[str] = None
):
"""Publish progress update with calculated time estimates"""
"""Publish progress update with improved time estimates"""
elapsed_minutes = (datetime.now() - self.start_time).total_seconds() / 60
if progress > 0:
estimated_total_minutes = (elapsed_minutes / progress) * 100
estimated_remaining = max(0, estimated_total_minutes - elapsed_minutes)
else:
estimated_remaining = None
# Improved estimation based on training phases
estimated_remaining = self._calculate_smart_time_remaining(progress, elapsed_minutes, step)
await publish_job_progress(
job_id=self.job_id,
tenant_id=self.tenant_id,
progress=int(progress), # Ensure int
progress=int(progress),
step=step,
current_product=current_product,
products_completed=int(self.products_completed), # Ensure int
products_total=int(self.products_total), # Ensure int
products_completed=int(self.products_completed),
products_total=int(self.products_total),
estimated_time_remaining_minutes=int(estimated_remaining) if estimated_remaining else None,
step_details=step_details
)
def _calculate_smart_time_remaining(self, progress: int, elapsed_minutes: float, step: str) -> Optional[int]:
"""Calculate estimated time remaining using phase-based estimation"""
# Define expected time distribution for each phase
phase_durations = {
"data_validation": 1.0, # 1 minute
"feature_engineering": 2.0, # 2 minutes
"model_training": 8.0, # 8 minutes (bulk of time)
"model_validation": 1.0 # 1 minute
}
total_expected_minutes = sum(phase_durations.values()) # 12 minutes
# Calculate progress through phases
if progress <= 10: # data_validation phase
remaining_in_phase = phase_durations["data_validation"] * (1 - (progress / 10))
remaining_after_phase = sum(list(phase_durations.values())[1:])
return int(remaining_in_phase + remaining_after_phase)
elif progress <= 20: # feature_engineering phase
remaining_in_phase = phase_durations["feature_engineering"] * (1 - ((progress - 10) / 10))
remaining_after_phase = sum(list(phase_durations.values())[2:])
return int(remaining_in_phase + remaining_after_phase)
elif progress <= 90: # model_training phase (biggest chunk)
remaining_in_phase = phase_durations["model_training"] * (1 - ((progress - 20) / 70))
remaining_after_phase = phase_durations["model_validation"]
return int(remaining_in_phase + remaining_after_phase)
elif progress <= 100: # model_validation phase
remaining_in_phase = phase_durations["model_validation"] * (1 - ((progress - 90) / 10))
return int(remaining_in_phase)
return 0
async def product_completed(self, product_name: str, model_id: str, metrics: Optional[Dict] = None):
"""Mark a product as completed and update progress"""
self.products_completed += 1

View File

@@ -28,6 +28,8 @@ from app.services.messaging import (
publish_job_failed
)
from app.services.messaging import TrainingStatusPublisher
logger = logging.getLogger(__name__)
class TrainingService:
@@ -73,6 +75,7 @@ class TrainingService:
logger.info(f"Starting training job {job_id} for tenant {tenant_id}")
self.status_publisher = TrainingStatusPublisher(job_id, tenant_id)
try:
@@ -85,7 +88,12 @@ class TrainingService:
requested_end=requested_end,
job_id=job_id
)
await publish_job_progress(job_id, tenant_id, 10, "data_validation", estimated_time_remaining_minutes=8)
await self.status_publisher.progress_update(
progress=10,
step="data_validation",
step_details="Data validation and alignment completed"
)
# Step 2: Execute ML training pipeline
logger.info("Step 2: Starting ML training pipeline")