Fix new services implementation 3

This commit is contained in:
Urtzi Alfaro
2025-08-14 16:47:34 +02:00
parent 0951547e92
commit 03737430ee
51 changed files with 657 additions and 982 deletions

View File

@@ -62,7 +62,7 @@ class EnhancedBakeryDataProcessor:
sales_data: pd.DataFrame,
weather_data: pd.DataFrame,
traffic_data: pd.DataFrame,
product_name: str,
inventory_product_id: str,
tenant_id: str = None,
job_id: str = None,
session=None) -> pd.DataFrame:
@@ -73,7 +73,7 @@ class EnhancedBakeryDataProcessor:
sales_data: Historical sales data for the product
weather_data: Weather data
traffic_data: Traffic data
product_name: Product name for logging
inventory_product_id: Inventory product UUID for logging
tenant_id: Optional tenant ID for tracking
job_id: Optional job ID for tracking
@@ -82,7 +82,7 @@ class EnhancedBakeryDataProcessor:
"""
try:
logger.info("Preparing enhanced training data using repository pattern",
product_name=product_name,
inventory_product_id=inventory_product_id,
tenant_id=tenant_id,
job_id=job_id)
@@ -93,11 +93,11 @@ class EnhancedBakeryDataProcessor:
# Log data preparation start if we have tracking info
if job_id and tenant_id:
await repos['training_log'].update_log_progress(
job_id, 15, f"preparing_data_{product_name}", "running"
job_id, 15, f"preparing_data_{inventory_product_id}", "running"
)
# Step 1: Convert and validate sales data
sales_clean = await self._process_sales_data(sales_data, product_name)
sales_clean = await self._process_sales_data(sales_data, inventory_product_id)
# FIX: Ensure timezone awareness before any operations
sales_clean = self._ensure_timezone_aware(sales_clean)
@@ -129,32 +129,32 @@ class EnhancedBakeryDataProcessor:
# Step 9: Store processing metadata if we have a tenant
if tenant_id:
await self._store_processing_metadata(
repos, tenant_id, product_name, prophet_data, job_id
repos, tenant_id, inventory_product_id, prophet_data, job_id
)
logger.info("Enhanced training data prepared successfully",
product_name=product_name,
inventory_product_id=inventory_product_id,
data_points=len(prophet_data))
return prophet_data
except Exception as e:
logger.error("Error preparing enhanced training data",
product_name=product_name,
inventory_product_id=inventory_product_id,
error=str(e))
raise
async def _store_processing_metadata(self,
repos: Dict,
tenant_id: str,
product_name: str,
inventory_product_id: str,
processed_data: pd.DataFrame,
job_id: str = None):
"""Store data processing metadata using repository"""
try:
# Create processing metadata
metadata = {
"product_name": product_name,
"inventory_product_id": inventory_product_id,
"data_points": len(processed_data),
"date_range": {
"start": processed_data['ds'].min().isoformat(),
@@ -167,7 +167,7 @@ class EnhancedBakeryDataProcessor:
# Log processing completion
if job_id:
await repos['training_log'].update_log_progress(
job_id, 25, f"data_prepared_{product_name}", "running"
job_id, 25, f"data_prepared_{inventory_product_id}", "running"
)
except Exception as e:
@@ -270,7 +270,7 @@ class EnhancedBakeryDataProcessor:
logger.warning("Date alignment failed, using original data", error=str(e))
return sales_data
async def _process_sales_data(self, sales_data: pd.DataFrame, product_name: str) -> pd.DataFrame:
async def _process_sales_data(self, sales_data: pd.DataFrame, inventory_product_id: str) -> pd.DataFrame:
"""Process and clean sales data with enhanced validation"""
sales_clean = sales_data.copy()
@@ -305,9 +305,9 @@ class EnhancedBakeryDataProcessor:
sales_clean = sales_clean.dropna(subset=['quantity'])
sales_clean = sales_clean[sales_clean['quantity'] >= 0] # No negative sales
# Filter for the specific product if product_name column exists
if 'product_name' in sales_clean.columns:
sales_clean = sales_clean[sales_clean['product_name'] == product_name]
# Filter for the specific product if inventory_product_id column exists
if 'inventory_product_id' in sales_clean.columns:
sales_clean = sales_clean[sales_clean['inventory_product_id'] == inventory_product_id]
# Remove duplicate dates (keep the one with highest quantity)
sales_clean = sales_clean.sort_values(['date', 'quantity'], ascending=[True, False])

View File

@@ -52,7 +52,7 @@ class BakeryProphetManager:
async def train_bakery_model(self,
tenant_id: str,
product_name: str,
inventory_product_id: str,
df: pd.DataFrame,
job_id: str) -> Dict[str, Any]:
"""
@@ -60,10 +60,10 @@ class BakeryProphetManager:
Same interface as before - optimization happens automatically.
"""
try:
logger.info(f"Training optimized bakery model for {product_name}")
logger.info(f"Training optimized bakery model for {inventory_product_id}")
# Validate input data
await self._validate_training_data(df, product_name)
await self._validate_training_data(df, inventory_product_id)
# Prepare data for Prophet
prophet_data = await self._prepare_prophet_data(df)
@@ -72,8 +72,8 @@ class BakeryProphetManager:
regressor_columns = self._extract_regressor_columns(prophet_data)
# Automatically optimize hyperparameters (this is the new part)
logger.info(f"Optimizing hyperparameters for {product_name}...")
best_params = await self._optimize_hyperparameters(prophet_data, product_name, regressor_columns)
logger.info(f"Optimizing hyperparameters for {inventory_product_id}...")
best_params = await self._optimize_hyperparameters(prophet_data, inventory_product_id, regressor_columns)
# Create optimized Prophet model
model = self._create_optimized_prophet_model(best_params, regressor_columns)
@@ -92,7 +92,7 @@ class BakeryProphetManager:
# Store model and metrics - Generate proper UUID for model_id
model_id = str(uuid.uuid4())
model_path = await self._store_model(
tenant_id, product_name, model, model_id, prophet_data, regressor_columns, best_params, training_metrics
tenant_id, inventory_product_id, model, model_id, prophet_data, regressor_columns, best_params, training_metrics
)
# Return same format as before, but with optimization info
@@ -112,17 +112,17 @@ class BakeryProphetManager:
}
}
logger.info(f"Optimized model trained successfully for {product_name}. "
logger.info(f"Optimized model trained successfully for {inventory_product_id}. "
f"MAPE: {training_metrics.get('optimized_mape', 'N/A')}%")
return model_info
except Exception as e:
logger.error(f"Failed to train optimized bakery model for {product_name}: {str(e)}")
logger.error(f"Failed to train optimized bakery model for {inventory_product_id}: {str(e)}")
raise
async def _optimize_hyperparameters(self,
df: pd.DataFrame,
product_name: str,
inventory_product_id: str,
regressor_columns: List[str]) -> Dict[str, Any]:
"""
Automatically optimize Prophet hyperparameters using Bayesian optimization.
@@ -130,7 +130,7 @@ class BakeryProphetManager:
"""
# Determine product category automatically
product_category = self._classify_product(product_name, df)
product_category = self._classify_product(inventory_product_id, df)
# Set optimization parameters based on category
n_trials = {
@@ -140,7 +140,7 @@ class BakeryProphetManager:
'intermittent': 15 # Reduced from 25
}.get(product_category, 25)
logger.info(f"Product {product_name} classified as {product_category}, using {n_trials} trials")
logger.info(f"Product {inventory_product_id} classified as {product_category}, using {n_trials} trials")
# Check data quality and adjust strategy
total_sales = df['y'].sum()
@@ -148,12 +148,12 @@ class BakeryProphetManager:
mean_sales = df['y'].mean()
non_zero_days = len(df[df['y'] > 0])
logger.info(f"Data analysis for {product_name}: total_sales={total_sales:.1f}, "
logger.info(f"Data analysis for {inventory_product_id}: total_sales={total_sales:.1f}, "
f"zero_ratio={zero_ratio:.2f}, mean_sales={mean_sales:.2f}, non_zero_days={non_zero_days}")
# Adjust strategy based on data characteristics
if zero_ratio > 0.8 or non_zero_days < 30:
logger.warning(f"Very sparse data for {product_name}, using minimal optimization")
logger.warning(f"Very sparse data for {inventory_product_id}, using minimal optimization")
return {
'changepoint_prior_scale': 0.001,
'seasonality_prior_scale': 0.01,
@@ -166,7 +166,7 @@ class BakeryProphetManager:
'uncertainty_samples': 100 # ✅ FIX: Minimal uncertainty sampling for very sparse data
}
elif zero_ratio > 0.6:
logger.info(f"Moderate sparsity for {product_name}, using conservative optimization")
logger.info(f"Moderate sparsity for {inventory_product_id}, using conservative optimization")
return {
'changepoint_prior_scale': 0.01,
'seasonality_prior_scale': 0.1,
@@ -180,7 +180,7 @@ class BakeryProphetManager:
}
# Use unique seed for each product to avoid identical results
product_seed = hash(product_name) % 10000
product_seed = hash(str(inventory_product_id)) % 10000
def objective(trial):
try:
@@ -284,13 +284,13 @@ class BakeryProphetManager:
cv_scores.append(mape_like)
except Exception as fold_error:
logger.debug(f"Fold failed for {product_name} trial {trial.number}: {str(fold_error)}")
logger.debug(f"Fold failed for {inventory_product_id} trial {trial.number}: {str(fold_error)}")
continue
return np.mean(cv_scores) if len(cv_scores) > 0 else 100.0
except Exception as trial_error:
logger.debug(f"Trial {trial.number} failed for {product_name}: {str(trial_error)}")
logger.debug(f"Trial {trial.number} failed for {inventory_product_id}: {str(trial_error)}")
return 100.0
# Run optimization with product-specific seed
@@ -304,19 +304,19 @@ class BakeryProphetManager:
best_params = study.best_params
best_score = study.best_value
logger.info(f"Optimization completed for {product_name}. Best score: {best_score:.2f}%. "
logger.info(f"Optimization completed for {inventory_product_id}. Best score: {best_score:.2f}%. "
f"Parameters: {best_params}")
# ✅ FIX: Log uncertainty sampling configuration for debugging confidence intervals
uncertainty_samples = best_params.get('uncertainty_samples', 500)
logger.info(f"Prophet model will use {uncertainty_samples} uncertainty samples for {product_name} "
logger.info(f"Prophet model will use {uncertainty_samples} uncertainty samples for {inventory_product_id} "
f"(category: {product_category}, zero_ratio: {zero_ratio:.2f})")
return best_params
def _classify_product(self, product_name: str, sales_data: pd.DataFrame) -> str:
def _classify_product(self, inventory_product_id: str, sales_data: pd.DataFrame) -> str:
"""Automatically classify product for optimization strategy - improved for bakery data"""
product_lower = product_name.lower()
product_lower = str(inventory_product_id).lower()
# Calculate sales statistics
total_sales = sales_data['y'].sum()
@@ -324,7 +324,7 @@ class BakeryProphetManager:
zero_ratio = (sales_data['y'] == 0).sum() / len(sales_data)
non_zero_days = len(sales_data[sales_data['y'] > 0])
logger.info(f"Product classification for {product_name}: total_sales={total_sales:.1f}, "
logger.info(f"Product classification for {inventory_product_id}: total_sales={total_sales:.1f}, "
f"mean_sales={mean_sales:.2f}, zero_ratio={zero_ratio:.2f}, non_zero_days={non_zero_days}")
# Improved classification logic for bakery products
@@ -499,7 +499,7 @@ class BakeryProphetManager:
async def _store_model(self,
tenant_id: str,
product_name: str,
inventory_product_id: str,
model: Prophet,
model_id: str,
training_data: pd.DataFrame,
@@ -520,7 +520,7 @@ class BakeryProphetManager:
metadata = {
"model_id": model_id,
"tenant_id": tenant_id,
"product_name": product_name,
"inventory_product_id": inventory_product_id,
"regressor_columns": regressor_columns,
"training_samples": len(training_data),
"data_period": {
@@ -539,7 +539,7 @@ class BakeryProphetManager:
json.dump(metadata, f, indent=2, default=str)
# Store in memory
model_key = f"{tenant_id}:{product_name}"
model_key = f"{tenant_id}:{inventory_product_id}"
self.models[model_key] = model
self.model_metadata[model_key] = metadata
@@ -547,13 +547,13 @@ class BakeryProphetManager:
try:
async with self.database_manager.get_session() as db_session:
# Deactivate previous models for this product
await self._deactivate_previous_models_with_session(db_session, tenant_id, product_name)
await self._deactivate_previous_models_with_session(db_session, tenant_id, inventory_product_id)
# Create new database record
db_model = TrainedModel(
id=model_id,
tenant_id=tenant_id,
product_name=product_name,
inventory_product_id=inventory_product_id,
model_type="prophet_optimized",
job_id=model_id.split('_')[0], # Extract job_id from model_id
model_path=str(model_path),
@@ -587,23 +587,23 @@ class BakeryProphetManager:
logger.info(f"Optimized model stored at: {model_path}")
return str(model_path)
async def _deactivate_previous_models_with_session(self, db_session, tenant_id: str, product_name: str):
async def _deactivate_previous_models_with_session(self, db_session, tenant_id: str, inventory_product_id: str):
"""Deactivate previous models for the same product using provided session"""
try:
# ✅ FIX: Wrap SQL string with text() for SQLAlchemy 2.0
query = text("""
UPDATE trained_models
SET is_active = false, is_production = false
WHERE tenant_id = :tenant_id AND product_name = :product_name
WHERE tenant_id = :tenant_id AND inventory_product_id = :inventory_product_id
""")
await db_session.execute(query, {
"tenant_id": tenant_id,
"product_name": product_name
"inventory_product_id": inventory_product_id
})
# Note: Don't commit here, let the calling method handle the transaction
logger.info(f"Successfully deactivated previous models for {product_name}")
logger.info(f"Successfully deactivated previous models for {inventory_product_id}")
except Exception as e:
logger.error(f"Failed to deactivate previous models: {str(e)}")
@@ -630,14 +630,14 @@ class BakeryProphetManager:
logger.error(f"Failed to generate forecast: {str(e)}")
raise
async def _validate_training_data(self, df: pd.DataFrame, product_name: str):
async def _validate_training_data(self, df: pd.DataFrame, inventory_product_id: str):
"""Validate training data quality (unchanged)"""
if df.empty:
raise ValueError(f"No training data available for {product_name}")
raise ValueError(f"No training data available for {inventory_product_id}")
if len(df) < settings.MIN_TRAINING_DATA_DAYS:
raise ValueError(
f"Insufficient training data for {product_name}: "
f"Insufficient training data for {inventory_product_id}: "
f"{len(df)} days, minimum required: {settings.MIN_TRAINING_DATA_DAYS}"
)

View File

@@ -91,7 +91,7 @@ class EnhancedBakeryMLTrainer:
await self._validate_input_data(sales_df, tenant_id)
# Get unique products from the sales data
products = sales_df['product_name'].unique().tolist()
products = sales_df['inventory_product_id'].unique().tolist()
logger.info("Training enhanced models",
products_count=len(products),
products=products)
@@ -183,17 +183,17 @@ class EnhancedBakeryMLTrainer:
"""Process data for all products using enhanced processor with repository tracking"""
processed_data = {}
for product_name in products:
for inventory_product_id in products:
try:
logger.info("Processing data for product using enhanced processor",
product_name=product_name)
inventory_product_id=inventory_product_id)
# Filter sales data for this product
product_sales = sales_df[sales_df['product_name'] == product_name].copy()
product_sales = sales_df[sales_df['inventory_product_id'] == inventory_product_id].copy()
if product_sales.empty:
logger.warning("No sales data found for product",
product_name=product_name)
inventory_product_id=inventory_product_id)
continue
# Use enhanced data processor with repository tracking
@@ -201,19 +201,19 @@ class EnhancedBakeryMLTrainer:
sales_data=product_sales,
weather_data=weather_df,
traffic_data=traffic_df,
product_name=product_name,
inventory_product_id=inventory_product_id,
tenant_id=tenant_id,
job_id=job_id
)
processed_data[product_name] = processed_product_data
processed_data[inventory_product_id] = processed_product_data
logger.info("Enhanced processing completed",
product_name=product_name,
inventory_product_id=inventory_product_id,
data_points=len(processed_product_data))
except Exception as e:
logger.error("Failed to process data using enhanced processor",
product_name=product_name,
inventory_product_id=inventory_product_id,
error=str(e))
continue
@@ -231,15 +231,15 @@ class EnhancedBakeryMLTrainer:
base_progress = 45
max_progress = 85
for product_name, product_data in processed_data.items():
for inventory_product_id, product_data in processed_data.items():
product_start_time = time.time()
try:
logger.info("Training enhanced model",
product_name=product_name)
inventory_product_id=inventory_product_id)
# Check if we have enough data
if len(product_data) < settings.MIN_TRAINING_DATA_DAYS:
training_results[product_name] = {
training_results[inventory_product_id] = {
'status': 'skipped',
'reason': 'insufficient_data',
'data_points': len(product_data),
@@ -247,7 +247,7 @@ class EnhancedBakeryMLTrainer:
'message': f'Need at least {settings.MIN_TRAINING_DATA_DAYS} data points, got {len(product_data)}'
}
logger.warning("Skipping product due to insufficient data",
product_name=product_name,
inventory_product_id=inventory_product_id,
data_points=len(product_data),
min_required=settings.MIN_TRAINING_DATA_DAYS)
continue
@@ -255,24 +255,24 @@ class EnhancedBakeryMLTrainer:
# Train the model using Prophet manager
model_info = await self.prophet_manager.train_bakery_model(
tenant_id=tenant_id,
product_name=product_name,
inventory_product_id=inventory_product_id,
df=product_data,
job_id=job_id
)
# Store model record using repository
model_record = await self._create_model_record(
repos, tenant_id, product_name, model_info, job_id, product_data
repos, tenant_id, inventory_product_id, model_info, job_id, product_data
)
# Create performance metrics record
if model_info.get('training_metrics'):
await self._create_performance_metrics(
repos, model_record.id if model_record else None,
tenant_id, product_name, model_info['training_metrics']
tenant_id, inventory_product_id, model_info['training_metrics']
)
training_results[product_name] = {
training_results[inventory_product_id] = {
'status': 'success',
'model_info': model_info,
'model_record_id': model_record.id if model_record else None,
@@ -282,7 +282,7 @@ class EnhancedBakeryMLTrainer:
}
logger.info("Successfully trained enhanced model",
product_name=product_name,
inventory_product_id=inventory_product_id,
model_record_id=model_record.id if model_record else None)
completed_products = i + 1
@@ -295,15 +295,15 @@ class EnhancedBakeryMLTrainer:
await self.status_publisher.progress_update(
progress=progress,
step="model_training",
current_product=product_name,
step_details=f"Enhanced training completed for {product_name}"
current_product=inventory_product_id,
step_details=f"Enhanced training completed for {inventory_product_id}"
)
except Exception as e:
logger.error("Failed to train enhanced model",
product_name=product_name,
inventory_product_id=inventory_product_id,
error=str(e))
training_results[product_name] = {
training_results[inventory_product_id] = {
'status': 'error',
'error_message': str(e),
'data_points': len(product_data) if product_data is not None else 0,
@@ -320,8 +320,8 @@ class EnhancedBakeryMLTrainer:
await self.status_publisher.progress_update(
progress=progress,
step="model_training",
current_product=product_name,
step_details=f"Enhanced training failed for {product_name}: {str(e)}"
current_product=inventory_product_id,
step_details=f"Enhanced training failed for {inventory_product_id}: {str(e)}"
)
return training_results
@@ -329,7 +329,7 @@ class EnhancedBakeryMLTrainer:
async def _create_model_record(self,
repos: Dict,
tenant_id: str,
product_name: str,
inventory_product_id: str,
model_info: Dict,
job_id: str,
processed_data: pd.DataFrame):
@@ -337,7 +337,7 @@ class EnhancedBakeryMLTrainer:
try:
model_data = {
"tenant_id": tenant_id,
"product_name": product_name,
"inventory_product_id": inventory_product_id,
"job_id": job_id,
"model_type": "enhanced_prophet",
"model_path": model_info.get("model_path"),
@@ -357,7 +357,7 @@ class EnhancedBakeryMLTrainer:
model_record = await repos['model'].create_model(model_data)
logger.info("Created enhanced model record",
product_name=product_name,
inventory_product_id=inventory_product_id,
model_id=model_record.id)
# Create artifacts for model files
@@ -374,7 +374,7 @@ class EnhancedBakeryMLTrainer:
except Exception as e:
logger.error("Failed to create enhanced model record",
product_name=product_name,
inventory_product_id=inventory_product_id,
error=str(e))
return None
@@ -382,14 +382,14 @@ class EnhancedBakeryMLTrainer:
repos: Dict,
model_id: str,
tenant_id: str,
product_name: str,
inventory_product_id: str,
metrics: Dict):
"""Create performance metrics record using repository"""
try:
metric_data = {
"model_id": str(model_id),
"tenant_id": tenant_id,
"product_name": product_name,
"inventory_product_id": inventory_product_id,
"mae": metrics.get("mae"),
"mse": metrics.get("mse"),
"rmse": metrics.get("rmse"),
@@ -401,12 +401,12 @@ class EnhancedBakeryMLTrainer:
await repos['performance'].create_performance_metric(metric_data)
logger.info("Created enhanced performance metrics",
product_name=product_name,
inventory_product_id=inventory_product_id,
model_id=model_id)
except Exception as e:
logger.error("Failed to create enhanced performance metrics",
product_name=product_name,
inventory_product_id=inventory_product_id,
error=str(e))
async def _calculate_enhanced_training_summary(self,
@@ -532,7 +532,7 @@ class EnhancedBakeryMLTrainer:
async def evaluate_model_performance_enhanced(self,
tenant_id: str,
product_name: str,
inventory_product_id: str,
model_path: str,
test_dataset: TrainingDataSet) -> Dict[str, Any]:
"""
@@ -553,17 +553,17 @@ class EnhancedBakeryMLTrainer:
test_traffic_df = pd.DataFrame(test_dataset.traffic_data)
# Filter for specific product
product_test_sales = test_sales_df[test_sales_df['product_name'] == product_name].copy()
product_test_sales = test_sales_df[test_sales_df['inventory_product_id'] == inventory_product_id].copy()
if product_test_sales.empty:
raise ValueError(f"No test data found for product: {product_name}")
raise ValueError(f"No test data found for product: {inventory_product_id}")
# Process test data using enhanced processor
processed_test_data = await self.enhanced_data_processor.prepare_training_data(
sales_data=product_test_sales,
weather_data=test_weather_df,
traffic_data=test_traffic_df,
product_name=product_name,
inventory_product_id=inventory_product_id,
tenant_id=tenant_id
)
@@ -608,16 +608,16 @@ class EnhancedBakeryMLTrainer:
metrics["mape"] = 100.0
# Store evaluation metrics in repository
model_records = await repos['model'].get_models_by_product(tenant_id, product_name)
model_records = await repos['model'].get_models_by_product(tenant_id, inventory_product_id)
if model_records:
latest_model = max(model_records, key=lambda x: x.created_at)
await self._create_performance_metrics(
repos, latest_model.id, tenant_id, product_name, metrics
repos, latest_model.id, tenant_id, inventory_product_id, metrics
)
result = {
"tenant_id": tenant_id,
"product_name": product_name,
"inventory_product_id": inventory_product_id,
"enhanced_evaluation_metrics": metrics,
"test_samples": len(processed_test_data),
"prediction_samples": len(forecast),