# bakery-ia/services/ai_insights/app/ml/feedback_learning_system.py

"""
Feedback Loop & Learning System
Enables continuous improvement through outcome tracking and model retraining
"""
import pandas as pd
import numpy as np
from typing import Dict, List, Any, Optional, Tuple
from datetime import datetime, timedelta
from uuid import UUID
import structlog
from scipy import stats
from collections import defaultdict
logger = structlog.get_logger()
class FeedbackLearningSystem:
"""
Manages feedback collection, model performance tracking, and retraining triggers.
Key Responsibilities:
1. Aggregate feedback from applied insights
2. Calculate model performance metrics (accuracy, precision, recall)
3. Detect performance degradation
4. Trigger automatic retraining when needed
5. Calibrate confidence scores based on actual accuracy
6. Generate learning insights for model improvement
Workflow:
- Feedback continuously recorded via AIInsightsClient
- Periodic performance analysis (daily/weekly)
- Automatic alerts when performance degrades
- Retraining recommendations with priority
"""
def __init__(
self,
performance_threshold: float = 0.85, # Minimum acceptable accuracy
degradation_threshold: float = 0.10, # 10% drop triggers alert
min_feedback_samples: int = 30, # Minimum samples for analysis
retraining_window_days: int = 90 # Consider last 90 days
):
self.performance_threshold = performance_threshold
self.degradation_threshold = degradation_threshold
self.min_feedback_samples = min_feedback_samples
self.retraining_window_days = retraining_window_days
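    # Example tuning (hypothetical values, not taken from any production config):
    # FeedbackLearningSystem(performance_threshold=0.80, min_feedback_samples=50)
    # would tolerate 80% accuracy but demand more feedback before drawing any
    # conclusion. The defaults above flag a model once fewer than 85% of its
    # predictions land within 10% error, or once accuracy drops more than 10%
    # relative to the recorded baseline.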
async def analyze_model_performance(
self,
model_name: str,
feedback_data: pd.DataFrame,
baseline_performance: Optional[Dict[str, float]] = None
) -> Dict[str, Any]:
"""
Analyze model performance based on feedback data.
Args:
model_name: Name of the model (e.g., 'hybrid_forecaster', 'yield_predictor')
feedback_data: DataFrame with columns:
- insight_id
- applied_at
- outcome_date
- predicted_value
- actual_value
- error
- error_pct
- accuracy
baseline_performance: Optional baseline metrics for comparison
Returns:
Performance analysis with metrics, trends, and recommendations
"""
logger.info(
"Analyzing model performance",
model_name=model_name,
feedback_samples=len(feedback_data)
)
if len(feedback_data) < self.min_feedback_samples:
return self._insufficient_feedback_response(
model_name, len(feedback_data), self.min_feedback_samples
)
# Step 1: Calculate current performance metrics
current_metrics = self._calculate_performance_metrics(feedback_data)
# Step 2: Analyze performance trend over time
trend_analysis = self._analyze_performance_trend(feedback_data)
# Step 3: Detect performance degradation
degradation_detected = self._detect_performance_degradation(
current_metrics, baseline_performance, trend_analysis
)
# Step 4: Generate retraining recommendation
retraining_recommendation = self._generate_retraining_recommendation(
model_name, current_metrics, degradation_detected, trend_analysis
)
# Step 5: Identify error patterns
error_patterns = self._identify_error_patterns(feedback_data)
# Step 6: Calculate confidence calibration
confidence_calibration = self._calculate_confidence_calibration(feedback_data)
logger.info(
"Model performance analysis complete",
model_name=model_name,
current_accuracy=current_metrics['accuracy'],
degradation_detected=degradation_detected['detected'],
retraining_recommended=retraining_recommendation['recommended']
)
return {
'model_name': model_name,
'analyzed_at': datetime.utcnow().isoformat(),
'feedback_samples': len(feedback_data),
'date_range': {
'start': feedback_data['outcome_date'].min().isoformat(),
'end': feedback_data['outcome_date'].max().isoformat()
},
'current_performance': current_metrics,
'baseline_performance': baseline_performance,
'trend_analysis': trend_analysis,
'degradation_detected': degradation_detected,
'retraining_recommendation': retraining_recommendation,
'error_patterns': error_patterns,
'confidence_calibration': confidence_calibration
}
def _insufficient_feedback_response(
self, model_name: str, current_samples: int, required_samples: int
) -> Dict[str, Any]:
"""Return response when insufficient feedback data."""
return {
'model_name': model_name,
'analyzed_at': datetime.utcnow().isoformat(),
'status': 'insufficient_feedback',
'feedback_samples': current_samples,
'required_samples': required_samples,
'current_performance': None,
'recommendation': f'Need {required_samples - current_samples} more feedback samples for reliable analysis'
}
def _calculate_performance_metrics(
self, feedback_data: pd.DataFrame
) -> Dict[str, float]:
"""
Calculate comprehensive performance metrics.
Metrics:
- Accuracy: % of predictions within acceptable error
- MAE: Mean Absolute Error
- RMSE: Root Mean Squared Error
- MAPE: Mean Absolute Percentage Error
- Bias: Systematic over/under prediction
        - R²: Correlation between predicted and actual
"""
predicted = feedback_data['predicted_value'].values
actual = feedback_data['actual_value'].values
# Filter out invalid values
valid_mask = ~(np.isnan(predicted) | np.isnan(actual))
predicted = predicted[valid_mask]
actual = actual[valid_mask]
        if len(predicted) == 0:
            return {
                'accuracy': 0,
                'mae': 0,
                'rmse': 0,
                'mape': 0,
                'bias': 0,
                'r_squared': 0,
                'sample_size': 0
            }
# Calculate errors
errors = predicted - actual
abs_errors = np.abs(errors)
        # Percentage errors are undefined where actual == 0; mark those entries as NaN
        with np.errstate(divide='ignore', invalid='ignore'):
            pct_errors = np.where(actual != 0, np.abs(errors / actual) * 100, np.nan)
# MAE and RMSE
mae = float(np.mean(abs_errors))
rmse = float(np.sqrt(np.mean(errors ** 2)))
# MAPE (excluding cases where actual = 0)
valid_pct_mask = actual != 0
mape = float(np.mean(pct_errors[valid_pct_mask])) if np.any(valid_pct_mask) else 0
        # Accuracy (% within 10% error; entries with undefined pct error count as misses)
        within_10pct = np.sum(pct_errors <= 10) / len(pct_errors) * 100
# Bias (mean error - positive = over-prediction)
bias = float(np.mean(errors))
# R² (correlation)
if len(predicted) > 1 and np.std(actual) > 0:
correlation = np.corrcoef(predicted, actual)[0, 1]
r_squared = correlation ** 2
else:
r_squared = 0
return {
'accuracy': round(within_10pct, 2), # % within 10% error
'mae': round(mae, 2),
'rmse': round(rmse, 2),
'mape': round(mape, 2),
'bias': round(bias, 2),
'r_squared': round(r_squared, 3),
'sample_size': len(predicted)
}
def _analyze_performance_trend(
self, feedback_data: pd.DataFrame
) -> Dict[str, Any]:
"""
Analyze performance trend over time.
Returns trend direction (improving/stable/degrading) and slope.
"""
# Sort by date
df = feedback_data.sort_values('outcome_date').copy()
        # Rolling accuracy over the last 7 feedback samples (not calendar days)
        df['rolling_accuracy'] = df['accuracy'].rolling(window=7, min_periods=3).mean()
# Linear trend
if len(df) >= 10:
# Use day index as x
df['day_index'] = (df['outcome_date'] - df['outcome_date'].min()).dt.days
# Fit linear regression
valid_mask = ~np.isnan(df['rolling_accuracy'])
if valid_mask.sum() >= 10:
x = df.loc[valid_mask, 'day_index'].values
y = df.loc[valid_mask, 'rolling_accuracy'].values
slope, intercept, r_value, p_value, std_err = stats.linregress(x, y)
# Determine trend
if p_value < 0.05:
if slope > 0.1:
trend = 'improving'
elif slope < -0.1:
trend = 'degrading'
else:
trend = 'stable'
else:
trend = 'stable'
return {
'trend': trend,
'slope': round(float(slope), 4),
'p_value': round(float(p_value), 4),
'significant': p_value < 0.05,
'recent_performance': round(float(df['rolling_accuracy'].iloc[-1]), 2),
'initial_performance': round(float(df['rolling_accuracy'].dropna().iloc[0]), 2)
}
# Not enough data for trend
return {
'trend': 'insufficient_data',
'slope': 0,
'p_value': 1.0,
'significant': False
}
def _detect_performance_degradation(
self,
current_metrics: Dict[str, float],
baseline_performance: Optional[Dict[str, float]],
trend_analysis: Dict[str, Any]
) -> Dict[str, Any]:
"""
Detect if model performance has degraded.
Degradation triggers:
1. Current accuracy below threshold (85%)
2. Significant drop from baseline (>10%)
3. Degrading trend detected
"""
degradation_reasons = []
severity = 'none'
# Check absolute performance
if current_metrics['accuracy'] < self.performance_threshold * 100:
degradation_reasons.append(
f"Accuracy {current_metrics['accuracy']:.1f}% below threshold {self.performance_threshold*100}%"
)
severity = 'high'
# Check vs baseline
if baseline_performance and 'accuracy' in baseline_performance:
baseline_acc = baseline_performance['accuracy']
current_acc = current_metrics['accuracy']
drop_pct = (baseline_acc - current_acc) / baseline_acc
if drop_pct > self.degradation_threshold:
degradation_reasons.append(
f"Accuracy dropped {drop_pct*100:.1f}% from baseline {baseline_acc:.1f}%"
)
                severity = 'high'
# Check trend
if trend_analysis.get('trend') == 'degrading' and trend_analysis.get('significant'):
degradation_reasons.append(
f"Degrading trend detected (slope: {trend_analysis['slope']:.4f})"
)
severity = 'medium' if severity == 'none' else severity
detected = len(degradation_reasons) > 0
return {
'detected': detected,
'severity': severity,
'reasons': degradation_reasons,
'current_accuracy': current_metrics['accuracy'],
'baseline_accuracy': baseline_performance.get('accuracy') if baseline_performance else None
}
def _generate_retraining_recommendation(
self,
model_name: str,
current_metrics: Dict[str, float],
degradation_detected: Dict[str, Any],
trend_analysis: Dict[str, Any]
) -> Dict[str, Any]:
"""
Generate retraining recommendation based on performance analysis.
Priority Levels:
- urgent: Severe degradation, retrain immediately
- high: Performance below threshold, retrain soon
- medium: Trending down, schedule retraining
- low: Stable, routine retraining
- none: No retraining needed
"""
if degradation_detected['detected']:
severity = degradation_detected['severity']
if severity == 'high':
priority = 'urgent'
recommendation = f"Retrain {model_name} immediately - severe performance degradation"
elif severity == 'medium':
priority = 'high'
recommendation = f"Schedule {model_name} retraining within 7 days"
else:
priority = 'medium'
recommendation = f"Schedule routine {model_name} retraining"
return {
'recommended': True,
'priority': priority,
'recommendation': recommendation,
'reasons': degradation_detected['reasons'],
'estimated_improvement': self._estimate_retraining_benefit(
current_metrics, degradation_detected
)
}
        # No degradation detected. A routine retraining cadence (e.g., every 90
        # days) would additionally require tracking last_retrained_at.
        return {
            'recommended': False,
            'priority': 'none',
            'recommendation': f"{model_name} performance is acceptable, no immediate retraining needed",
            'next_review_date': (datetime.utcnow() + timedelta(days=30)).isoformat()
        }
def _estimate_retraining_benefit(
self,
current_metrics: Dict[str, float],
degradation_detected: Dict[str, Any]
) -> Dict[str, Any]:
"""Estimate expected improvement from retraining."""
baseline_acc = degradation_detected.get('baseline_accuracy')
current_acc = current_metrics['accuracy']
if baseline_acc:
# Expect to recover 70-80% of lost performance
expected_improvement = (baseline_acc - current_acc) * 0.75
expected_new_acc = current_acc + expected_improvement
return {
'expected_accuracy_improvement': round(expected_improvement, 2),
'expected_new_accuracy': round(expected_new_acc, 2),
'confidence': 'medium'
}
return {
'expected_accuracy_improvement': 'unknown',
'confidence': 'low'
}
def _identify_error_patterns(
self, feedback_data: pd.DataFrame
) -> List[Dict[str, Any]]:
"""
Identify systematic error patterns.
Patterns:
- Consistent over/under prediction
- Higher errors for specific ranges
- Day-of-week effects
- Seasonal effects
"""
patterns = []
# Pattern 1: Systematic bias
mean_error = feedback_data['error'].mean()
if abs(mean_error) > feedback_data['error'].std() * 0.5:
direction = 'over-prediction' if mean_error > 0 else 'under-prediction'
patterns.append({
'pattern': 'systematic_bias',
'description': f'Consistent {direction} by {abs(mean_error):.1f} units',
'severity': 'high' if abs(mean_error) > 10 else 'medium',
'recommendation': 'Recalibrate model bias term'
})
        # Pattern 2: Higher error for large predicted values
        if 'predicted_value' in feedback_data.columns:
            # Split into quartiles; qcut raises ValueError when bin edges are not
            # unique and four labels cannot be honoured, so guard it and skip the
            # pattern in that case
            try:
                feedback_data['value_quartile'] = pd.qcut(
                    feedback_data['predicted_value'],
                    q=4,
                    labels=['Q1', 'Q2', 'Q3', 'Q4']
                )
            except ValueError:
                feedback_data['value_quartile'] = np.nan
quartile_errors = feedback_data.groupby('value_quartile')['error_pct'].mean()
if len(quartile_errors) == 4 and quartile_errors['Q4'] > quartile_errors['Q1'] * 1.5:
patterns.append({
'pattern': 'high_value_error',
'description': f'Higher errors for large predictions (Q4: {quartile_errors["Q4"]:.1f}% vs Q1: {quartile_errors["Q1"]:.1f}%)',
'severity': 'medium',
'recommendation': 'Add log transformation or separate model for high values'
})
# Pattern 3: Day-of-week effect
if 'outcome_date' in feedback_data.columns:
feedback_data['day_of_week'] = pd.to_datetime(feedback_data['outcome_date']).dt.dayofweek
dow_errors = feedback_data.groupby('day_of_week')['error_pct'].mean()
if len(dow_errors) >= 5 and dow_errors.max() > dow_errors.min() * 1.5:
worst_day = dow_errors.idxmax()
day_names = ['Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday', 'Sunday']
patterns.append({
'pattern': 'day_of_week_effect',
'description': f'Higher errors on {day_names[worst_day]} ({dow_errors[worst_day]:.1f}%)',
'severity': 'low',
'recommendation': 'Add day-of-week features to model'
})
return patterns
def _calculate_confidence_calibration(
self, feedback_data: pd.DataFrame
) -> Dict[str, Any]:
"""
Calculate how well confidence scores match actual accuracy.
        Well-calibrated model: 80% confidence ≈ 80% accuracy
"""
if 'confidence' not in feedback_data.columns:
return {'calibrated': False, 'reason': 'No confidence scores available'}
# Bin by confidence ranges
feedback_data['confidence_bin'] = pd.cut(
feedback_data['confidence'],
bins=[0, 60, 70, 80, 90, 100],
labels=['<60', '60-70', '70-80', '80-90', '90+']
)
calibration_results = []
for conf_bin in feedback_data['confidence_bin'].unique():
if pd.isna(conf_bin):
continue
bin_data = feedback_data[feedback_data['confidence_bin'] == conf_bin]
if len(bin_data) >= 5:
avg_confidence = bin_data['confidence'].mean()
avg_accuracy = bin_data['accuracy'].mean()
calibration_error = abs(avg_confidence - avg_accuracy)
calibration_results.append({
'confidence_range': str(conf_bin),
'avg_confidence': round(avg_confidence, 1),
'avg_accuracy': round(avg_accuracy, 1),
'calibration_error': round(calibration_error, 1),
'sample_size': len(bin_data),
'well_calibrated': calibration_error < 10
})
# Overall calibration
if calibration_results:
overall_calibration_error = np.mean([r['calibration_error'] for r in calibration_results])
well_calibrated = overall_calibration_error < 10
return {
'calibrated': well_calibrated,
'overall_calibration_error': round(overall_calibration_error, 2),
'by_confidence_range': calibration_results,
'recommendation': 'Confidence scores are well-calibrated' if well_calibrated
else 'Recalibrate confidence scoring algorithm'
}
return {'calibrated': False, 'reason': 'Insufficient data for calibration analysis'}
async def generate_learning_insights(
self,
performance_analyses: List[Dict[str, Any]],
tenant_id: str
) -> List[Dict[str, Any]]:
"""
Generate high-level insights about learning system performance.
Args:
performance_analyses: List of model performance analyses
tenant_id: Tenant identifier
Returns:
Learning insights for system improvement
"""
insights = []
# Insight 1: Models needing urgent retraining
urgent_models = [
a for a in performance_analyses
if a.get('retraining_recommendation', {}).get('priority') == 'urgent'
]
if urgent_models:
model_names = ', '.join([a['model_name'] for a in urgent_models])
insights.append({
'type': 'warning',
'priority': 'urgent',
'category': 'system',
'title': f'Urgent Model Retraining Required: {len(urgent_models)} Models',
'description': f'Models requiring immediate retraining: {model_names}. Performance has degraded significantly.',
'impact_type': 'system_health',
'confidence': 95,
'metrics_json': {
'tenant_id': tenant_id,
'urgent_models': [a['model_name'] for a in urgent_models],
'affected_count': len(urgent_models)
},
'actionable': True,
'recommendation_actions': [{
'label': 'Retrain Models',
'action': 'trigger_model_retraining',
'params': {'models': [a['model_name'] for a in urgent_models]}
}],
'source_service': 'ai_insights',
'source_model': 'feedback_learning_system'
})
# Insight 2: Overall system health
total_models = len(performance_analyses)
healthy_models = [
a for a in performance_analyses
if not a.get('degradation_detected', {}).get('detected', False)
]
health_pct = (len(healthy_models) / total_models * 100) if total_models > 0 else 0
if health_pct < 80:
insights.append({
'type': 'warning',
'priority': 'high',
'category': 'system',
'title': f'Learning System Health: {health_pct:.0f}%',
'description': f'{len(healthy_models)} of {total_models} models are performing well. System-wide performance review recommended.',
'impact_type': 'system_health',
'confidence': 90,
'metrics_json': {
'tenant_id': tenant_id,
'total_models': total_models,
'healthy_models': len(healthy_models),
'health_percentage': round(health_pct, 1)
},
'actionable': True,
'recommendation_actions': [{
'label': 'Review System Health',
'action': 'review_learning_system',
'params': {'tenant_id': tenant_id}
}],
'source_service': 'ai_insights',
'source_model': 'feedback_learning_system'
})
# Insight 3: Confidence calibration issues
poorly_calibrated = [
a for a in performance_analyses
if not a.get('confidence_calibration', {}).get('calibrated', True)
]
if poorly_calibrated:
insights.append({
'type': 'opportunity',
'priority': 'medium',
'category': 'system',
'title': f'Confidence Calibration Needed: {len(poorly_calibrated)} Models',
'description': 'Confidence scores do not match actual accuracy. Recalibration recommended.',
'impact_type': 'system_improvement',
'confidence': 85,
'metrics_json': {
'tenant_id': tenant_id,
'models_needing_calibration': [a['model_name'] for a in poorly_calibrated]
},
'actionable': True,
'recommendation_actions': [{
'label': 'Recalibrate Confidence Scores',
'action': 'recalibrate_confidence',
'params': {'models': [a['model_name'] for a in poorly_calibrated]}
}],
'source_service': 'ai_insights',
'source_model': 'feedback_learning_system'
})
return insights
async def calculate_roi(
self,
feedback_data: pd.DataFrame,
insight_type: str
) -> Dict[str, Any]:
"""
Calculate ROI for applied insights.
Args:
feedback_data: Feedback data with business impact metrics
insight_type: Type of insight (e.g., 'demand_forecast', 'safety_stock')
Returns:
ROI calculation with cost savings and accuracy metrics
"""
if len(feedback_data) == 0:
return {'status': 'insufficient_data', 'samples': 0}
# Calculate accuracy
avg_accuracy = feedback_data['accuracy'].mean()
# Estimate cost savings (would be more sophisticated in production)
# For now, use impact_value from insights if available
if 'impact_value' in feedback_data.columns:
total_impact = feedback_data['impact_value'].sum()
avg_impact = feedback_data['impact_value'].mean()
return {
'insight_type': insight_type,
'samples': len(feedback_data),
'avg_accuracy': round(avg_accuracy, 2),
'total_impact_value': round(total_impact, 2),
'avg_impact_per_insight': round(avg_impact, 2),
'roi_validated': True
}
return {
'insight_type': insight_type,
'samples': len(feedback_data),
'avg_accuracy': round(avg_accuracy, 2),
'roi_validated': False,
'note': 'Impact values not tracked in feedback'
}
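# ---------------------------------------------------------------------------
# Minimal local smoke test (a sketch, not part of the service runtime): it
# builds a synthetic feedback frame whose columns follow the
# analyze_model_performance docstring, then runs one analysis. The model name,
# baseline figure and all values below are invented for illustration.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    import asyncio

    rng = np.random.default_rng(42)
    n = 60
    actual = rng.uniform(80, 120, n)
    predicted = actual * rng.normal(1.0, 0.08, n)  # roughly 8% noise around truth
    errors = predicted - actual
    error_pct = np.abs(errors / actual) * 100

    demo_feedback = pd.DataFrame({
        'insight_id': [f'demo-{i}' for i in range(n)],
        'applied_at': pd.date_range('2025-01-01', periods=n, freq='D'),
        'outcome_date': pd.date_range('2025-01-02', periods=n, freq='D'),
        'predicted_value': predicted,
        'actual_value': actual,
        'error': errors,
        'error_pct': error_pct,
        'accuracy': 100 - error_pct,
        'confidence': rng.uniform(60, 95, n),
    })

    system = FeedbackLearningSystem()
    analysis = asyncio.run(system.analyze_model_performance(
        model_name='hybrid_forecaster',
        feedback_data=demo_feedback,
        baseline_performance={'accuracy': 90.0},
    ))
    print(analysis['current_performance'])
    print(analysis['retraining_recommendation'])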