""" Price Insights Orchestrator Coordinates price forecasting and insight posting """ import pandas as pd from typing import Dict, List, Any, Optional import structlog from datetime import datetime from uuid import UUID import sys import os # Add shared clients to path sys.path.append(os.path.join(os.path.dirname(__file__), '../../../..')) from shared.clients.ai_insights_client import AIInsightsClient from shared.messaging import UnifiedEventPublisher from app.ml.price_forecaster import PriceForecaster logger = structlog.get_logger() class PriceInsightsOrchestrator: """ Orchestrates price forecasting and insight generation workflow. Workflow: 1. Forecast prices from historical data 2. Generate buy/wait/bulk recommendations 3. Post insights to AI Insights Service 4. Provide price forecasts for procurement planning """ def __init__( self, ai_insights_base_url: str = "http://ai-insights-service:8000", event_publisher: Optional[UnifiedEventPublisher] = None ): self.forecaster = PriceForecaster() self.ai_insights_client = AIInsightsClient(ai_insights_base_url) self.event_publisher = event_publisher async def forecast_and_post_insights( self, tenant_id: str, ingredient_id: str, price_history: pd.DataFrame, forecast_horizon_days: int = 30, min_history_days: int = 180 ) -> Dict[str, Any]: """ Complete workflow: Forecast prices and post insights. Args: tenant_id: Tenant identifier ingredient_id: Ingredient identifier price_history: Historical price data forecast_horizon_days: Days to forecast ahead min_history_days: Minimum days of history required Returns: Workflow results with forecast and posted insights """ logger.info( "Starting price forecasting workflow", tenant_id=tenant_id, ingredient_id=ingredient_id, history_days=len(price_history) ) # Step 1: Forecast prices forecast_results = await self.forecaster.forecast_price( tenant_id=tenant_id, ingredient_id=ingredient_id, price_history=price_history, forecast_horizon_days=forecast_horizon_days, min_history_days=min_history_days ) logger.info( "Price forecasting complete", ingredient_id=ingredient_id, recommendation=forecast_results.get('recommendations', {}).get('action'), insights_generated=len(forecast_results.get('insights', [])) ) # Step 2: Enrich insights with tenant_id and ingredient context enriched_insights = self._enrich_insights( forecast_results.get('insights', []), tenant_id, ingredient_id ) # Step 3: Post insights to AI Insights Service if enriched_insights: post_results = await self.ai_insights_client.create_insights_bulk( tenant_id=UUID(tenant_id), insights=enriched_insights ) logger.info( "Price insights posted to AI Insights Service", ingredient_id=ingredient_id, total=post_results['total'], successful=post_results['successful'], failed=post_results['failed'] ) else: post_results = {'total': 0, 'successful': 0, 'failed': 0} logger.info("No insights to post for ingredient", ingredient_id=ingredient_id) # Step 4: Publish insight events to RabbitMQ created_insights = post_results.get('created_insights', []) if created_insights: ingredient_context = {'ingredient_id': ingredient_id} await self._publish_insight_events( tenant_id=tenant_id, insights=created_insights, ingredient_context=ingredient_context ) # Step 5: Return comprehensive results return { 'tenant_id': tenant_id, 'ingredient_id': ingredient_id, 'forecasted_at': forecast_results['forecasted_at'], 'history_days': forecast_results['history_days'], 'forecast': forecast_results.get('forecast', {}), 'recommendation': forecast_results.get('recommendations', {}), 'bulk_opportunity': forecast_results.get('bulk_opportunities', {}), 'insights_generated': len(enriched_insights), 'insights_posted': post_results['successful'], 'insights_failed': post_results['failed'], 'created_insights': post_results.get('created_insights', []) } def _enrich_insights( self, insights: List[Dict[str, Any]], tenant_id: str, ingredient_id: str ) -> List[Dict[str, Any]]: """ Enrich insights with required fields for AI Insights Service. Args: insights: Raw insights from forecaster tenant_id: Tenant identifier ingredient_id: Ingredient identifier Returns: Enriched insights ready for posting """ enriched = [] for insight in insights: # Add required tenant_id enriched_insight = insight.copy() enriched_insight['tenant_id'] = tenant_id # Add ingredient context to metrics if 'metrics_json' not in enriched_insight: enriched_insight['metrics_json'] = {} enriched_insight['metrics_json']['ingredient_id'] = ingredient_id # Add source metadata enriched_insight['source_service'] = 'procurement' enriched_insight['source_model'] = 'price_forecaster' enriched_insight['detected_at'] = datetime.utcnow().isoformat() enriched.append(enriched_insight) return enriched async def forecast_all_ingredients( self, tenant_id: str, ingredients_data: Dict[str, pd.DataFrame], forecast_horizon_days: int = 30, min_history_days: int = 180 ) -> Dict[str, Any]: """ Forecast prices for all ingredients for a tenant. Args: tenant_id: Tenant identifier ingredients_data: Dict of {ingredient_id: price_history DataFrame} forecast_horizon_days: Days to forecast min_history_days: Minimum history required Returns: Comprehensive forecasting results """ logger.info( "Forecasting prices for all ingredients", tenant_id=tenant_id, ingredients=len(ingredients_data) ) all_results = [] total_insights_posted = 0 buy_now_count = 0 wait_count = 0 bulk_opportunity_count = 0 # Forecast each ingredient for ingredient_id, price_history in ingredients_data.items(): try: results = await self.forecast_and_post_insights( tenant_id=tenant_id, ingredient_id=ingredient_id, price_history=price_history, forecast_horizon_days=forecast_horizon_days, min_history_days=min_history_days ) all_results.append(results) total_insights_posted += results['insights_posted'] # Count recommendations action = results['recommendation'].get('action') if action == 'buy_now': buy_now_count += 1 elif action in ['wait', 'wait_for_dip']: wait_count += 1 if results['bulk_opportunity'].get('has_bulk_opportunity'): bulk_opportunity_count += 1 except Exception as e: logger.error( "Error forecasting ingredient", ingredient_id=ingredient_id, error=str(e) ) # Generate summary insight if buy_now_count > 0 or bulk_opportunity_count > 0: summary_insight = self._generate_portfolio_summary_insight( tenant_id, all_results, buy_now_count, wait_count, bulk_opportunity_count ) if summary_insight: enriched_summary = self._enrich_insights( [summary_insight], tenant_id, 'all_ingredients' ) post_results = await self.ai_insights_client.create_insights_bulk( tenant_id=UUID(tenant_id), insights=enriched_summary ) total_insights_posted += post_results['successful'] logger.info( "All ingredients forecasting complete", tenant_id=tenant_id, ingredients_forecasted=len(all_results), total_insights_posted=total_insights_posted, buy_now_recommendations=buy_now_count, bulk_opportunities=bulk_opportunity_count ) return { 'tenant_id': tenant_id, 'forecasted_at': datetime.utcnow().isoformat(), 'ingredients_forecasted': len(all_results), 'ingredient_results': all_results, 'total_insights_posted': total_insights_posted, 'buy_now_count': buy_now_count, 'wait_count': wait_count, 'bulk_opportunity_count': bulk_opportunity_count } async def _publish_insight_events(self, tenant_id, insights, ingredient_context=None): """ Publish insight events to RabbitMQ for alert processing. Args: tenant_id: Tenant identifier insights: List of created insights ingredient_context: Additional context about the ingredient """ if not self.event_publisher: logger.warning("No event publisher available for price insights") return for insight in insights: # Determine severity based on confidence and priority confidence = insight.get('confidence', 0) priority = insight.get('priority', 'medium') # Map priority to severity, with confidence as tiebreaker if priority == 'critical' or (priority == 'high' and confidence >= 70): severity = 'high' elif priority == 'high' or (priority == 'medium' and confidence >= 80): severity = 'medium' else: severity = 'low' # Prepare the event data event_data = { 'insight_id': insight.get('id'), 'type': insight.get('type'), 'title': insight.get('title'), 'description': insight.get('description'), 'category': insight.get('category'), 'priority': insight.get('priority'), 'confidence': confidence, 'recommendation': insight.get('recommendation_actions', []), 'impact_type': insight.get('impact_type'), 'impact_value': insight.get('impact_value'), 'ingredient_id': ingredient_context.get('ingredient_id') if ingredient_context else None, 'timestamp': insight.get('detected_at', datetime.utcnow().isoformat()), 'source_service': 'procurement', 'source_model': 'price_forecaster' } try: await self.event_publisher.publish_recommendation( event_type='ai_price_forecast', tenant_id=tenant_id, severity=severity, data=event_data ) logger.info( "Published price insight event", tenant_id=tenant_id, insight_id=insight.get('id'), severity=severity ) except Exception as e: logger.error( "Failed to publish price insight event", tenant_id=tenant_id, insight_id=insight.get('id'), error=str(e) ) def _generate_portfolio_summary_insight( self, tenant_id: str, all_results: List[Dict[str, Any]], buy_now_count: int, wait_count: int, bulk_opportunity_count: int ) -> Optional[Dict[str, Any]]: """ Generate portfolio-level summary insight. Args: tenant_id: Tenant identifier all_results: All ingredient forecast results buy_now_count: Number of buy now recommendations wait_count: Number of wait recommendations bulk_opportunity_count: Number of bulk opportunities Returns: Summary insight or None """ if buy_now_count == 0 and bulk_opportunity_count == 0: return None # Calculate potential savings from bulk opportunities total_potential_savings = 0 for result in all_results: bulk_opp = result.get('bulk_opportunity', {}) if bulk_opp.get('has_bulk_opportunity'): # Estimate savings (simplified) savings_pct = bulk_opp.get('potential_savings_pct', 0) total_potential_savings += savings_pct avg_potential_savings = total_potential_savings / max(1, bulk_opportunity_count) description_parts = [] if buy_now_count > 0: description_parts.append(f'{buy_now_count} ingredients show price increases - purchase soon') if bulk_opportunity_count > 0: description_parts.append(f'{bulk_opportunity_count} ingredients have bulk buying opportunities (avg {avg_potential_savings:.1f}% savings)') return { 'type': 'recommendation', 'priority': 'high' if buy_now_count > 2 else 'medium', 'category': 'procurement', 'title': f'Procurement Timing Opportunities: {buy_now_count + bulk_opportunity_count} Items', 'description': 'Price forecast analysis identified procurement timing opportunities. ' + '. '.join(description_parts) + '.', 'impact_type': 'cost_optimization', 'impact_value': avg_potential_savings if bulk_opportunity_count > 0 else buy_now_count, 'impact_unit': 'percentage' if bulk_opportunity_count > 0 else 'items', 'confidence': 75, 'metrics_json': { 'ingredients_analyzed': len(all_results), 'buy_now_count': buy_now_count, 'wait_count': wait_count, 'bulk_opportunity_count': bulk_opportunity_count, 'avg_potential_savings_pct': round(avg_potential_savings, 2) }, 'actionable': True, 'recommendation_actions': [ { 'label': 'Review Price Forecasts', 'action': 'review_price_forecasts', 'params': {'tenant_id': tenant_id} }, { 'label': 'Create Optimized Orders', 'action': 'create_optimized_purchase_orders', 'params': {'tenant_id': tenant_id} } ], 'source_service': 'procurement', 'source_model': 'price_forecaster' } async def get_price_forecast( self, ingredient_id: str ) -> Optional[Dict[str, Any]]: """ Get cached seasonal patterns for an ingredient. Args: ingredient_id: Ingredient identifier Returns: Seasonal patterns or None if not forecasted """ return self.forecaster.get_seasonal_patterns(ingredient_id) async def get_volatility_assessment( self, ingredient_id: str ) -> Optional[Dict[str, Any]]: """ Get cached volatility assessment for an ingredient. Args: ingredient_id: Ingredient identifier Returns: Volatility assessment or None if not assessed """ return self.forecaster.get_volatility_score(ingredient_id) async def close(self): """Close HTTP client connections.""" await self.ai_insights_client.close()