# bakery-ia/services/production/app/ml/yield_predictor.py
"""
Production Yield Predictor
Predicts actual vs planned yield and identifies waste reduction opportunities
"""
import pandas as pd
import numpy as np
from typing import Dict, List, Any, Optional, Tuple
from datetime import datetime, timedelta
import structlog
from scipy import stats
from sklearn.ensemble import RandomForestRegressor, GradientBoostingRegressor
from sklearn.linear_model import LinearRegression
from sklearn.preprocessing import StandardScaler
import warnings
warnings.filterwarnings('ignore')  # Silence noisy sklearn/scipy warnings emitted during batch training
logger = structlog.get_logger()


class YieldPredictor:
"""
Predicts production yield based on historical data and production factors.
Key Features:
- Multi-factor yield prediction (recipe, worker, time-of-day, equipment, batch size)
- Identifies low-yield patterns and root causes
- Waste categorization (spoilage, measurement error, process inefficiency)
- Actionable recommendations for yield improvement
- Statistical validation of learned patterns
Methodology:
1. Feature Engineering: Extract worker skill, time factors, batch size effects
2. Statistical Analysis: Identify significant yield loss factors
    3. ML Prediction: Train Random Forest, Gradient Boosting, and Linear models; keep the best by MAE
4. Pattern Detection: Find recurring low-yield situations
5. Insight Generation: Actionable recommendations with confidence scores
"""
def __init__(self):
self.model_cache = {} # Cache trained models per recipe
self.baseline_yields = {} # Cache baseline yields per recipe
async def predict_yield(
self,
tenant_id: str,
recipe_id: str,
production_history: pd.DataFrame,
production_context: Dict[str, Any],
min_history_runs: int = 30
) -> Dict[str, Any]:
"""
Predict yield for upcoming production run and generate insights.
Args:
tenant_id: Tenant identifier
recipe_id: Recipe identifier
production_history: Historical production runs with columns:
- production_run_id
- recipe_id
- planned_quantity
- actual_quantity
- yield_percentage
- staff_assigned (list of staff IDs)
- started_at
- completed_at
- batch_size
- equipment_id (optional)
- notes (optional)
        production_context: Upcoming production context:
            - staff_assigned (list of staff IDs)
            - planned_start_time
            - batch_size
            - planned_quantity (optional, defaults to 100; used for waste estimates)
            - unit_cost (optional; used to value expected waste in insights)
            - equipment_id (optional)
min_history_runs: Minimum production runs required for learning
Returns:
Prediction results with yield forecast, confidence, and insights
"""
logger.info(
"Predicting production yield",
tenant_id=tenant_id,
recipe_id=recipe_id,
history_runs=len(production_history)
)
# Validate production history
if len(production_history) < min_history_runs:
return self._insufficient_data_response(
recipe_id, production_context, len(production_history), min_history_runs
)
# Step 1: Calculate baseline statistics
baseline_stats = self._calculate_baseline_statistics(production_history)
# Step 2: Feature engineering
feature_df = self._engineer_features(production_history)
# Step 3: Analyze yield factors
factor_analysis = self._analyze_yield_factors(feature_df)
# Step 4: Train predictive model
model_results = self._train_yield_model(feature_df)
# Step 5: Make prediction for upcoming run
prediction = self._predict_upcoming_run(
production_context, model_results, baseline_stats, feature_df
)
# Step 6: Identify low-yield patterns
patterns = self._identify_yield_patterns(feature_df, factor_analysis)
# Step 7: Generate insights
insights = self._generate_yield_insights(
tenant_id, recipe_id, baseline_stats, factor_analysis,
patterns, prediction, production_context
)
# Step 8: Calculate confidence
confidence = self._calculate_prediction_confidence(
production_history, model_results, factor_analysis
)
return {
'recipe_id': recipe_id,
'predicted_at': datetime.utcnow().isoformat(),
'history_runs': len(production_history),
'baseline_yield': baseline_stats['mean_yield'],
'baseline_std': baseline_stats['std_yield'],
'predicted_yield': prediction['predicted_yield'],
'prediction_range': prediction['prediction_range'],
            'expected_waste': prediction['expected_waste_units'],
'confidence': confidence,
'factor_analysis': factor_analysis,
'patterns': patterns,
'model_performance': model_results['performance'],
'insights': insights
}
def _insufficient_data_response(
self, recipe_id: str, production_context: Dict[str, Any],
current_runs: int, required_runs: int
) -> Dict[str, Any]:
"""Return response when insufficient historical data."""
return {
'recipe_id': recipe_id,
'predicted_at': datetime.utcnow().isoformat(),
'history_runs': current_runs,
'status': 'insufficient_data',
'required_runs': required_runs,
'baseline_yield': None,
'predicted_yield': None,
'confidence': 0,
'insights': [{
'type': 'warning',
'priority': 'low',
'category': 'production',
                'title': 'Insufficient Production History for Yield Prediction',
'description': f'Only {current_runs} production runs available. Need at least {required_runs} runs to build reliable yield predictions. Continue tracking production data to enable yield optimization.',
'impact_type': 'data_quality',
'confidence': 100,
'actionable': True,
'recommendation_actions': [{
'label': 'Track Production Data',
'action': 'continue_production_tracking',
'params': {'recipe_id': recipe_id}
}]
}]
}
def _calculate_baseline_statistics(
self, production_history: pd.DataFrame
) -> Dict[str, Any]:
"""Calculate baseline yield statistics."""
yields = production_history['yield_percentage'].values
return {
'mean_yield': float(np.mean(yields)),
'median_yield': float(np.median(yields)),
'std_yield': float(np.std(yields)),
'min_yield': float(np.min(yields)),
'max_yield': float(np.max(yields)),
'cv_yield': float(np.std(yields) / np.mean(yields)), # Coefficient of variation
'percentile_25': float(np.percentile(yields, 25)),
'percentile_75': float(np.percentile(yields, 75)),
'runs_below_90': int(np.sum(yields < 90)),
'runs_above_95': int(np.sum(yields > 95))
}
def _engineer_features(self, production_history: pd.DataFrame) -> pd.DataFrame:
"""Engineer features from production history."""
df = production_history.copy()
# Time-based features
df['started_at'] = pd.to_datetime(df['started_at'])
df['hour_of_day'] = df['started_at'].dt.hour
df['day_of_week'] = df['started_at'].dt.dayofweek
df['is_weekend'] = df['day_of_week'].isin([5, 6]).astype(int)
df['is_early_morning'] = (df['hour_of_day'] < 6).astype(int)
df['is_late_night'] = (df['hour_of_day'] >= 22).astype(int)
# Duration features
if 'completed_at' in df.columns:
df['completed_at'] = pd.to_datetime(df['completed_at'])
df['duration_hours'] = (df['completed_at'] - df['started_at']).dt.total_seconds() / 3600
df['is_rushed'] = (df['duration_hours'] < df['duration_hours'].quantile(0.25)).astype(int)
# Batch size features
df['batch_size_normalized'] = df['batch_size'] / df['batch_size'].mean()
df['is_large_batch'] = (df['batch_size'] > df['batch_size'].quantile(0.75)).astype(int)
df['is_small_batch'] = (df['batch_size'] < df['batch_size'].quantile(0.25)).astype(int)
# Worker experience features (proxy: number of previous runs)
# Extract first worker from staff_assigned list
df['worker_id'] = df['staff_assigned'].apply(lambda x: x[0] if isinstance(x, list) and len(x) > 0 else 'unknown')
df = df.sort_values('started_at')
df['worker_run_count'] = df.groupby('worker_id').cumcount() + 1
        df['worker_experience_level'] = pd.cut(
            df['worker_run_count'],
            bins=[0, 5, 15, np.inf],  # open-ended top bin so veteran workers are not dropped as NaN
            labels=['novice', 'intermediate', 'expert']
        )
# Recent yield trend for worker
df['worker_recent_avg_yield'] = df.groupby('worker_id')['yield_percentage'].transform(
lambda x: x.rolling(window=5, min_periods=1).mean()
)
return df
def _analyze_yield_factors(self, feature_df: pd.DataFrame) -> Dict[str, Any]:
"""Analyze factors affecting yield using statistical tests."""
factors = {}
# Worker impact
# Extract worker_id from staff_assigned for analysis
if 'worker_id' not in feature_df.columns:
feature_df['worker_id'] = feature_df['staff_assigned'].apply(lambda x: x[0] if isinstance(x, list) and len(x) > 0 else 'unknown')
worker_yields = feature_df.groupby('worker_id')['yield_percentage'].agg(['mean', 'std', 'count'])
worker_yields = worker_yields[worker_yields['count'] >= 3] # Min 3 runs per worker
if len(worker_yields) > 1:
# ANOVA test: Does worker significantly affect yield?
worker_groups = [
feature_df[feature_df['worker_id'] == worker]['yield_percentage'].values
for worker in worker_yields.index
]
f_stat, p_value = stats.f_oneway(*worker_groups)
factors['worker'] = {
'significant': p_value < 0.05,
'p_value': float(p_value),
'f_statistic': float(f_stat),
'best_worker': worker_yields['mean'].idxmax(),
'best_worker_yield': float(worker_yields['mean'].max()),
'worst_worker': worker_yields['mean'].idxmin(),
'worst_worker_yield': float(worker_yields['mean'].min()),
'yield_range': float(worker_yields['mean'].max() - worker_yields['mean'].min())
}
else:
factors['worker'] = {'significant': False, 'reason': 'insufficient_workers'}
# Time of day impact
time_groups = {
'early_morning': feature_df[feature_df['hour_of_day'] < 6]['yield_percentage'].values,
'morning': feature_df[(feature_df['hour_of_day'] >= 6) & (feature_df['hour_of_day'] < 12)]['yield_percentage'].values,
'afternoon': feature_df[(feature_df['hour_of_day'] >= 12) & (feature_df['hour_of_day'] < 18)]['yield_percentage'].values,
'evening': feature_df[feature_df['hour_of_day'] >= 18]['yield_percentage'].values
}
time_groups = {k: v for k, v in time_groups.items() if len(v) >= 3}
if len(time_groups) > 1:
f_stat, p_value = stats.f_oneway(*time_groups.values())
time_means = {k: np.mean(v) for k, v in time_groups.items()}
factors['time_of_day'] = {
'significant': p_value < 0.05,
'p_value': float(p_value),
'best_time': max(time_means, key=time_means.get),
'best_time_yield': float(max(time_means.values())),
'worst_time': min(time_means, key=time_means.get),
'worst_time_yield': float(min(time_means.values())),
'yield_range': float(max(time_means.values()) - min(time_means.values()))
}
else:
factors['time_of_day'] = {'significant': False, 'reason': 'insufficient_data'}
# Batch size impact (correlation)
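        # Flag only moderate-or-stronger correlations (|r| > 0.3) that are also
        # statistically significant (p < 0.05).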
if len(feature_df) >= 10:
correlation, p_value = stats.pearsonr(
feature_df['batch_size'],
feature_df['yield_percentage']
)
factors['batch_size'] = {
'significant': abs(correlation) > 0.3 and p_value < 0.05,
'correlation': float(correlation),
'p_value': float(p_value),
'direction': 'positive' if correlation > 0 else 'negative',
'interpretation': self._interpret_batch_size_effect(correlation)
}
else:
factors['batch_size'] = {'significant': False, 'reason': 'insufficient_data'}
# Weekend vs weekday
weekend_yields = feature_df[feature_df['is_weekend'] == 1]['yield_percentage'].values
weekday_yields = feature_df[feature_df['is_weekend'] == 0]['yield_percentage'].values
if len(weekend_yields) >= 3 and len(weekday_yields) >= 3:
t_stat, p_value = stats.ttest_ind(weekend_yields, weekday_yields)
factors['weekend_effect'] = {
'significant': p_value < 0.05,
'p_value': float(p_value),
't_statistic': float(t_stat),
'weekend_yield': float(np.mean(weekend_yields)),
'weekday_yield': float(np.mean(weekday_yields)),
'difference': float(np.mean(weekend_yields) - np.mean(weekday_yields))
}
else:
factors['weekend_effect'] = {'significant': False, 'reason': 'insufficient_weekend_data'}
return factors
def _interpret_batch_size_effect(self, correlation: float) -> str:
"""Interpret batch size correlation."""
if abs(correlation) < 0.3:
return "Batch size has minimal impact on yield"
elif correlation > 0:
return "Larger batches tend to have higher yield (economies of scale)"
else:
return "Larger batches tend to have lower yield (difficulty handling large volumes)"
def _train_yield_model(self, feature_df: pd.DataFrame) -> Dict[str, Any]:
"""Train ML model to predict yield."""
# Prepare features
feature_columns = [
'hour_of_day', 'day_of_week', 'is_weekend',
'batch_size_normalized', 'is_large_batch', 'is_small_batch',
'worker_run_count'
]
if 'duration_hours' in feature_df.columns:
feature_columns.append('duration_hours')
# Encode worker_id (extracted from staff_assigned)
if 'worker_id' not in feature_df.columns:
feature_df['worker_id'] = feature_df['staff_assigned'].apply(lambda x: x[0] if isinstance(x, list) and len(x) > 0 else 'unknown')
worker_encoding = {worker: idx for idx, worker in enumerate(feature_df['worker_id'].unique())}
feature_df['worker_encoded'] = feature_df['worker_id'].map(worker_encoding)
feature_columns.append('worker_encoded')
X = feature_df[feature_columns].fillna(0).values
y = feature_df['yield_percentage'].values
# Split into train/test (temporal split)
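        # History is sorted chronologically in _engineer_features, so the last
        # 20% of runs form a genuine hold-out; a random split would leak future
        # runs into training.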
split_idx = int(len(X) * 0.8)
X_train, X_test = X[:split_idx], X[split_idx:]
y_train, y_test = y[:split_idx], y[split_idx:]
# Scale features
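        # Scaling mainly matters for the linear model; tree ensembles are scale-invariant.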
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)
# Train ensemble of models
models = {
'random_forest': RandomForestRegressor(n_estimators=100, max_depth=5, random_state=42),
'gradient_boosting': GradientBoostingRegressor(n_estimators=50, max_depth=3, random_state=42),
'linear': LinearRegression()
}
performances = {}
predictions = {}
for name, model in models.items():
model.fit(X_train_scaled, y_train)
y_pred = model.predict(X_test_scaled)
            mae = np.mean(np.abs(y_test - y_pred))
            rmse = np.sqrt(np.mean((y_test - y_pred) ** 2))
            # Guard against a zero-variance hold-out set when computing R^2
            ss_tot = np.sum((y_test - np.mean(y_test)) ** 2)
            r2 = 1 - np.sum((y_test - y_pred) ** 2) / ss_tot if ss_tot > 0 else 0.0
performances[name] = {
'mae': float(mae),
'rmse': float(rmse),
'r2': float(r2)
}
predictions[name] = y_pred
# Select best model based on MAE
best_model_name = min(performances, key=lambda k: performances[k]['mae'])
best_model = models[best_model_name]
# Feature importance (if available)
feature_importance = {}
if hasattr(best_model, 'feature_importances_'):
importances = best_model.feature_importances_
feature_importance = {
feature_columns[i]: float(importances[i])
for i in range(len(feature_columns))
}
feature_importance = dict(sorted(
feature_importance.items(),
key=lambda x: x[1],
reverse=True
))
return {
'best_model': best_model,
'best_model_name': best_model_name,
'scaler': scaler,
'feature_columns': feature_columns,
'worker_encoding': worker_encoding,
'performance': performances[best_model_name],
'all_performances': performances,
'feature_importance': feature_importance
}
def _predict_upcoming_run(
self,
production_context: Dict[str, Any],
model_results: Dict[str, Any],
baseline_stats: Dict[str, Any],
feature_df: pd.DataFrame
) -> Dict[str, Any]:
"""Predict yield for upcoming production run."""
# Extract context
staff_assigned = production_context.get('staff_assigned', [])
worker_id = staff_assigned[0] if isinstance(staff_assigned, list) and len(staff_assigned) > 0 else 'unknown'
planned_start = pd.to_datetime(production_context.get('planned_start_time'))
batch_size = production_context.get('batch_size')
# Get worker experience
if 'worker_id' not in feature_df.columns:
feature_df['worker_id'] = feature_df['staff_assigned'].apply(lambda x: x[0] if isinstance(x, list) and len(x) > 0 else 'unknown')
worker_runs = feature_df[feature_df['worker_id'] == worker_id]
        worker_run_count = max(len(worker_runs), 1)  # unseen workers default to one run
# Build feature vector
mean_batch_size = feature_df['batch_size'].mean()
batch_size_normalized = batch_size / mean_batch_size
is_large_batch = 1 if batch_size > feature_df['batch_size'].quantile(0.75) else 0
is_small_batch = 1 if batch_size < feature_df['batch_size'].quantile(0.25) else 0
features = {
'hour_of_day': planned_start.hour,
'day_of_week': planned_start.dayofweek,
'is_weekend': 1 if planned_start.dayofweek in [5, 6] else 0,
'batch_size_normalized': batch_size_normalized,
'is_large_batch': is_large_batch,
'is_small_batch': is_small_batch,
'worker_run_count': worker_run_count,
            # Duration is unknown before the run; the historical median is less
            # biased than 0, which lies far outside the training distribution.
            'duration_hours': float(np.nan_to_num(feature_df['duration_hours'].median())) if 'duration_hours' in feature_df.columns else 0,
'worker_encoded': model_results['worker_encoding'].get(worker_id, 0)
}
# Create feature vector in correct order
X = np.array([[features.get(col, 0) for col in model_results['feature_columns']]])
X_scaled = model_results['scaler'].transform(X)
# Predict
predicted_yield = float(model_results['best_model'].predict(X_scaled)[0])
# Prediction range (based on model RMSE)
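        # 1.96 * RMSE approximates a 95% interval, assuming roughly normal residuals.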
rmse = model_results['performance']['rmse']
prediction_range = {
'lower': max(0, predicted_yield - 1.96 * rmse),
'upper': min(100, predicted_yield + 1.96 * rmse)
}
# Expected waste
planned_quantity = production_context.get('planned_quantity', 100)
expected_waste_pct = max(0, 100 - predicted_yield)
expected_waste_units = planned_quantity * (expected_waste_pct / 100)
return {
'predicted_yield': round(predicted_yield, 2),
'prediction_range': prediction_range,
'expected_waste_pct': round(expected_waste_pct, 2),
'expected_waste_units': round(expected_waste_units, 2),
'baseline_comparison': round(predicted_yield - baseline_stats['mean_yield'], 2),
'features_used': features
}
def _identify_yield_patterns(
self, feature_df: pd.DataFrame, factor_analysis: Dict[str, Any]
) -> List[Dict[str, Any]]:
"""Identify recurring low-yield patterns."""
patterns = []
# Pattern 1: Specific worker consistently low
if factor_analysis.get('worker', {}).get('significant'):
worst_worker = factor_analysis['worker']['worst_worker']
worst_yield = factor_analysis['worker']['worst_worker_yield']
best_yield = factor_analysis['worker']['best_worker_yield']
if worst_yield < 90 and (best_yield - worst_yield) > 5:
patterns.append({
'pattern': 'low_yield_worker',
'description': f'Worker {worst_worker} consistently produces {worst_yield:.1f}% yield vs best worker {best_yield:.1f}%',
'severity': 'high' if worst_yield < 85 else 'medium',
'affected_runs': int(len(feature_df[feature_df['worker_id'] == worst_worker])),
'yield_impact': round(best_yield - worst_yield, 2),
'recommendation': 'Provide additional training or reassign to different recipes'
})
# Pattern 2: Time-of-day effect
if factor_analysis.get('time_of_day', {}).get('significant'):
worst_time = factor_analysis['time_of_day']['worst_time']
worst_yield = factor_analysis['time_of_day']['worst_time_yield']
if worst_yield < 90:
patterns.append({
'pattern': 'low_yield_time',
'description': f'{worst_time} shifts produce {worst_yield:.1f}% yield',
'severity': 'medium',
'affected_runs': 'varies',
'yield_impact': round(factor_analysis['time_of_day']['yield_range'], 2),
'recommendation': f'Avoid scheduling this recipe during {worst_time}'
})
# Pattern 3: Large batch issues
if factor_analysis.get('batch_size', {}).get('significant'):
if factor_analysis['batch_size']['direction'] == 'negative':
patterns.append({
'pattern': 'large_batch_yield_loss',
'description': 'Larger batches have lower yield - equipment or process capacity issues',
'severity': 'medium',
'correlation': round(factor_analysis['batch_size']['correlation'], 3),
'recommendation': 'Split large batches or upgrade equipment'
})
# Pattern 4: Weekend effect
if factor_analysis.get('weekend_effect', {}).get('significant'):
weekend_yield = factor_analysis['weekend_effect']['weekend_yield']
weekday_yield = factor_analysis['weekend_effect']['weekday_yield']
if abs(weekend_yield - weekday_yield) > 3:
if weekend_yield < weekday_yield:
patterns.append({
'pattern': 'weekend_yield_drop',
'description': f'Weekend production {weekend_yield:.1f}% vs weekday {weekday_yield:.1f}%',
'severity': 'low',
'yield_impact': round(weekday_yield - weekend_yield, 2),
'recommendation': 'Review weekend staffing or processes'
})
return patterns
def _generate_yield_insights(
self,
tenant_id: str,
recipe_id: str,
baseline_stats: Dict[str, Any],
factor_analysis: Dict[str, Any],
patterns: List[Dict[str, Any]],
prediction: Dict[str, Any],
production_context: Dict[str, Any]
) -> List[Dict[str, Any]]:
"""Generate actionable insights for yield improvement."""
insights = []
        # Derive the primary worker once; referenced in recommendation params below
        staff_assigned = production_context.get('staff_assigned', [])
        worker_id = staff_assigned[0] if isinstance(staff_assigned, list) and staff_assigned else 'unknown'
        # Insight 1: Low predicted yield warning
        if prediction['predicted_yield'] < 90:
            waste_value = prediction['expected_waste_units'] * production_context.get('unit_cost', 5)
insights.append({
'type': 'warning',
'priority': 'high' if prediction['predicted_yield'] < 85 else 'medium',
'category': 'production',
'title': f'Low Yield Predicted: {prediction["predicted_yield"]:.1f}%',
'description': f'Upcoming production run predicted to yield {prediction["predicted_yield"]:.1f}%, below baseline {baseline_stats["mean_yield"]:.1f}%. Expected waste: {prediction["expected_waste_units"]:.1f} units (€{waste_value:.2f}).',
'impact_type': 'waste',
'impact_value': prediction['expected_waste_units'],
'impact_unit': 'units',
'confidence': 75,
'metrics_json': {
'recipe_id': recipe_id,
'predicted_yield': prediction['predicted_yield'],
'expected_waste': prediction['expected_waste_units'],
'waste_value': round(waste_value, 2)
},
'actionable': True,
'recommendation_actions': [{
'label': 'Review Production Setup',
'action': 'review_production_factors',
'params': {
'recipe_id': recipe_id,
'worker_id': worker_id
}
}]
})
# Insight 2: High-severity patterns
for pattern in patterns:
if pattern.get('severity') == 'high':
if pattern['pattern'] == 'low_yield_worker':
insights.append({
'type': 'opportunity',
'priority': 'high',
'category': 'production',
'title': f'Worker Training Opportunity: {pattern["yield_impact"]:.1f}% Yield Gap',
                        'description': pattern['description'] + '. Improving this worker to average performance would save significant waste.',
'impact_type': 'yield_improvement',
'impact_value': pattern['yield_impact'],
'impact_unit': 'percentage_points',
'confidence': 85,
'metrics_json': {
'recipe_id': recipe_id,
'pattern': pattern['pattern'],
'yield_impact': pattern['yield_impact']
},
'actionable': True,
'recommendation_actions': [{
'label': 'Schedule Training',
'action': 'schedule_worker_training',
'params': {'recipe_id': recipe_id}
}]
})
# Insight 3: Excellent yield
if prediction['predicted_yield'] > 98:
insights.append({
'type': 'positive',
'priority': 'low',
'category': 'production',
'title': f'Excellent Yield Expected: {prediction["predicted_yield"]:.1f}%',
'description': f'Optimal production conditions detected. Expected yield {prediction["predicted_yield"]:.1f}% exceeds baseline {baseline_stats["mean_yield"]:.1f}%.',
'impact_type': 'yield_improvement',
'impact_value': prediction['baseline_comparison'],
'impact_unit': 'percentage_points',
'confidence': 70,
'metrics_json': {
'recipe_id': recipe_id,
'predicted_yield': prediction['predicted_yield']
},
'actionable': False
})
# Insight 4: Yield variability issue
if baseline_stats['cv_yield'] > 0.05: # Coefficient of variation > 5%
insights.append({
'type': 'opportunity',
'priority': 'medium',
'category': 'production',
'title': f'High Yield Variability: {baseline_stats["cv_yield"]*100:.1f}% CV',
'description': f'Yield varies significantly across production runs (CV={baseline_stats["cv_yield"]*100:.1f}%, range {baseline_stats["min_yield"]:.1f}%-{baseline_stats["max_yield"]:.1f}%). Standardizing processes could reduce waste.',
'impact_type': 'process_improvement',
'confidence': 80,
'metrics_json': {
'recipe_id': recipe_id,
'cv_yield': round(baseline_stats['cv_yield'], 3),
'yield_range': round(baseline_stats['max_yield'] - baseline_stats['min_yield'], 2)
},
'actionable': True,
'recommendation_actions': [{
'label': 'Standardize Process',
'action': 'review_production_sop',
'params': {'recipe_id': recipe_id}
}]
})
return insights
def _calculate_prediction_confidence(
self,
production_history: pd.DataFrame,
model_results: Dict[str, Any],
factor_analysis: Dict[str, Any]
) -> int:
"""Calculate overall confidence score for predictions."""
confidence_factors = []
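        # Additive rubric: sample size (max 30) + model performance (max 30)
        # + significant factors (max 25) + data recency (max 15) = 100.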
# Factor 1: Sample size (0-30 points)
n_runs = len(production_history)
if n_runs >= 100:
sample_score = 30
elif n_runs >= 50:
sample_score = 25
elif n_runs >= 30:
sample_score = 20
else:
sample_score = 10
confidence_factors.append(('sample_size', sample_score))
# Factor 2: Model performance (0-30 points)
r2 = model_results['performance']['r2']
mae = model_results['performance']['mae']
if r2 > 0.7 and mae < 3:
model_score = 30
elif r2 > 0.5 and mae < 5:
model_score = 25
elif r2 > 0.3 and mae < 7:
model_score = 20
else:
model_score = 10
confidence_factors.append(('model_performance', model_score))
# Factor 3: Statistical significance of factors (0-25 points)
significant_factors = sum(
1 for factor in factor_analysis.values()
if isinstance(factor, dict) and factor.get('significant')
)
if significant_factors >= 3:
stats_score = 25
elif significant_factors >= 2:
stats_score = 20
elif significant_factors >= 1:
stats_score = 15
else:
stats_score = 10
confidence_factors.append(('significant_factors', stats_score))
# Factor 4: Data recency (0-15 points)
most_recent = production_history['started_at'].max()
days_old = (datetime.utcnow() - pd.to_datetime(most_recent)).days
if days_old <= 7:
recency_score = 15
elif days_old <= 30:
recency_score = 12
elif days_old <= 90:
recency_score = 8
else:
recency_score = 5
confidence_factors.append(('data_recency', recency_score))
total_confidence = sum(score for _, score in confidence_factors)
return min(100, max(0, total_confidence))
async def analyze_recipe_yield_history(
self,
tenant_id: str,
recipe_id: str,
production_history: pd.DataFrame,
min_history_runs: int = 30
) -> Dict[str, Any]:
"""
Analyze historical yield performance for a recipe (no prediction).
Args:
tenant_id: Tenant identifier
recipe_id: Recipe identifier
production_history: Historical production runs
min_history_runs: Minimum production runs required
Returns:
Historical analysis with insights
"""
logger.info(
"Analyzing recipe yield history",
tenant_id=tenant_id,
recipe_id=recipe_id,
history_runs=len(production_history)
)
if len(production_history) < min_history_runs:
return self._insufficient_data_response(
recipe_id, {}, len(production_history), min_history_runs
)
# Calculate statistics
baseline_stats = self._calculate_baseline_statistics(production_history)
# Feature engineering
feature_df = self._engineer_features(production_history)
# Analyze factors
factor_analysis = self._analyze_yield_factors(feature_df)
# Identify patterns
patterns = self._identify_yield_patterns(feature_df, factor_analysis)
# Generate insights (without prediction)
insights = []
# Add insights for patterns
for pattern in patterns:
if pattern.get('severity') in ['high', 'medium']:
insights.append({
'type': 'opportunity',
'priority': pattern['severity'],
'category': 'production',
'title': f'Yield Pattern Detected: {pattern["pattern"]}',
'description': pattern['description'],
'impact_type': 'yield_improvement',
'confidence': 80,
'metrics_json': {
'recipe_id': recipe_id,
'pattern': pattern
},
'actionable': True,
'recommendation': pattern['recommendation']
})
return {
'recipe_id': recipe_id,
'analyzed_at': datetime.utcnow().isoformat(),
'history_runs': len(production_history),
'baseline_stats': baseline_stats,
'factor_analysis': factor_analysis,
'patterns': patterns,
'insights': insights
}
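

if __name__ == "__main__":
    # Minimal smoke-test sketch, not part of the service: builds a synthetic
    # production history and runs one prediction end-to-end. All values below
    # (recipe, workers, yields, costs) are illustrative assumptions.
    import asyncio

    rng = np.random.default_rng(42)
    n_runs = 40
    hours = rng.integers(4, 20, n_runs)
    history = pd.DataFrame({
        'production_run_id': [f'run-{i}' for i in range(n_runs)],
        'recipe_id': ['baguette'] * n_runs,
        'planned_quantity': [100.0] * n_runs,
        'batch_size': rng.integers(50, 200, n_runs),
        'staff_assigned': [[str(rng.choice(['w1', 'w2', 'w3']))] for _ in range(n_runs)],
        'started_at': [
            pd.Timestamp('2025-09-01') + pd.Timedelta(days=i, hours=int(h))
            for i, h in enumerate(hours)
        ],
    })
    history['completed_at'] = history['started_at'] + pd.Timedelta(hours=4)
    history['actual_quantity'] = history['planned_quantity'] * rng.uniform(0.85, 1.0, n_runs)
    history['yield_percentage'] = 100 * history['actual_quantity'] / history['planned_quantity']

    context = {
        'staff_assigned': ['w1'],
        'planned_start_time': '2025-10-15 06:00:00',
        'batch_size': 120,
        'planned_quantity': 100,
        'unit_cost': 2.5,
    }
    predictor = YieldPredictor()
    result = asyncio.run(predictor.predict_yield('demo-tenant', 'baguette', history, context))
    print(f"predicted_yield={result['predicted_yield']} confidence={result['confidence']}")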