594 lines
22 KiB
Python
594 lines
22 KiB
Python
|
|
"""
|
||
|
|
Pattern Detection Engine for Sales Data
|
||
|
|
Automatically identifies patterns and generates insights
|
||
|
|
"""
|
||
|
|
|
||
|
|
from collections import defaultdict
from datetime import datetime, timedelta, timezone
from typing import Dict, List, Any, Optional, Tuple

import numpy as np
import pandas as pd
import structlog
from scipy import stats
|
||
|
|
|
||
|
|
logger = structlog.get_logger()
|
||
|
|
|
||
|
|
|
||
|
|
class SalesPatternDetector:
    """
    Detect sales patterns and generate actionable insights.

    Patterns detected:
    - Time-of-day patterns (hourly peaks)
    - Day-of-week patterns (weekend spikes)
    - Weekly seasonality patterns
    - Monthly patterns
    - Holiday impact patterns
    - Weather correlation patterns

    NOTE(review): this chunk implements day-of-week, weekend, month-end,
    hourly, weather-correlation, and trend detection; holiday and weekly
    seasonality detectors are not visible here — confirm where they live.
    """
|
||
|
|
|
||
|
|
def __init__(self, significance_threshold: float = 0.15):
|
||
|
|
"""
|
||
|
|
Initialize pattern detector.
|
||
|
|
|
||
|
|
Args:
|
||
|
|
significance_threshold: Minimum percentage difference to consider significant (default 15%)
|
||
|
|
"""
|
||
|
|
self.significance_threshold = significance_threshold
|
||
|
|
self.detected_patterns = []
|
||
|
|
|
||
|
|
async def detect_all_patterns(
    self,
    tenant_id: str,
    inventory_product_id: str,
    sales_data: pd.DataFrame,
    min_confidence: int = 70
) -> List[Dict[str, Any]]:
    """
    Detect all patterns in sales data and generate insights.

    Args:
        tenant_id: Tenant identifier
        inventory_product_id: Product identifier
        sales_data: Sales data with columns: date, quantity, (optional: hour, temperature, etc.)
        min_confidence: Minimum confidence score for insights

    Returns:
        List of insight dictionaries ready for AI Insights Service
    """
    logger.info(
        "Starting pattern detection",
        tenant_id=tenant_id,
        product_id=inventory_product_id,
        data_points=len(sales_data)
    )

    insights = []

    # Work on a copy: the date conversion below and the helper methods
    # attach derived columns; the caller's DataFrame must not be mutated.
    sales_data = sales_data.copy()

    # Ensure date column is datetime
    if 'date' in sales_data.columns:
        sales_data['date'] = pd.to_datetime(sales_data['date'])

    # 1. Day-of-week patterns
    dow_insights = await self._detect_day_of_week_patterns(
        tenant_id, inventory_product_id, sales_data, min_confidence
    )
    insights.extend(dow_insights)

    # 2. Weekend vs weekday patterns
    weekend_insights = await self._detect_weekend_patterns(
        tenant_id, inventory_product_id, sales_data, min_confidence
    )
    insights.extend(weekend_insights)

    # 3. Month-end patterns
    month_end_insights = await self._detect_month_end_patterns(
        tenant_id, inventory_product_id, sales_data, min_confidence
    )
    insights.extend(month_end_insights)

    # 4. Hourly patterns (if hour data available)
    if 'hour' in sales_data.columns:
        hourly_insights = await self._detect_hourly_patterns(
            tenant_id, inventory_product_id, sales_data, min_confidence
        )
        insights.extend(hourly_insights)

    # 5. Weather correlation (if temperature data available)
    if 'temperature' in sales_data.columns:
        weather_insights = await self._detect_weather_correlations(
            tenant_id, inventory_product_id, sales_data, min_confidence
        )
        insights.extend(weather_insights)

    # 6. Trend detection
    trend_insights = await self._detect_trends(
        tenant_id, inventory_product_id, sales_data, min_confidence
    )
    insights.extend(trend_insights)

    logger.info(
        "Pattern detection complete",
        total_insights=len(insights),
        product_id=inventory_product_id
    )

    return insights
|
||
|
|
|
||
|
|
async def _detect_day_of_week_patterns(
|
||
|
|
self,
|
||
|
|
tenant_id: str,
|
||
|
|
inventory_product_id: str,
|
||
|
|
sales_data: pd.DataFrame,
|
||
|
|
min_confidence: int
|
||
|
|
) -> List[Dict[str, Any]]:
|
||
|
|
"""Detect day-of-week patterns (e.g., Friday sales spike)."""
|
||
|
|
insights = []
|
||
|
|
|
||
|
|
if 'date' not in sales_data.columns or 'quantity' not in sales_data.columns:
|
||
|
|
return insights
|
||
|
|
|
||
|
|
# Add day of week
|
||
|
|
sales_data['day_of_week'] = sales_data['date'].dt.dayofweek
|
||
|
|
sales_data['day_name'] = sales_data['date'].dt.day_name()
|
||
|
|
|
||
|
|
# Calculate average sales per day of week
|
||
|
|
dow_avg = sales_data.groupby(['day_of_week', 'day_name'])['quantity'].agg(['mean', 'count']).reset_index()
|
||
|
|
|
||
|
|
# Only consider days with sufficient data (at least 4 observations)
|
||
|
|
dow_avg = dow_avg[dow_avg['count'] >= 4]
|
||
|
|
|
||
|
|
if len(dow_avg) < 2:
|
||
|
|
return insights
|
||
|
|
|
||
|
|
overall_avg = sales_data['quantity'].mean()
|
||
|
|
|
||
|
|
# Find days significantly above average
|
||
|
|
for _, row in dow_avg.iterrows():
|
||
|
|
day_avg = row['mean']
|
||
|
|
pct_diff = ((day_avg - overall_avg) / overall_avg) * 100
|
||
|
|
|
||
|
|
if abs(pct_diff) > self.significance_threshold * 100:
|
||
|
|
# Calculate confidence based on sample size and consistency
|
||
|
|
confidence = self._calculate_pattern_confidence(
|
||
|
|
sample_size=int(row['count']),
|
||
|
|
effect_size=abs(pct_diff) / 100,
|
||
|
|
variability=sales_data['quantity'].std()
|
||
|
|
)
|
||
|
|
|
||
|
|
if confidence >= min_confidence:
|
||
|
|
if pct_diff > 0:
|
||
|
|
insight = self._create_insight(
|
||
|
|
tenant_id=tenant_id,
|
||
|
|
inventory_product_id=inventory_product_id,
|
||
|
|
insight_type='pattern',
|
||
|
|
category='sales',
|
||
|
|
priority='medium' if pct_diff > 20 else 'low',
|
||
|
|
title=f'{row["day_name"]} Sales Pattern Detected',
|
||
|
|
description=f'Sales on {row["day_name"]} are {abs(pct_diff):.1f}% {"higher" if pct_diff > 0 else "lower"} than average ({day_avg:.1f} vs {overall_avg:.1f} units).',
|
||
|
|
confidence=confidence,
|
||
|
|
metrics={
|
||
|
|
'day_of_week': row['day_name'],
|
||
|
|
'avg_sales': float(day_avg),
|
||
|
|
'overall_avg': float(overall_avg),
|
||
|
|
'difference_pct': float(pct_diff),
|
||
|
|
'sample_size': int(row['count'])
|
||
|
|
},
|
||
|
|
actionable=True,
|
||
|
|
actions=[
|
||
|
|
{'label': 'Adjust Production', 'action': 'adjust_daily_production'},
|
||
|
|
{'label': 'Review Schedule', 'action': 'review_production_schedule'}
|
||
|
|
]
|
||
|
|
)
|
||
|
|
insights.append(insight)
|
||
|
|
|
||
|
|
return insights
|
||
|
|
|
||
|
|
async def _detect_weekend_patterns(
|
||
|
|
self,
|
||
|
|
tenant_id: str,
|
||
|
|
inventory_product_id: str,
|
||
|
|
sales_data: pd.DataFrame,
|
||
|
|
min_confidence: int
|
||
|
|
) -> List[Dict[str, Any]]:
|
||
|
|
"""Detect weekend vs weekday patterns."""
|
||
|
|
insights = []
|
||
|
|
|
||
|
|
if 'date' not in sales_data.columns or 'quantity' not in sales_data.columns:
|
||
|
|
return insights
|
||
|
|
|
||
|
|
# Classify weekend vs weekday
|
||
|
|
sales_data['is_weekend'] = sales_data['date'].dt.dayofweek.isin([5, 6])
|
||
|
|
|
||
|
|
# Calculate averages
|
||
|
|
weekend_avg = sales_data[sales_data['is_weekend']]['quantity'].mean()
|
||
|
|
weekday_avg = sales_data[~sales_data['is_weekend']]['quantity'].mean()
|
||
|
|
|
||
|
|
weekend_count = sales_data[sales_data['is_weekend']]['quantity'].count()
|
||
|
|
weekday_count = sales_data[~sales_data['is_weekend']]['quantity'].count()
|
||
|
|
|
||
|
|
if weekend_count < 4 or weekday_count < 4:
|
||
|
|
return insights
|
||
|
|
|
||
|
|
pct_diff = ((weekend_avg - weekday_avg) / weekday_avg) * 100
|
||
|
|
|
||
|
|
if abs(pct_diff) > self.significance_threshold * 100:
|
||
|
|
confidence = self._calculate_pattern_confidence(
|
||
|
|
sample_size=min(weekend_count, weekday_count),
|
||
|
|
effect_size=abs(pct_diff) / 100,
|
||
|
|
variability=sales_data['quantity'].std()
|
||
|
|
)
|
||
|
|
|
||
|
|
if confidence >= min_confidence:
|
||
|
|
# Estimate revenue impact
|
||
|
|
impact_value = abs(weekend_avg - weekday_avg) * 8 * 4 # 8 weekend days per month
|
||
|
|
|
||
|
|
insight = self._create_insight(
|
||
|
|
tenant_id=tenant_id,
|
||
|
|
inventory_product_id=inventory_product_id,
|
||
|
|
insight_type='recommendation',
|
||
|
|
category='forecasting',
|
||
|
|
priority='high' if abs(pct_diff) > 25 else 'medium',
|
||
|
|
title=f'Weekend Demand Pattern: {abs(pct_diff):.0f}% {"Higher" if pct_diff > 0 else "Lower"}',
|
||
|
|
description=f'Weekend sales average {weekend_avg:.1f} units vs {weekday_avg:.1f} on weekdays ({abs(pct_diff):.0f}% {"increase" if pct_diff > 0 else "decrease"}). Recommend adjusting weekend production targets.',
|
||
|
|
confidence=confidence,
|
||
|
|
impact_type='revenue_increase' if pct_diff > 0 else 'cost_savings',
|
||
|
|
impact_value=float(impact_value),
|
||
|
|
impact_unit='units/month',
|
||
|
|
metrics={
|
||
|
|
'weekend_avg': float(weekend_avg),
|
||
|
|
'weekday_avg': float(weekday_avg),
|
||
|
|
'difference_pct': float(pct_diff),
|
||
|
|
'weekend_samples': int(weekend_count),
|
||
|
|
'weekday_samples': int(weekday_count)
|
||
|
|
},
|
||
|
|
actionable=True,
|
||
|
|
actions=[
|
||
|
|
{'label': 'Increase Weekend Production', 'action': 'adjust_weekend_production'},
|
||
|
|
{'label': 'Update Forecast Multiplier', 'action': 'update_forecast_rule'}
|
||
|
|
]
|
||
|
|
)
|
||
|
|
insights.append(insight)
|
||
|
|
|
||
|
|
return insights
|
||
|
|
|
||
|
|
async def _detect_month_end_patterns(
|
||
|
|
self,
|
||
|
|
tenant_id: str,
|
||
|
|
inventory_product_id: str,
|
||
|
|
sales_data: pd.DataFrame,
|
||
|
|
min_confidence: int
|
||
|
|
) -> List[Dict[str, Any]]:
|
||
|
|
"""Detect month-end and payday patterns."""
|
||
|
|
insights = []
|
||
|
|
|
||
|
|
if 'date' not in sales_data.columns or 'quantity' not in sales_data.columns:
|
||
|
|
return insights
|
||
|
|
|
||
|
|
# Identify payday periods (15th and last 3 days of month)
|
||
|
|
sales_data['day_of_month'] = sales_data['date'].dt.day
|
||
|
|
sales_data['is_payday'] = (
|
||
|
|
(sales_data['day_of_month'] == 15) |
|
||
|
|
(sales_data['date'].dt.is_month_end) |
|
||
|
|
(sales_data['day_of_month'] >= sales_data['date'].dt.days_in_month - 2)
|
||
|
|
)
|
||
|
|
|
||
|
|
payday_avg = sales_data[sales_data['is_payday']]['quantity'].mean()
|
||
|
|
regular_avg = sales_data[~sales_data['is_payday']]['quantity'].mean()
|
||
|
|
|
||
|
|
payday_count = sales_data[sales_data['is_payday']]['quantity'].count()
|
||
|
|
|
||
|
|
if payday_count < 4:
|
||
|
|
return insights
|
||
|
|
|
||
|
|
pct_diff = ((payday_avg - regular_avg) / regular_avg) * 100
|
||
|
|
|
||
|
|
if abs(pct_diff) > self.significance_threshold * 100:
|
||
|
|
confidence = self._calculate_pattern_confidence(
|
||
|
|
sample_size=payday_count,
|
||
|
|
effect_size=abs(pct_diff) / 100,
|
||
|
|
variability=sales_data['quantity'].std()
|
||
|
|
)
|
||
|
|
|
||
|
|
if confidence >= min_confidence and pct_diff > 0:
|
||
|
|
insight = self._create_insight(
|
||
|
|
tenant_id=tenant_id,
|
||
|
|
inventory_product_id=inventory_product_id,
|
||
|
|
insight_type='pattern',
|
||
|
|
category='sales',
|
||
|
|
priority='medium',
|
||
|
|
title=f'Payday Shopping Pattern Detected',
|
||
|
|
description=f'Sales increase {pct_diff:.0f}% during payday periods (15th and month-end). Average {payday_avg:.1f} vs {regular_avg:.1f} units.',
|
||
|
|
confidence=confidence,
|
||
|
|
metrics={
|
||
|
|
'payday_avg': float(payday_avg),
|
||
|
|
'regular_avg': float(regular_avg),
|
||
|
|
'difference_pct': float(pct_diff)
|
||
|
|
},
|
||
|
|
actionable=True,
|
||
|
|
actions=[
|
||
|
|
{'label': 'Increase Payday Stock', 'action': 'adjust_payday_production'}
|
||
|
|
]
|
||
|
|
)
|
||
|
|
insights.append(insight)
|
||
|
|
|
||
|
|
return insights
|
||
|
|
|
||
|
|
async def _detect_hourly_patterns(
|
||
|
|
self,
|
||
|
|
tenant_id: str,
|
||
|
|
inventory_product_id: str,
|
||
|
|
sales_data: pd.DataFrame,
|
||
|
|
min_confidence: int
|
||
|
|
) -> List[Dict[str, Any]]:
|
||
|
|
"""Detect hourly sales patterns (if POS data available)."""
|
||
|
|
insights = []
|
||
|
|
|
||
|
|
if 'hour' not in sales_data.columns or 'quantity' not in sales_data.columns:
|
||
|
|
return insights
|
||
|
|
|
||
|
|
hourly_avg = sales_data.groupby('hour')['quantity'].agg(['mean', 'count']).reset_index()
|
||
|
|
hourly_avg = hourly_avg[hourly_avg['count'] >= 3] # At least 3 observations
|
||
|
|
|
||
|
|
if len(hourly_avg) < 3:
|
||
|
|
return insights
|
||
|
|
|
||
|
|
overall_avg = sales_data['quantity'].mean()
|
||
|
|
|
||
|
|
# Find peak hours (top 3)
|
||
|
|
top_hours = hourly_avg.nlargest(3, 'mean')
|
||
|
|
|
||
|
|
for _, row in top_hours.iterrows():
|
||
|
|
hour_avg = row['mean']
|
||
|
|
pct_diff = ((hour_avg - overall_avg) / overall_avg) * 100
|
||
|
|
|
||
|
|
if pct_diff > self.significance_threshold * 100:
|
||
|
|
confidence = self._calculate_pattern_confidence(
|
||
|
|
sample_size=int(row['count']),
|
||
|
|
effect_size=pct_diff / 100,
|
||
|
|
variability=sales_data['quantity'].std()
|
||
|
|
)
|
||
|
|
|
||
|
|
if confidence >= min_confidence:
|
||
|
|
hour = int(row['hour'])
|
||
|
|
time_label = f"{hour:02d}:00-{(hour+1):02d}:00"
|
||
|
|
|
||
|
|
insight = self._create_insight(
|
||
|
|
tenant_id=tenant_id,
|
||
|
|
inventory_product_id=inventory_product_id,
|
||
|
|
insight_type='pattern',
|
||
|
|
category='sales',
|
||
|
|
priority='low',
|
||
|
|
title=f'Peak Sales Hour: {time_label}',
|
||
|
|
description=f'Sales peak during {time_label} with {hour_avg:.1f} units ({pct_diff:.0f}% above average).',
|
||
|
|
confidence=confidence,
|
||
|
|
metrics={
|
||
|
|
'peak_hour': hour,
|
||
|
|
'avg_sales': float(hour_avg),
|
||
|
|
'overall_avg': float(overall_avg),
|
||
|
|
'difference_pct': float(pct_diff)
|
||
|
|
},
|
||
|
|
actionable=True,
|
||
|
|
actions=[
|
||
|
|
{'label': 'Ensure Fresh Stock', 'action': 'schedule_production'},
|
||
|
|
{'label': 'Increase Staffing', 'action': 'adjust_staffing'}
|
||
|
|
]
|
||
|
|
)
|
||
|
|
insights.append(insight)
|
||
|
|
|
||
|
|
return insights
|
||
|
|
|
||
|
|
async def _detect_weather_correlations(
|
||
|
|
self,
|
||
|
|
tenant_id: str,
|
||
|
|
inventory_product_id: str,
|
||
|
|
sales_data: pd.DataFrame,
|
||
|
|
min_confidence: int
|
||
|
|
) -> List[Dict[str, Any]]:
|
||
|
|
"""Detect weather-sales correlations."""
|
||
|
|
insights = []
|
||
|
|
|
||
|
|
if 'temperature' not in sales_data.columns or 'quantity' not in sales_data.columns:
|
||
|
|
return insights
|
||
|
|
|
||
|
|
# Remove NaN values
|
||
|
|
clean_data = sales_data[['temperature', 'quantity']].dropna()
|
||
|
|
|
||
|
|
if len(clean_data) < 30: # Need sufficient data
|
||
|
|
return insights
|
||
|
|
|
||
|
|
# Calculate correlation
|
||
|
|
correlation, p_value = stats.pearsonr(clean_data['temperature'], clean_data['quantity'])
|
||
|
|
|
||
|
|
if abs(correlation) > 0.3 and p_value < 0.05: # Moderate correlation and significant
|
||
|
|
confidence = self._calculate_correlation_confidence(correlation, p_value, len(clean_data))
|
||
|
|
|
||
|
|
if confidence >= min_confidence:
|
||
|
|
direction = 'increase' if correlation > 0 else 'decrease'
|
||
|
|
|
||
|
|
insight = self._create_insight(
|
||
|
|
tenant_id=tenant_id,
|
||
|
|
inventory_product_id=inventory_product_id,
|
||
|
|
insight_type='insight',
|
||
|
|
category='forecasting',
|
||
|
|
priority='medium' if abs(correlation) > 0.5 else 'low',
|
||
|
|
title=f'Temperature Impact on Sales: {abs(correlation):.0%} Correlation',
|
||
|
|
description=f'Sales {direction} with temperature (correlation: {correlation:.2f}). {"Warmer" if correlation > 0 else "Colder"} weather associated with {"higher" if correlation > 0 else "lower"} sales.',
|
||
|
|
confidence=confidence,
|
||
|
|
metrics={
|
||
|
|
'correlation': float(correlation),
|
||
|
|
'p_value': float(p_value),
|
||
|
|
'sample_size': len(clean_data),
|
||
|
|
'direction': direction
|
||
|
|
},
|
||
|
|
actionable=False
|
||
|
|
)
|
||
|
|
insights.append(insight)
|
||
|
|
|
||
|
|
return insights
|
||
|
|
|
||
|
|
async def _detect_trends(
|
||
|
|
self,
|
||
|
|
tenant_id: str,
|
||
|
|
inventory_product_id: str,
|
||
|
|
sales_data: pd.DataFrame,
|
||
|
|
min_confidence: int
|
||
|
|
) -> List[Dict[str, Any]]:
|
||
|
|
"""Detect overall trends (growing, declining, stable)."""
|
||
|
|
insights = []
|
||
|
|
|
||
|
|
if 'date' not in sales_data.columns or 'quantity' not in sales_data.columns or len(sales_data) < 60:
|
||
|
|
return insights
|
||
|
|
|
||
|
|
# Sort by date
|
||
|
|
sales_data = sales_data.sort_values('date')
|
||
|
|
|
||
|
|
# Calculate 30-day rolling average
|
||
|
|
sales_data['rolling_30d'] = sales_data['quantity'].rolling(window=30, min_periods=15).mean()
|
||
|
|
|
||
|
|
# Compare first and last 30-day averages
|
||
|
|
first_30_avg = sales_data['rolling_30d'].iloc[:30].mean()
|
||
|
|
last_30_avg = sales_data['rolling_30d'].iloc[-30:].mean()
|
||
|
|
|
||
|
|
if pd.isna(first_30_avg) or pd.isna(last_30_avg):
|
||
|
|
return insights
|
||
|
|
|
||
|
|
pct_change = ((last_30_avg - first_30_avg) / first_30_avg) * 100
|
||
|
|
|
||
|
|
if abs(pct_change) > 10: # 10% change is significant
|
||
|
|
confidence = min(95, 70 + int(abs(pct_change))) # Higher change = higher confidence
|
||
|
|
|
||
|
|
trend_type = 'growing' if pct_change > 0 else 'declining'
|
||
|
|
|
||
|
|
insight = self._create_insight(
|
||
|
|
tenant_id=tenant_id,
|
||
|
|
inventory_product_id=inventory_product_id,
|
||
|
|
insight_type='prediction',
|
||
|
|
category='forecasting',
|
||
|
|
priority='high' if abs(pct_change) > 20 else 'medium',
|
||
|
|
title=f'Sales Trend: {trend_type.title()} {abs(pct_change):.0f}%',
|
||
|
|
description=f'Sales show a {trend_type} trend over the period. Current 30-day average: {last_30_avg:.1f} vs earlier: {first_30_avg:.1f} ({pct_change:+.0f}%).',
|
||
|
|
confidence=confidence,
|
||
|
|
metrics={
|
||
|
|
'current_avg': float(last_30_avg),
|
||
|
|
'previous_avg': float(first_30_avg),
|
||
|
|
'change_pct': float(pct_change),
|
||
|
|
'trend': trend_type
|
||
|
|
},
|
||
|
|
actionable=True,
|
||
|
|
actions=[
|
||
|
|
{'label': 'Adjust Forecast Model', 'action': 'update_forecast'},
|
||
|
|
{'label': 'Review Capacity', 'action': 'review_production_capacity'}
|
||
|
|
]
|
||
|
|
)
|
||
|
|
insights.append(insight)
|
||
|
|
|
||
|
|
return insights
|
||
|
|
|
||
|
|
def _calculate_pattern_confidence(
|
||
|
|
self,
|
||
|
|
sample_size: int,
|
||
|
|
effect_size: float,
|
||
|
|
variability: float
|
||
|
|
) -> int:
|
||
|
|
"""
|
||
|
|
Calculate confidence score for detected pattern.
|
||
|
|
|
||
|
|
Args:
|
||
|
|
sample_size: Number of observations
|
||
|
|
effect_size: Size of the effect (e.g., 0.25 for 25% difference)
|
||
|
|
variability: Standard deviation of data
|
||
|
|
|
||
|
|
Returns:
|
||
|
|
Confidence score (0-100)
|
||
|
|
"""
|
||
|
|
# Base confidence from sample size
|
||
|
|
if sample_size < 4:
|
||
|
|
base = 50
|
||
|
|
elif sample_size < 10:
|
||
|
|
base = 65
|
||
|
|
elif sample_size < 30:
|
||
|
|
base = 75
|
||
|
|
elif sample_size < 100:
|
||
|
|
base = 85
|
||
|
|
else:
|
||
|
|
base = 90
|
||
|
|
|
||
|
|
# Adjust for effect size
|
||
|
|
effect_boost = min(15, effect_size * 30)
|
||
|
|
|
||
|
|
# Adjust for variability (penalize high variability)
|
||
|
|
variability_penalty = min(10, variability / 10)
|
||
|
|
|
||
|
|
confidence = base + effect_boost - variability_penalty
|
||
|
|
|
||
|
|
return int(max(0, min(100, confidence)))
|
||
|
|
|
||
|
|
def _calculate_correlation_confidence(
|
||
|
|
self,
|
||
|
|
correlation: float,
|
||
|
|
p_value: float,
|
||
|
|
sample_size: int
|
||
|
|
) -> int:
|
||
|
|
"""Calculate confidence for correlation insights."""
|
||
|
|
# Base confidence from correlation strength
|
||
|
|
base = abs(correlation) * 100
|
||
|
|
|
||
|
|
# Boost for significance
|
||
|
|
if p_value < 0.001:
|
||
|
|
significance_boost = 15
|
||
|
|
elif p_value < 0.01:
|
||
|
|
significance_boost = 10
|
||
|
|
elif p_value < 0.05:
|
||
|
|
significance_boost = 5
|
||
|
|
else:
|
||
|
|
significance_boost = 0
|
||
|
|
|
||
|
|
# Boost for sample size
|
||
|
|
if sample_size > 100:
|
||
|
|
sample_boost = 10
|
||
|
|
elif sample_size > 50:
|
||
|
|
sample_boost = 5
|
||
|
|
else:
|
||
|
|
sample_boost = 0
|
||
|
|
|
||
|
|
confidence = base + significance_boost + sample_boost
|
||
|
|
|
||
|
|
return int(max(0, min(100, confidence)))
|
||
|
|
|
||
|
|
def _create_insight(
|
||
|
|
self,
|
||
|
|
tenant_id: str,
|
||
|
|
inventory_product_id: str,
|
||
|
|
insight_type: str,
|
||
|
|
category: str,
|
||
|
|
priority: str,
|
||
|
|
title: str,
|
||
|
|
description: str,
|
||
|
|
confidence: int,
|
||
|
|
metrics: Dict[str, Any],
|
||
|
|
actionable: bool,
|
||
|
|
actions: List[Dict[str, str]] = None,
|
||
|
|
impact_type: str = None,
|
||
|
|
impact_value: float = None,
|
||
|
|
impact_unit: str = None
|
||
|
|
) -> Dict[str, Any]:
|
||
|
|
"""Create an insight dictionary for AI Insights Service."""
|
||
|
|
return {
|
||
|
|
'tenant_id': tenant_id,
|
||
|
|
'type': insight_type,
|
||
|
|
'priority': priority,
|
||
|
|
'category': category,
|
||
|
|
'title': title,
|
||
|
|
'description': description,
|
||
|
|
'impact_type': impact_type,
|
||
|
|
'impact_value': impact_value,
|
||
|
|
'impact_unit': impact_unit,
|
||
|
|
'confidence': confidence,
|
||
|
|
'metrics_json': metrics,
|
||
|
|
'actionable': actionable,
|
||
|
|
'recommendation_actions': actions or [],
|
||
|
|
'source_service': 'forecasting',
|
||
|
|
'source_data_id': f'pattern_detection_{inventory_product_id}_{datetime.utcnow().strftime("%Y%m%d")}'
|
||
|
|
}
|