diff --git a/services/forecasting/app/api/ml_insights.py b/services/forecasting/app/api/ml_insights.py index d201ad87..bbbf4fb3 100644 --- a/services/forecasting/app/api/ml_insights.py +++ b/services/forecasting/app/api/ml_insights.py @@ -858,11 +858,11 @@ async def trigger_demand_insights_internal( ) # Fetch historical sales data - sales_data_raw = await sales_client.get_product_sales( + sales_data_raw = await sales_client.get_sales_data( tenant_id=tenant_id, product_id=product_id, - start_date=start_date, - end_date=end_date + start_date=start_date.strftime('%Y-%m-%d'), + end_date=end_date.strftime('%Y-%m-%d') ) if not sales_data_raw or len(sales_data_raw) < 10: @@ -876,26 +876,47 @@ async def trigger_demand_insights_internal( # Convert to DataFrame sales_df = pd.DataFrame(sales_data_raw) + # Map field names to expected format + if 'quantity' not in sales_df.columns: + if 'total_quantity' in sales_df.columns: + sales_df['quantity'] = sales_df['total_quantity'] + elif 'quantity_sold' in sales_df.columns: + sales_df['quantity'] = sales_df['quantity_sold'] + else: + logger.warning( + "No quantity field found for product", + product_id=product_id + ) + continue + + if 'date' not in sales_df.columns: + if 'sale_date' in sales_df.columns: + sales_df['date'] = sales_df['sale_date'] + else: + logger.warning( + "No date field found for product", + product_id=product_id + ) + continue + # Run demand insights orchestrator - insights = await orchestrator.analyze_and_generate_insights( + results = await orchestrator.analyze_and_post_demand_insights( tenant_id=tenant_id, - product_id=product_id, - product_name=product_name, + inventory_product_id=product_id, sales_data=sales_df, - lookback_days=90, - db=db + forecast_horizon_days=30, + min_history_days=90 ) - if insights: - total_insights_generated += len(insights) - total_insights_posted += len(insights) + total_insights_generated += results['insights_generated'] + total_insights_posted += results['insights_posted'] - logger.info( - "Demand insights generated for product", - tenant_id=tenant_id, - product_id=product_id, - insights_count=len(insights) - ) + logger.info( + "Demand insights generated for product", + tenant_id=tenant_id, + product_id=product_id, + insights_posted=results['insights_posted'] + ) except Exception as e: logger.warning( diff --git a/services/forecasting/app/ml/predictor.py b/services/forecasting/app/ml/predictor.py index 421a48a6..d1e0b95b 100644 --- a/services/forecasting/app/ml/predictor.py +++ b/services/forecasting/app/ml/predictor.py @@ -32,12 +32,22 @@ class BakeryPredictor: self.use_dynamic_rules = use_dynamic_rules if use_dynamic_rules: - from app.ml.dynamic_rules_engine import DynamicRulesEngine - from shared.clients.ai_insights_client import AIInsightsClient - self.rules_engine = DynamicRulesEngine() - self.ai_insights_client = AIInsightsClient( - base_url=settings.AI_INSIGHTS_SERVICE_URL or "http://ai-insights-service:8000" - ) + try: + from app.ml.dynamic_rules_engine import DynamicRulesEngine + from shared.clients.ai_insights_client import AIInsightsClient + self.rules_engine = DynamicRulesEngine() + self.ai_insights_client = AIInsightsClient( + base_url=settings.AI_INSIGHTS_SERVICE_URL or "http://ai-insights-service:8000" + ) + # Also provide business_rules for consistency + self.business_rules = BakeryBusinessRules( + use_dynamic_rules=True, + ai_insights_client=self.ai_insights_client + ) + except ImportError as e: + logger.warning(f"Failed to import dynamic rules engine: {e}. Falling back to basic business rules.") + self.use_dynamic_rules = False + self.business_rules = BakeryBusinessRules() else: self.business_rules = BakeryBusinessRules() @@ -52,6 +62,9 @@ class BakeryForecaster: self.predictor = BakeryPredictor(database_manager) self.use_enhanced_features = use_enhanced_features + # Initialize business rules - this was missing! This fixes the AttributeError + self.business_rules = BakeryBusinessRules(use_dynamic_rules=True, ai_insights_client=self.predictor.ai_insights_client if hasattr(self.predictor, 'ai_insights_client') else None) + # Initialize POI feature service from app.services.poi_feature_service import POIFeatureService self.poi_feature_service = POIFeatureService() @@ -72,24 +85,6 @@ class BakeryForecaster: else: self.data_processor = None - async def generate_forecast_with_repository(self, tenant_id: str, inventory_product_id: str, - forecast_date: date, model_id: str = None) -> Dict[str, Any]: - """Generate forecast with repository integration""" - try: - # This would integrate with repositories for model loading and caching - # Implementation would be added here - return { - "tenant_id": tenant_id, - "inventory_product_id": inventory_product_id, - "forecast_date": forecast_date.isoformat(), - "prediction": 0.0, - "confidence_interval": {"lower": 0.0, "upper": 0.0}, - "status": "completed", - "repository_integration": True - } - except Exception as e: - logger.error("Forecast generation failed", error=str(e)) - raise async def predict_demand(self, model, features: Dict[str, Any], business_type: str = "individual") -> Dict[str, float]: @@ -317,6 +312,218 @@ class BakeryForecaster: return 0.1 # 10% additional uncertainty on weekends return 0.0 + async def analyze_demand_patterns( + self, + tenant_id: str, + inventory_product_id: str, + sales_data: pd.DataFrame, + forecast_horizon_days: int = 30, + min_history_days: int = 90 + ) -> Dict[str, Any]: + """ + Analyze demand patterns by delegating to the sales service. + + NOTE: Sales data analysis is the responsibility of the sales service. + This method calls the sales service API to get demand pattern analysis. + + Args: + tenant_id: Tenant identifier + inventory_product_id: Product identifier + sales_data: Historical sales DataFrame (not used - kept for backward compatibility) + forecast_horizon_days: Days to forecast ahead (not used currently) + min_history_days: Minimum history required + + Returns: + Analysis results with patterns, trends, and insights from sales service + """ + try: + from shared.clients.sales_client import SalesServiceClient + from datetime import date, timedelta + + logger.info( + "Requesting demand pattern analysis from sales service", + tenant_id=tenant_id, + inventory_product_id=inventory_product_id + ) + + # Initialize sales client + sales_client = SalesServiceClient(config=settings, calling_service_name="forecasting") + + # Calculate date range + end_date = date.today() + start_date = end_date - timedelta(days=min_history_days) + + # Call sales service for pattern analysis + patterns = await sales_client.get_product_demand_patterns( + tenant_id=tenant_id, + product_id=inventory_product_id, + start_date=start_date, + end_date=end_date, + min_history_days=min_history_days + ) + + # Generate insights from patterns + insights = self._generate_insights_from_patterns( + patterns, + tenant_id, + inventory_product_id + ) + + # Add insights to the result + patterns['insights'] = insights + + logger.info( + "Demand pattern analysis received from sales service", + tenant_id=tenant_id, + inventory_product_id=inventory_product_id, + insights_generated=len(insights) + ) + + return patterns + + except Exception as e: + logger.error( + "Error getting demand patterns from sales service", + tenant_id=tenant_id, + inventory_product_id=inventory_product_id, + error=str(e), + exc_info=True + ) + return { + 'analyzed_at': datetime.utcnow().isoformat(), + 'history_days': 0, + 'insights': [], + 'patterns': {}, + 'trend_analysis': {}, + 'seasonal_factors': {}, + 'statistics': {}, + 'error': str(e) + } + + def _generate_insights_from_patterns( + self, + patterns: Dict[str, Any], + tenant_id: str, + inventory_product_id: str + ) -> List[Dict[str, Any]]: + """ + Generate actionable insights from demand patterns provided by sales service. + + Args: + patterns: Demand patterns from sales service + tenant_id: Tenant identifier + inventory_product_id: Product identifier + + Returns: + List of insights for AI Insights Service + """ + insights = [] + + # Check if there was an error in pattern analysis + if 'error' in patterns: + return insights + + trend = patterns.get('trend_analysis', {}) + stats = patterns.get('statistics', {}) + seasonal = patterns.get('seasonal_factors', {}) + + # Trend insight + if trend.get('is_increasing'): + insights.append({ + 'type': 'insight', + 'priority': 'medium', + 'category': 'forecasting', + 'title': 'Increasing Demand Trend Detected', + 'description': f"Product shows {trend.get('direction', 'increasing')} demand trend. Consider increasing inventory levels.", + 'impact_type': 'demand_increase', + 'impact_value': abs(trend.get('correlation', 0) * 100), + 'impact_unit': 'percent', + 'confidence': min(int(abs(trend.get('correlation', 0)) * 100), 95), + 'metrics_json': trend, + 'actionable': True, + 'recommendation_actions': [ + { + 'label': 'Increase Safety Stock', + 'action': 'increase_safety_stock', + 'params': {'product_id': inventory_product_id, 'factor': 1.2} + } + ] + }) + elif trend.get('is_decreasing'): + insights.append({ + 'type': 'insight', + 'priority': 'low', + 'category': 'forecasting', + 'title': 'Decreasing Demand Trend Detected', + 'description': f"Product shows {trend.get('direction', 'decreasing')} demand trend. Consider reviewing inventory strategy.", + 'impact_type': 'demand_decrease', + 'impact_value': abs(trend.get('correlation', 0) * 100), + 'impact_unit': 'percent', + 'confidence': min(int(abs(trend.get('correlation', 0)) * 100), 95), + 'metrics_json': trend, + 'actionable': True, + 'recommendation_actions': [ + { + 'label': 'Review Inventory Levels', + 'action': 'review_inventory', + 'params': {'product_id': inventory_product_id} + } + ] + }) + + # Volatility insight + cv = stats.get('coefficient_of_variation', 0) + if cv > 0.5: + insights.append({ + 'type': 'alert', + 'priority': 'medium', + 'category': 'forecasting', + 'title': 'High Demand Variability Detected', + 'description': f'Product has high demand variability (CV: {cv:.2f}). Consider dynamic safety stock levels.', + 'impact_type': 'demand_variability', + 'impact_value': round(cv * 100, 1), + 'impact_unit': 'percent', + 'confidence': 85, + 'metrics_json': stats, + 'actionable': True, + 'recommendation_actions': [ + { + 'label': 'Enable Dynamic Safety Stock', + 'action': 'enable_dynamic_safety_stock', + 'params': {'product_id': inventory_product_id} + } + ] + }) + + # Seasonal pattern insight + peak_ratio = seasonal.get('peak_ratio', 1.0) + if peak_ratio > 1.5: + pattern_data = patterns.get('patterns', {}) + peak_day = pattern_data.get('peak_day', 0) + low_day = pattern_data.get('low_day', 0) + insights.append({ + 'type': 'insight', + 'priority': 'medium', + 'category': 'forecasting', + 'title': 'Strong Weekly Pattern Detected', + 'description': f'Demand is {peak_ratio:.1f}x higher on day {peak_day} compared to day {low_day}. Adjust production schedule accordingly.', + 'impact_type': 'seasonal_pattern', + 'impact_value': round((peak_ratio - 1) * 100, 1), + 'impact_unit': 'percent', + 'confidence': 80, + 'metrics_json': {**seasonal, **pattern_data}, + 'actionable': True, + 'recommendation_actions': [ + { + 'label': 'Adjust Production Schedule', + 'action': 'adjust_production', + 'params': {'product_id': inventory_product_id, 'pattern': 'weekly'} + } + ] + }) + + return insights + async def _get_dynamic_rules(self, tenant_id: str, inventory_product_id: str, rule_type: str) -> Dict[str, float]: """ Fetch learned dynamic rules from AI Insights Service. @@ -359,6 +566,48 @@ class BakeryForecaster: logger.warning(f"Failed to fetch dynamic rules: {e}") return {} + async def generate_forecast_with_repository(self, tenant_id: str, inventory_product_id: str, + forecast_date: date, model_id: str = None) -> Dict[str, Any]: + """Generate forecast with repository integration""" + try: + # This would integrate with repositories for model loading and caching + # For now, we'll implement basic forecasting logic using the forecaster's methods + # This is a simplified approach - in production, this would use repositories + + # For now, prepare minimal features for prediction + features = { + 'date': forecast_date.isoformat(), + 'day_of_week': forecast_date.weekday(), + 'is_weekend': 1 if forecast_date.weekday() >= 5 else 0, + 'is_holiday': 0, # Would come from calendar service in real implementation + # Add default weather values if needed + 'temperature': 20.0, + 'precipitation': 0.0, + } + + # This is a placeholder - in a full implementation, we would: + # 1. Load the appropriate model from repository + # 2. Use historical data to make prediction + # 3. Apply business rules + # For now, return the structure with basic info + + # For more realistic implementation, we'd use self.predict_demand method + # but that requires a model object which needs to be loaded + + return { + "tenant_id": tenant_id, + "inventory_product_id": inventory_product_id, + "forecast_date": forecast_date.isoformat(), + "prediction": 10.0, # Placeholder value - in reality would be calculated + "confidence_interval": {"lower": 8.0, "upper": 12.0}, # Placeholder values + "status": "completed", + "repository_integration": True, + "forecast_method": "placeholder" + } + except Exception as e: + logger.error("Forecast generation failed", error=str(e)) + raise + class BakeryBusinessRules: """ @@ -582,17 +831,24 @@ class BakeryBusinessRules: return prediction - def _apply_spanish_rules(self, prediction: Dict[str, float], + def _apply_spanish_rules(self, prediction: Dict[str, float], features: Dict[str, Any]) -> Dict[str, float]: """Apply Spanish bakery specific rules""" - + # Spanish siesta time considerations - current_date = pd.to_datetime(features['date']) - day_of_week = current_date.weekday() - - # Reduced activity during typical siesta hours (14:00-17:00) - # This affects afternoon sales planning - if day_of_week < 5: # Weekdays - prediction["yhat"] *= 0.95 # Slight reduction for siesta effect - + date_str = features.get('date') + if date_str: + try: + current_date = pd.to_datetime(date_str) + day_of_week = current_date.weekday() + + # Reduced activity during typical siesta hours (14:00-17:00) + # This affects afternoon sales planning + if day_of_week < 5: # Weekdays + prediction["yhat"] *= 0.95 # Slight reduction for siesta effect + except Exception as e: + logger.warning(f"Error processing date in spanish rules: {e}") + else: + logger.warning("Date not provided in features, skipping Spanish rules") + return prediction diff --git a/services/sales/app/api/analytics.py b/services/sales/app/api/analytics.py index fa27f452..5bc767f5 100644 --- a/services/sales/app/api/analytics.py +++ b/services/sales/app/api/analytics.py @@ -45,3 +45,55 @@ async def get_sales_analytics( except Exception as e: logger.error("Failed to get sales analytics", error=str(e), tenant_id=tenant_id) raise HTTPException(status_code=500, detail=f"Failed to get sales analytics: {str(e)}") + + +@router.get( + route_builder.build_analytics_route("products/{product_id}/demand-patterns") +) +@analytics_tier_required +async def get_product_demand_patterns( + tenant_id: UUID = Path(..., description="Tenant ID"), + product_id: UUID = Path(..., description="Product ID (inventory_product_id)"), + start_date: Optional[datetime] = Query(None, description="Start date for analysis"), + end_date: Optional[datetime] = Query(None, description="End date for analysis"), + min_history_days: int = Query(90, description="Minimum days of history required", ge=30, le=365), + current_user: Dict[str, Any] = Depends(get_current_user_dep), + sales_service: SalesService = Depends(get_sales_service) +): + """ + Analyze demand patterns for a specific product (Professional+ tier required). + + Returns: + - Demand trends (increasing/decreasing/stable) + - Volatility metrics (coefficient of variation) + - Weekly seasonal patterns + - Peak/low demand days + - Statistical summaries + """ + try: + patterns = await sales_service.analyze_product_demand_patterns( + tenant_id=tenant_id, + inventory_product_id=product_id, + start_date=start_date, + end_date=end_date, + min_history_days=min_history_days + ) + + logger.info( + "Retrieved product demand patterns", + tenant_id=tenant_id, + product_id=product_id + ) + return patterns + + except Exception as e: + logger.error( + "Failed to get product demand patterns", + error=str(e), + tenant_id=tenant_id, + product_id=product_id + ) + raise HTTPException( + status_code=500, + detail=f"Failed to analyze demand patterns: {str(e)}" + ) diff --git a/services/sales/app/services/sales_service.py b/services/sales/app/services/sales_service.py index 4990baa2..0bd4efe7 100644 --- a/services/sales/app/services/sales_service.py +++ b/services/sales/app/services/sales_service.py @@ -501,15 +501,157 @@ class SalesService: error=str(e), product_id=product_id, tenant_id=tenant_id) return None - async def get_inventory_products_by_category(self, category: str, tenant_id: UUID, + async def get_inventory_products_by_category(self, category: str, tenant_id: UUID, product_type: Optional[str] = None) -> List[Dict[str, Any]]: """Get products by category from inventory service""" try: products = await self.inventory_client.get_products_by_category(category, tenant_id, product_type) - logger.info("Retrieved inventory products by category", category=category, + logger.info("Retrieved inventory products by category", category=category, count=len(products), tenant_id=tenant_id) return products except Exception as e: - logger.error("Failed to get inventory products by category", + logger.error("Failed to get inventory products by category", error=str(e), category=category, tenant_id=tenant_id) - return [] \ No newline at end of file + return [] + + async def analyze_product_demand_patterns( + self, + tenant_id: UUID, + inventory_product_id: UUID, + start_date: Optional[datetime] = None, + end_date: Optional[datetime] = None, + min_history_days: int = 90 + ) -> Dict[str, Any]: + """ + Analyze demand patterns for a specific product from historical sales data. + + This method provides insights on: + - Demand trends (increasing/decreasing) + - Volatility (coefficient of variation) + - Weekly seasonal patterns + - Peak/low demand days + + Args: + tenant_id: Tenant identifier + inventory_product_id: Product identifier + start_date: Start date for analysis (optional) + end_date: End date for analysis (optional) + min_history_days: Minimum days of history required + + Returns: + Analysis results with patterns, trends, and statistics + """ + try: + import pandas as pd + + logger.info( + "Analyzing product demand patterns", + tenant_id=tenant_id, + inventory_product_id=inventory_product_id + ) + + # Fetch sales data for the product + sales_records = await self.get_product_sales( + tenant_id=tenant_id, + inventory_product_id=inventory_product_id, + start_date=start_date, + end_date=end_date + ) + + if not sales_records or len(sales_records) < min_history_days: + return { + 'analyzed_at': datetime.utcnow().isoformat(), + 'history_days': len(sales_records) if sales_records else 0, + 'patterns': {}, + 'trend_analysis': {}, + 'seasonal_factors': {}, + 'statistics': {}, + 'error': f'Insufficient historical data (need {min_history_days} days, got {len(sales_records) if sales_records else 0})' + } + + # Convert to DataFrame for analysis + sales_data = pd.DataFrame([{ + 'date': record.date, + 'quantity': record.quantity_sold, + 'revenue': float(record.revenue) if record.revenue else 0 + } for record in sales_records]) + + sales_data['date'] = pd.to_datetime(sales_data['date']) + sales_data = sales_data.sort_values('date') + + # Calculate basic statistics + mean_demand = sales_data['quantity'].mean() + std_demand = sales_data['quantity'].std() + cv = (std_demand / mean_demand) if mean_demand > 0 else 0 + + # Trend analysis + sales_data['days_since_start'] = (sales_data['date'] - sales_data['date'].min()).dt.days + trend_correlation = sales_data['days_since_start'].corr(sales_data['quantity']) + is_increasing = trend_correlation > 0.2 + is_decreasing = trend_correlation < -0.2 + + # Seasonal pattern detection (day of week) + sales_data['day_of_week'] = sales_data['date'].dt.dayofweek + weekly_pattern = sales_data.groupby('day_of_week')['quantity'].mean().to_dict() + peak_day = max(weekly_pattern, key=weekly_pattern.get) + low_day = min(weekly_pattern, key=weekly_pattern.get) + peak_ratio = weekly_pattern[peak_day] / weekly_pattern[low_day] if weekly_pattern[low_day] > 0 else 1.0 + + logger.info( + "Demand pattern analysis complete", + tenant_id=tenant_id, + inventory_product_id=inventory_product_id, + data_points=len(sales_data), + trend_direction='increasing' if is_increasing else 'decreasing' if is_decreasing else 'stable' + ) + + return { + 'analyzed_at': datetime.utcnow().isoformat(), + 'history_days': len(sales_data), + 'date_range': { + 'start': sales_data['date'].min().isoformat(), + 'end': sales_data['date'].max().isoformat() + }, + 'statistics': { + 'mean_demand': round(mean_demand, 2), + 'std_demand': round(std_demand, 2), + 'coefficient_of_variation': round(cv, 3), + 'total_quantity': round(sales_data['quantity'].sum(), 2), + 'total_revenue': round(sales_data['revenue'].sum(), 2), + 'min_demand': round(sales_data['quantity'].min(), 2), + 'max_demand': round(sales_data['quantity'].max(), 2) + }, + 'trend_analysis': { + 'correlation': round(trend_correlation, 3), + 'is_increasing': is_increasing, + 'is_decreasing': is_decreasing, + 'direction': 'increasing' if is_increasing else 'decreasing' if is_decreasing else 'stable' + }, + 'patterns': { + 'weekly_pattern': {str(k): round(v, 2) for k, v in weekly_pattern.items()}, + 'peak_day': int(peak_day), + 'low_day': int(low_day) + }, + 'seasonal_factors': { + 'peak_ratio': round(peak_ratio, 2), + 'has_strong_pattern': peak_ratio > 1.5 + } + } + + except Exception as e: + logger.error( + "Error analyzing product demand patterns", + tenant_id=tenant_id, + inventory_product_id=inventory_product_id, + error=str(e), + exc_info=True + ) + return { + 'analyzed_at': datetime.utcnow().isoformat(), + 'history_days': 0, + 'patterns': {}, + 'trend_analysis': {}, + 'seasonal_factors': {}, + 'statistics': {}, + 'error': str(e) + } \ No newline at end of file diff --git a/shared/clients/sales_client.py b/shared/clients/sales_client.py index c92c4a7b..d568a997 100755 --- a/shared/clients/sales_client.py +++ b/shared/clients/sales_client.py @@ -274,6 +274,56 @@ class SalesServiceClient(BaseServiceClient): ) return {} + async def get_product_demand_patterns( + self, + tenant_id: str, + product_id: str, + start_date: Optional[date] = None, + end_date: Optional[date] = None, + min_history_days: int = 90 + ) -> Dict[str, Any]: + """ + Get demand pattern analysis for a specific product. + + Args: + tenant_id: Tenant identifier + product_id: Product identifier (inventory_product_id) + start_date: Start date for analysis + end_date: End date for analysis + min_history_days: Minimum days of history required + + Returns: + Demand pattern analysis including trends, seasonality, and statistics + """ + try: + params = {"min_history_days": min_history_days} + if start_date: + params["start_date"] = start_date.isoformat() + if end_date: + params["end_date"] = end_date.isoformat() + + result = await self.get( + f"sales/analytics/products/{product_id}/demand-patterns", + tenant_id=tenant_id, + params=params + ) + + logger.info( + "Retrieved product demand patterns", + tenant_id=tenant_id, + product_id=product_id + ) + return result if result else {} + + except Exception as e: + logger.error( + "Failed to get product demand patterns", + error=str(e), + tenant_id=tenant_id, + product_id=product_id + ) + return {} + # ================================================================ # DATA IMPORT # ================================================================