""" Dynamic Business Rules Engine Learns optimal adjustment factors from historical data instead of using hardcoded values Replaces hardcoded weather multipliers, holiday adjustments, event impacts with learned values """ import pandas as pd import numpy as np from typing import Dict, List, Any, Optional, Tuple import structlog from datetime import datetime, timedelta from scipy import stats from sklearn.linear_model import Ridge from collections import defaultdict logger = structlog.get_logger() class DynamicRulesEngine: """ Learns business rules from historical data instead of using hardcoded values. Current hardcoded values to replace: - Weather: rain = -15%, snow = -25%, extreme_heat = -10% - Holidays: +50% (all holidays treated the same) - Events: +30% (all events treated the same) - Weekend: Manual assumptions Dynamic approach: - Learn actual weather impact per weather condition per product - Learn holiday multipliers per holiday type - Learn event impact by event type - Learn day-of-week patterns per product - Generate insights when learned values differ from hardcoded assumptions """ def __init__(self): self.weather_rules = {} self.holiday_rules = {} self.event_rules = {} self.dow_rules = {} self.month_rules = {} async def learn_all_rules( self, tenant_id: str, inventory_product_id: str, sales_data: pd.DataFrame, external_data: Optional[pd.DataFrame] = None, min_samples: int = 10 ) -> Dict[str, Any]: """ Learn all business rules from historical data. Args: tenant_id: Tenant identifier inventory_product_id: Product identifier sales_data: Historical sales data with 'date', 'quantity' columns external_data: Optional weather/events/holidays data min_samples: Minimum samples required to learn a rule Returns: Dictionary of learned rules and insights """ logger.info( "Learning dynamic business rules from historical data", tenant_id=tenant_id, inventory_product_id=inventory_product_id, data_points=len(sales_data) ) results = { 'tenant_id': tenant_id, 'inventory_product_id': inventory_product_id, 'learned_at': datetime.utcnow().isoformat(), 'rules': {}, 'insights': [] } # Ensure date column is datetime if 'date' not in sales_data.columns: sales_data = sales_data.copy() sales_data['date'] = sales_data['ds'] sales_data['date'] = pd.to_datetime(sales_data['date']) # Learn weather impact rules if external_data is not None and 'weather_condition' in external_data.columns: weather_rules, weather_insights = await self._learn_weather_rules( sales_data, external_data, min_samples ) results['rules']['weather'] = weather_rules results['insights'].extend(weather_insights) self.weather_rules[inventory_product_id] = weather_rules # Learn holiday rules if external_data is not None and 'is_holiday' in external_data.columns: holiday_rules, holiday_insights = await self._learn_holiday_rules( sales_data, external_data, min_samples ) results['rules']['holidays'] = holiday_rules results['insights'].extend(holiday_insights) self.holiday_rules[inventory_product_id] = holiday_rules # Learn event rules if external_data is not None and 'event_type' in external_data.columns: event_rules, event_insights = await self._learn_event_rules( sales_data, external_data, min_samples ) results['rules']['events'] = event_rules results['insights'].extend(event_insights) self.event_rules[inventory_product_id] = event_rules # Learn day-of-week patterns (always available) dow_rules, dow_insights = await self._learn_day_of_week_rules( sales_data, min_samples ) results['rules']['day_of_week'] = dow_rules 
        results['insights'].extend(dow_insights)
        self.dow_rules[inventory_product_id] = dow_rules

        # Learn monthly seasonality
        month_rules, month_insights = await self._learn_month_rules(
            sales_data, min_samples
        )
        results['rules']['months'] = month_rules
        results['insights'].extend(month_insights)
        self.month_rules[inventory_product_id] = month_rules

        logger.info(
            "Dynamic rules learning complete",
            total_insights=len(results['insights']),
            rules_learned=len(results['rules'])
        )

        return results

    async def _learn_weather_rules(
        self,
        sales_data: pd.DataFrame,
        external_data: pd.DataFrame,
        min_samples: int
    ) -> Tuple[Dict[str, Any], List[Dict[str, Any]]]:
        """
        Learn actual weather impact from historical data.

        Hardcoded assumptions:
        - rain: -15%
        - snow: -25%
        - extreme_heat: -10%

        Learn the actual impact for this product.
        """
        logger.info("Learning weather impact rules")

        # Merge sales with weather data
        merged = sales_data.merge(
            external_data[['date', 'weather_condition', 'temperature', 'precipitation']],
            on='date',
            how='left'
        )

        # Baseline: average sales on clear days
        clear_days = merged[
            (merged['weather_condition'].isin(['clear', 'sunny', 'partly_cloudy'])) |
            (merged['weather_condition'].isna())
        ]
        baseline_avg = clear_days['quantity'].mean()

        weather_rules = {
            'baseline_avg': float(baseline_avg),
            'conditions': {}
        }
        insights = []

        # Hardcoded values for comparison
        hardcoded_impacts = {
            'rain': -0.15,
            'snow': -0.25,
            'extreme_heat': -0.10
        }

        # Learn impact for each weather condition
        for condition in ['rain', 'rainy', 'snow', 'snowy', 'extreme_heat', 'hot', 'storm', 'fog']:
            condition_days = merged[merged['weather_condition'].str.contains(condition, case=False, na=False)]

            if len(condition_days) >= min_samples:
                condition_avg = condition_days['quantity'].mean()
                learned_impact = (condition_avg - baseline_avg) / baseline_avg

                # Statistical significance test
                t_stat, p_value = stats.ttest_ind(
                    condition_days['quantity'].values,
                    clear_days['quantity'].values,
                    equal_var=False
                )

                weather_rules['conditions'][condition] = {
                    'learned_multiplier': float(1 + learned_impact),
                    'learned_impact_pct': float(learned_impact * 100),
                    'sample_size': int(len(condition_days)),
                    'avg_quantity': float(condition_avg),
                    'p_value': float(p_value),
                    'significant': bool(p_value < 0.05)
                }

                # Compare with hardcoded value if one exists
                if condition in hardcoded_impacts and p_value < 0.05:
                    hardcoded_impact = hardcoded_impacts[condition]
                    difference = abs(learned_impact - hardcoded_impact)

                    if difference > 0.05:  # More than 5% difference
                        insight = {
                            'type': 'optimization',
                            'priority': 'high' if difference > 0.15 else 'medium',
                            'category': 'forecasting',
                            'title': f'Weather Rule Mismatch: {condition.title()}',
                            'description': (
                                f'Learned {condition} impact is {learned_impact*100:.1f}% '
                                f'vs hardcoded {hardcoded_impact*100:.1f}%. '
                                f'Updating rule could improve forecast accuracy by {difference*100:.1f}%.'
                            ),
                            'impact_type': 'forecast_improvement',
                            'impact_value': difference * 100,
                            'impact_unit': 'percentage_points',
                            'confidence': self._calculate_confidence(len(condition_days), p_value),
                            'metrics_json': {
                                'weather_condition': condition,
                                'learned_impact_pct': round(learned_impact * 100, 2),
                                'hardcoded_impact_pct': round(hardcoded_impact * 100, 2),
                                'difference_pct': round(difference * 100, 2),
                                'baseline_avg': round(baseline_avg, 2),
                                'condition_avg': round(condition_avg, 2),
                                'sample_size': len(condition_days),
                                'p_value': round(p_value, 4)
                            },
                            'actionable': True,
                            'recommendation_actions': [
                                {
                                    'label': 'Update Weather Rule',
                                    'action': 'update_weather_multiplier',
                                    'params': {
                                        'condition': condition,
                                        'new_multiplier': round(1 + learned_impact, 3)
                                    }
                                }
                            ],
                            'source_service': 'forecasting',
                            'source_model': 'dynamic_rules_engine'
                        }
                        insights.append(insight)

                        logger.info(
                            "Weather rule discrepancy detected",
                            condition=condition,
                            learned=f"{learned_impact*100:.1f}%",
                            hardcoded=f"{hardcoded_impact*100:.1f}%"
                        )

        return weather_rules, insights

    async def _learn_holiday_rules(
        self,
        sales_data: pd.DataFrame,
        external_data: pd.DataFrame,
        min_samples: int
    ) -> Tuple[Dict[str, Any], List[Dict[str, Any]]]:
        """
        Learn holiday impact by holiday type instead of a uniform +50%.

        Hardcoded: all holidays = +50%
        Learn: Christmas vs Easter vs national holidays have different impacts
        """
        logger.info("Learning holiday impact rules")

        # Merge sales with holiday data
        merged = sales_data.merge(
            external_data[['date', 'is_holiday', 'holiday_name', 'holiday_type']],
            on='date',
            how='left'
        )

        # Baseline: non-holiday average
        non_holidays = merged[merged['is_holiday'] == False]
        baseline_avg = non_holidays['quantity'].mean()

        holiday_rules = {
            'baseline_avg': float(baseline_avg),
            'hardcoded_multiplier': 1.5,  # Current +50%
            'holiday_types': {}
        }
        insights = []

        # Learn impact per holiday type
        if 'holiday_type' in merged.columns:
            for holiday_type in merged[merged['is_holiday'] == True]['holiday_type'].unique():
                if pd.isna(holiday_type):
                    continue

                holiday_days = merged[merged['holiday_type'] == holiday_type]

                if len(holiday_days) >= min_samples:
                    holiday_avg = holiday_days['quantity'].mean()
                    learned_multiplier = holiday_avg / baseline_avg
                    learned_impact = (learned_multiplier - 1) * 100

                    # Statistical test
                    t_stat, p_value = stats.ttest_ind(
                        holiday_days['quantity'].values,
                        non_holidays['quantity'].values,
                        equal_var=False
                    )

                    holiday_rules['holiday_types'][holiday_type] = {
                        'learned_multiplier': float(learned_multiplier),
                        'learned_impact_pct': float(learned_impact),
                        'sample_size': int(len(holiday_days)),
                        'avg_quantity': float(holiday_avg),
                        'p_value': float(p_value),
                        'significant': bool(p_value < 0.05)
                    }

                    # Compare with hardcoded +50%
                    hardcoded_multiplier = 1.5
                    difference = abs(learned_multiplier - hardcoded_multiplier)

                    if difference > 0.1 and p_value < 0.05:  # More than 10% difference
                        insight = {
                            'type': 'recommendation',
                            'priority': 'high' if difference > 0.3 else 'medium',
                            'category': 'forecasting',
                            'title': f'Holiday Rule Optimization: {holiday_type}',
                            'description': (
                                f'{holiday_type} shows {learned_impact:.1f}% impact vs hardcoded +50%. '
                                f'Using learned multiplier {learned_multiplier:.2f}x could improve forecast accuracy.'
                            ),
                            'impact_type': 'forecast_improvement',
                            'impact_value': difference * 100,
                            'impact_unit': 'percentage_points',
                            'confidence': self._calculate_confidence(len(holiday_days), p_value),
                            'metrics_json': {
                                'holiday_type': holiday_type,
                                'learned_multiplier': round(learned_multiplier, 3),
                                'hardcoded_multiplier': 1.5,
                                'learned_impact_pct': round(learned_impact, 2),
                                'hardcoded_impact_pct': 50.0,
                                'baseline_avg': round(baseline_avg, 2),
                                'holiday_avg': round(holiday_avg, 2),
                                'sample_size': len(holiday_days),
                                'p_value': round(p_value, 4)
                            },
                            'actionable': True,
                            'recommendation_actions': [
                                {
                                    'label': 'Update Holiday Rule',
                                    'action': 'update_holiday_multiplier',
                                    'params': {
                                        'holiday_type': holiday_type,
                                        'new_multiplier': round(learned_multiplier, 3)
                                    }
                                }
                            ],
                            'source_service': 'forecasting',
                            'source_model': 'dynamic_rules_engine'
                        }
                        insights.append(insight)

                        logger.info(
                            "Holiday rule optimization identified",
                            holiday_type=holiday_type,
                            learned=f"{learned_multiplier:.2f}x",
                            hardcoded="1.5x"
                        )

        # Overall holiday impact
        all_holidays = merged[merged['is_holiday'] == True]
        if len(all_holidays) >= min_samples:
            overall_avg = all_holidays['quantity'].mean()
            overall_multiplier = overall_avg / baseline_avg
            holiday_rules['overall_learned_multiplier'] = float(overall_multiplier)
            holiday_rules['overall_learned_impact_pct'] = float((overall_multiplier - 1) * 100)

        return holiday_rules, insights

    async def _learn_event_rules(
        self,
        sales_data: pd.DataFrame,
        external_data: pd.DataFrame,
        min_samples: int
    ) -> Tuple[Dict[str, Any], List[Dict[str, Any]]]:
        """
        Learn event impact by event type instead of a uniform +30%.

        Hardcoded: all events = +30%
        Learn: sports events vs concerts vs festivals have different impacts
        """
        logger.info("Learning event impact rules")

        # Merge sales with event data
        merged = sales_data.merge(
            external_data[['date', 'event_name', 'event_type', 'event_attendance']],
            on='date',
            how='left'
        )

        # Baseline: non-event days
        non_events = merged[merged['event_name'].isna()]
        baseline_avg = non_events['quantity'].mean()

        event_rules = {
            'baseline_avg': float(baseline_avg),
            'hardcoded_multiplier': 1.3,  # Current +30%
            'event_types': {}
        }
        insights = []

        # Learn impact per event type
        if 'event_type' in merged.columns:
            for event_type in merged[merged['event_type'].notna()]['event_type'].unique():
                if pd.isna(event_type):
                    continue

                event_days = merged[merged['event_type'] == event_type]

                if len(event_days) >= min_samples:
                    event_avg = event_days['quantity'].mean()
                    learned_multiplier = event_avg / baseline_avg
                    learned_impact = (learned_multiplier - 1) * 100

                    # Statistical test
                    t_stat, p_value = stats.ttest_ind(
                        event_days['quantity'].values,
                        non_events['quantity'].values,
                        equal_var=False
                    )

                    event_rules['event_types'][event_type] = {
                        'learned_multiplier': float(learned_multiplier),
                        'learned_impact_pct': float(learned_impact),
                        'sample_size': int(len(event_days)),
                        'avg_quantity': float(event_avg),
                        'p_value': float(p_value),
                        'significant': bool(p_value < 0.05)
                    }

                    # Compare with hardcoded +30%
                    hardcoded_multiplier = 1.3
                    difference = abs(learned_multiplier - hardcoded_multiplier)

                    if difference > 0.1 and p_value < 0.05:
                        insight = {
                            'type': 'recommendation',
                            'priority': 'medium',
                            'category': 'forecasting',
                            'title': f'Event Rule Optimization: {event_type}',
                            'description': (
                                f'{event_type} events show {learned_impact:.1f}% impact vs hardcoded +30%. '
                                f'Using learned multiplier could improve event forecasts.'
                            ),
                            'impact_type': 'forecast_improvement',
                            'impact_value': difference * 100,
                            'impact_unit': 'percentage_points',
                            'confidence': self._calculate_confidence(len(event_days), p_value),
                            'metrics_json': {
                                'event_type': event_type,
                                'learned_multiplier': round(learned_multiplier, 3),
                                'hardcoded_multiplier': 1.3,
                                'learned_impact_pct': round(learned_impact, 2),
                                'hardcoded_impact_pct': 30.0,
                                'baseline_avg': round(baseline_avg, 2),
                                'event_avg': round(event_avg, 2),
                                'sample_size': len(event_days),
                                'p_value': round(p_value, 4)
                            },
                            'actionable': True,
                            'recommendation_actions': [
                                {
                                    'label': 'Update Event Rule',
                                    'action': 'update_event_multiplier',
                                    'params': {
                                        'event_type': event_type,
                                        'new_multiplier': round(learned_multiplier, 3)
                                    }
                                }
                            ],
                            'source_service': 'forecasting',
                            'source_model': 'dynamic_rules_engine'
                        }
                        insights.append(insight)

        return event_rules, insights

    async def _learn_day_of_week_rules(
        self,
        sales_data: pd.DataFrame,
        min_samples: int
    ) -> Tuple[Dict[str, Any], List[Dict[str, Any]]]:
        """
        Learn day-of-week patterns per product.

        Replace general assumptions with product-specific patterns.
        """
        logger.info("Learning day-of-week patterns")

        sales_data = sales_data.copy()
        sales_data['day_of_week'] = sales_data['date'].dt.dayofweek
        sales_data['day_name'] = sales_data['date'].dt.day_name()

        # Calculate average per day of week
        dow_avg = sales_data.groupby('day_of_week')['quantity'].agg(['mean', 'std', 'count'])
        overall_avg = sales_data['quantity'].mean()

        dow_rules = {
            'overall_avg': float(overall_avg),
            'days': {}
        }
        insights = []

        day_names = ['Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday', 'Sunday']

        for dow in range(7):
            if dow not in dow_avg.index or dow_avg.loc[dow, 'count'] < min_samples:
                continue

            day_avg = dow_avg.loc[dow, 'mean']
            day_std = dow_avg.loc[dow, 'std']
            day_count = dow_avg.loc[dow, 'count']

            multiplier = day_avg / overall_avg
            impact_pct = (multiplier - 1) * 100

            # Coefficient of variation
            cv = (day_std / day_avg) if day_avg > 0 else 0

            dow_rules['days'][day_names[dow]] = {
                'day_of_week': int(dow),
                'learned_multiplier': float(multiplier),
                'impact_pct': float(impact_pct),
                'avg_quantity': float(day_avg),
                'std_quantity': float(day_std),
                'sample_size': int(day_count),
                'coefficient_of_variation': float(cv)
            }

            # Insight for significant deviations
            if abs(impact_pct) > 20:  # More than 20% difference
                insight = {
                    'type': 'insight',
                    'priority': 'medium' if abs(impact_pct) > 30 else 'low',
                    'category': 'forecasting',
                    'title': f'{day_names[dow]} Pattern: {abs(impact_pct):.0f}% {"Higher" if impact_pct > 0 else "Lower"}',
                    'description': (
                        f'{day_names[dow]} sales average {day_avg:.1f} units '
                        f'({impact_pct:+.1f}% vs weekly average {overall_avg:.1f}). '
                        f'Consider this pattern in production planning.'
                    ),
                    'impact_type': 'operational_insight',
                    'impact_value': abs(impact_pct),
                    'impact_unit': 'percentage',
                    'confidence': self._calculate_confidence(day_count, 0.01),  # Low p-value for large samples
                    'metrics_json': {
                        'day_of_week': day_names[dow],
                        'day_multiplier': round(multiplier, 3),
                        'impact_pct': round(impact_pct, 2),
                        'day_avg': round(day_avg, 2),
                        'overall_avg': round(overall_avg, 2),
                        'sample_size': int(day_count),
                        'std': round(day_std, 2)
                    },
                    'actionable': True,
                    'recommendation_actions': [
                        {
                            'label': 'Adjust Production Schedule',
                            'action': 'adjust_weekly_production',
                            'params': {
                                'day': day_names[dow],
                                'multiplier': round(multiplier, 3)
                            }
                        }
                    ],
                    'source_service': 'forecasting',
                    'source_model': 'dynamic_rules_engine'
                }
                insights.append(insight)

        return dow_rules, insights

    async def _learn_month_rules(
        self,
        sales_data: pd.DataFrame,
        min_samples: int
    ) -> Tuple[Dict[str, Any], List[Dict[str, Any]]]:
        """
        Learn monthly seasonality patterns per product.
        """
        logger.info("Learning monthly seasonality patterns")

        sales_data = sales_data.copy()
        sales_data['month'] = sales_data['date'].dt.month
        sales_data['month_name'] = sales_data['date'].dt.month_name()

        # Calculate average per month
        month_avg = sales_data.groupby('month')['quantity'].agg(['mean', 'std', 'count'])
        overall_avg = sales_data['quantity'].mean()

        month_rules = {
            'overall_avg': float(overall_avg),
            'months': {}
        }
        insights = []

        month_names = ['January', 'February', 'March', 'April', 'May', 'June',
                       'July', 'August', 'September', 'October', 'November', 'December']

        for month in range(1, 13):
            if month not in month_avg.index or month_avg.loc[month, 'count'] < min_samples:
                continue

            month_mean = month_avg.loc[month, 'mean']
            month_std = month_avg.loc[month, 'std']
            month_count = month_avg.loc[month, 'count']

            multiplier = month_mean / overall_avg
            impact_pct = (multiplier - 1) * 100

            month_rules['months'][month_names[month - 1]] = {
                'month': int(month),
                'learned_multiplier': float(multiplier),
                'impact_pct': float(impact_pct),
                'avg_quantity': float(month_mean),
                'std_quantity': float(month_std),
                'sample_size': int(month_count)
            }

            # Insight for significant seasonal patterns
            if abs(impact_pct) > 25:  # More than 25% seasonal variation
                insight = {
                    'type': 'insight',
                    'priority': 'medium',
                    'category': 'forecasting',
                    'title': f'Seasonal Pattern: {month_names[month - 1]} {abs(impact_pct):.0f}% {"Higher" if impact_pct > 0 else "Lower"}',
                    'description': (
                        f'{month_names[month - 1]} shows strong seasonality with {impact_pct:+.1f}% '
                        f'vs annual average. Plan inventory accordingly.'
                    ),
                    'impact_type': 'operational_insight',
                    'impact_value': abs(impact_pct),
                    'impact_unit': 'percentage',
                    'confidence': self._calculate_confidence(month_count, 0.01),
                    'metrics_json': {
                        'month': month_names[month - 1],
                        'multiplier': round(multiplier, 3),
                        'impact_pct': round(impact_pct, 2),
                        'month_avg': round(month_mean, 2),
                        'annual_avg': round(overall_avg, 2),
                        'sample_size': int(month_count)
                    },
                    'actionable': True,
                    'recommendation_actions': [
                        {
                            'label': 'Adjust Seasonal Planning',
                            'action': 'adjust_seasonal_forecast',
                            'params': {
                                'month': month_names[month - 1],
                                'multiplier': round(multiplier, 3)
                            }
                        }
                    ],
                    'source_service': 'forecasting',
                    'source_model': 'dynamic_rules_engine'
                }
                insights.append(insight)

        return month_rules, insights

    def _calculate_confidence(self, sample_size: int, p_value: float) -> int:
        """
        Calculate confidence score (0-100) based on sample size and statistical significance.

        Args:
            sample_size: Number of observations
            p_value: Statistical significance p-value

        Returns:
            Confidence score 0-100
        """
        # Sample size score (0-50 points)
        if sample_size >= 100:
            sample_score = 50
        elif sample_size >= 50:
            sample_score = 40
        elif sample_size >= 30:
            sample_score = 30
        elif sample_size >= 20:
            sample_score = 20
        else:
            sample_score = 10

        # Statistical significance score (0-50 points)
        if p_value < 0.001:
            sig_score = 50
        elif p_value < 0.01:
            sig_score = 45
        elif p_value < 0.05:
            sig_score = 35
        elif p_value < 0.1:
            sig_score = 20
        else:
            sig_score = 10

        return min(100, sample_score + sig_score)

    def get_rule(
        self,
        inventory_product_id: str,
        rule_type: str,
        key: str
    ) -> Optional[float]:
        """
        Get the learned rule multiplier for a specific condition.

        Args:
            inventory_product_id: Product identifier
            rule_type: 'weather', 'holiday', 'event', 'day_of_week', 'month'
            key: Specific condition key (e.g., 'rain', 'Christmas', 'Monday')

        Returns:
            Learned multiplier, or None if not learned
        """
        if rule_type == 'weather':
            rules = self.weather_rules.get(inventory_product_id, {})
            return rules.get('conditions', {}).get(key, {}).get('learned_multiplier')
        elif rule_type == 'holiday':
            rules = self.holiday_rules.get(inventory_product_id, {})
            return rules.get('holiday_types', {}).get(key, {}).get('learned_multiplier')
        elif rule_type == 'event':
            rules = self.event_rules.get(inventory_product_id, {})
            return rules.get('event_types', {}).get(key, {}).get('learned_multiplier')
        elif rule_type == 'day_of_week':
            rules = self.dow_rules.get(inventory_product_id, {})
            return rules.get('days', {}).get(key, {}).get('learned_multiplier')
        elif rule_type == 'month':
            rules = self.month_rules.get(inventory_product_id, {})
            return rules.get('months', {}).get(key, {}).get('learned_multiplier')

        return None

    def export_rules_for_prophet(
        self,
        inventory_product_id: str
    ) -> Dict[str, Any]:
        """
        Export learned rules in a format suitable for Prophet model integration.

        Returns:
            Dictionary with multipliers for Prophet custom seasonality/regressors
        """
        return {
            'weather': self.weather_rules.get(inventory_product_id, {}),
            'holidays': self.holiday_rules.get(inventory_product_id, {}),
            'events': self.event_rules.get(inventory_product_id, {}),
            'day_of_week': self.dow_rules.get(inventory_product_id, {}),
            'months': self.month_rules.get(inventory_product_id, {})
        }
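

# ---------------------------------------------------------------------------
# Minimal usage sketch (illustrative only). This block is not part of the
# engine itself; it shows one way the class above could be exercised with
# synthetic data. The tenant/product identifiers and the generated DataFrames
# are made up for demonstration, but the column names ('ds', 'quantity',
# 'date', 'is_holiday', 'holiday_name', 'holiday_type') follow what the
# methods above expect.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    import asyncio

    async def _demo() -> None:
        rng = np.random.default_rng(42)
        dates = pd.date_range("2023-01-01", periods=365, freq="D")

        # Synthetic sales with a weekend lift, so day-of-week rules have signal
        quantity = rng.poisson(100, size=len(dates)) + np.where(dates.dayofweek >= 5, 30, 0)
        sales = pd.DataFrame({"ds": dates, "quantity": quantity})

        # Synthetic external data: only a single flagged holiday, no typed holidays
        external = pd.DataFrame({
            "date": dates,
            "is_holiday": dates.month.isin([12]) & (dates.day == 25),
            "holiday_name": None,
            "holiday_type": None,
        })

        engine = DynamicRulesEngine()
        results = await engine.learn_all_rules(
            tenant_id="demo-tenant",
            inventory_product_id="demo-product",
            sales_data=sales,
            external_data=external,
            min_samples=10,
        )

        print(f"Learned {len(results['rules'])} rule groups, "
              f"{len(results['insights'])} insights")
        print("Saturday multiplier:",
              engine.get_rule("demo-product", "day_of_week", "Saturday"))

    asyncio.run(_demo())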