""" Rules Orchestrator Coordinates dynamic rules learning, insight posting, and integration with forecasting service """ import pandas as pd from typing import Dict, List, Any, Optional import structlog from datetime import datetime from uuid import UUID from app.ml.dynamic_rules_engine import DynamicRulesEngine from app.clients.ai_insights_client import AIInsightsClient from shared.messaging import UnifiedEventPublisher logger = structlog.get_logger() class RulesOrchestrator: """ Orchestrates dynamic rules learning and insight generation workflow. Workflow: 1. Learn dynamic rules from historical data 2. Generate insights comparing learned vs hardcoded rules 3. Post insights to AI Insights Service 4. Provide learned rules for forecasting integration 5. Track rule updates and performance """ def __init__( self, ai_insights_base_url: str = "http://ai-insights-service:8000", event_publisher: Optional[UnifiedEventPublisher] = None ): self.rules_engine = DynamicRulesEngine() self.ai_insights_client = AIInsightsClient(ai_insights_base_url) self.event_publisher = event_publisher async def learn_and_post_rules( self, tenant_id: str, inventory_product_id: str, sales_data: pd.DataFrame, external_data: Optional[pd.DataFrame] = None, min_samples: int = 10 ) -> Dict[str, Any]: """ Complete workflow: Learn rules and post insights. Args: tenant_id: Tenant identifier inventory_product_id: Product identifier sales_data: Historical sales data external_data: Optional weather/events/holidays data min_samples: Minimum samples for rule learning Returns: Workflow results with learned rules and posted insights """ logger.info( "Starting dynamic rules learning workflow", tenant_id=tenant_id, inventory_product_id=inventory_product_id ) # Step 1: Learn all rules from data rules_results = await self.rules_engine.learn_all_rules( tenant_id=tenant_id, inventory_product_id=inventory_product_id, sales_data=sales_data, external_data=external_data, min_samples=min_samples ) logger.info( "Rules learning complete", insights_generated=len(rules_results['insights']), rules_learned=len(rules_results['rules']) ) # Step 2: Enrich insights with tenant_id and product context enriched_insights = self._enrich_insights( rules_results['insights'], tenant_id, inventory_product_id ) # Step 3: Post insights to AI Insights Service if enriched_insights: post_results = await self.ai_insights_client.create_insights_bulk( tenant_id=UUID(tenant_id), insights=enriched_insights ) logger.info( "Insights posted to AI Insights Service", total=post_results['total'], successful=post_results['successful'], failed=post_results['failed'] ) else: post_results = {'total': 0, 'successful': 0, 'failed': 0} logger.info("No insights to post") # Step 4: Publish insight events to RabbitMQ created_insights = post_results.get('created_insights', []) if created_insights: product_context = {'inventory_product_id': inventory_product_id} await self._publish_insight_events( tenant_id=tenant_id, insights=created_insights, product_context=product_context ) # Step 5: Return comprehensive results return { 'tenant_id': tenant_id, 'inventory_product_id': inventory_product_id, 'learned_at': rules_results['learned_at'], 'rules': rules_results['rules'], 'insights_generated': len(enriched_insights), 'insights_posted': post_results['successful'], 'insights_failed': post_results['failed'], 'created_insights': post_results.get('created_insights', []) } def _enrich_insights( self, insights: List[Dict[str, Any]], tenant_id: str, inventory_product_id: str ) -> List[Dict[str, Any]]: 
""" Enrich insights with required fields for AI Insights Service. Args: insights: Raw insights from rules engine tenant_id: Tenant identifier inventory_product_id: Product identifier Returns: Enriched insights ready for posting """ enriched = [] for insight in insights: # Add required tenant_id and product context enriched_insight = insight.copy() enriched_insight['tenant_id'] = tenant_id # Add product context to metrics if 'metrics_json' not in enriched_insight: enriched_insight['metrics_json'] = {} enriched_insight['metrics_json']['inventory_product_id'] = inventory_product_id # Add source metadata enriched_insight['source_service'] = 'forecasting' enriched_insight['source_model'] = 'dynamic_rules_engine' enriched_insight['detected_at'] = datetime.utcnow().isoformat() enriched.append(enriched_insight) return enriched async def get_learned_rules_for_forecasting( self, inventory_product_id: str ) -> Dict[str, Any]: """ Get learned rules in format ready for forecasting integration. Args: inventory_product_id: Product identifier Returns: Dictionary with learned multipliers for all rule types """ return self.rules_engine.export_rules_for_prophet(inventory_product_id) def get_rule_multiplier( self, inventory_product_id: str, rule_type: str, key: str, default: float = 1.0 ) -> float: """ Get learned rule multiplier with fallback to default. Args: inventory_product_id: Product identifier rule_type: 'weather', 'holiday', 'event', 'day_of_week', 'month' key: Condition key default: Default multiplier if rule not learned Returns: Learned multiplier or default """ learned = self.rules_engine.get_rule(inventory_product_id, rule_type, key) return learned if learned is not None else default async def update_rules_periodically( self, tenant_id: str, inventory_product_id: str, sales_data: pd.DataFrame, external_data: Optional[pd.DataFrame] = None ) -> Dict[str, Any]: """ Update learned rules with new data (for periodic refresh). Args: tenant_id: Tenant identifier inventory_product_id: Product identifier sales_data: Updated historical sales data external_data: Updated external data Returns: Update results """ logger.info( "Updating learned rules with new data", tenant_id=tenant_id, inventory_product_id=inventory_product_id, new_data_points=len(sales_data) ) # Re-learn rules with updated data results = await self.learn_and_post_rules( tenant_id=tenant_id, inventory_product_id=inventory_product_id, sales_data=sales_data, external_data=external_data ) logger.info( "Rules update complete", insights_posted=results['insights_posted'] ) return results async def _publish_insight_events(self, tenant_id, insights, product_context=None): """ Publish insight events to RabbitMQ for alert processing. 
    async def _publish_insight_events(
        self,
        tenant_id: str,
        insights: List[Dict[str, Any]],
        product_context: Optional[Dict[str, Any]] = None
    ) -> None:
        """
        Publish insight events to RabbitMQ for alert processing.

        Args:
            tenant_id: Tenant identifier
            insights: List of created insights
            product_context: Additional context about the product
        """
        if not self.event_publisher:
            logger.warning("No event publisher available for business rules insights")
            return

        for insight in insights:
            # Map priority to severity, using confidence as a tiebreaker
            confidence = insight.get('confidence', 0)
            priority = insight.get('priority', 'medium')

            if priority == 'critical' or (priority == 'high' and confidence >= 70):
                severity = 'high'
            elif priority == 'high' or (priority == 'medium' and confidence >= 80):
                severity = 'medium'
            else:
                severity = 'low'

            # Prepare the event payload
            event_data = {
                'insight_id': insight.get('id'),
                'type': insight.get('type'),
                'title': insight.get('title'),
                'description': insight.get('description'),
                'category': insight.get('category'),
                'priority': priority,
                'confidence': confidence,
                'recommendation': insight.get('recommendation_actions', []),
                'impact_type': insight.get('impact_type'),
                'impact_value': insight.get('impact_value'),
                'inventory_product_id': (
                    product_context.get('inventory_product_id')
                    if product_context else None
                ),
                'timestamp': insight.get('detected_at', datetime.utcnow().isoformat()),
                'source_service': 'forecasting',
                'source_model': 'dynamic_rules_engine'
            }

            try:
                await self.event_publisher.publish_recommendation(
                    event_type='ai_business_rule',
                    tenant_id=tenant_id,
                    severity=severity,
                    data=event_data
                )
                logger.info(
                    "Published business rules insight event",
                    tenant_id=tenant_id,
                    insight_id=insight.get('id'),
                    severity=severity
                )
            except Exception as e:
                logger.error(
                    "Failed to publish business rules insight event",
                    tenant_id=tenant_id,
                    insight_id=insight.get('id'),
                    error=str(e)
                )

    async def close(self):
        """Close HTTP client connections."""
        await self.ai_insights_client.close()
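
# Minimal end-to-end sketch. It only runs where the app/shared packages above
# are importable and the AI Insights Service is reachable; the "date"/"quantity"
# sales schema and the placeholder demand series are assumptions to align with
# whatever DynamicRulesEngine.learn_all_rules() actually expects.
if __name__ == "__main__":
    import asyncio

    async def _demo() -> None:
        # 90 days of synthetic sales history (placeholder values)
        sales_df = pd.DataFrame({
            "date": pd.date_range("2024-01-01", periods=90, freq="D"),
            "quantity": range(90),
        })

        orchestrator = RulesOrchestrator()  # no event publisher: events are skipped
        try:
            results = await orchestrator.learn_and_post_rules(
                tenant_id="3fa85f64-5717-4562-b3fc-2c963f66afa6",
                inventory_product_id="demo-product",
                sales_data=sales_df,
            )
            logger.info("demo complete", insights_posted=results['insights_posted'])
        finally:
            await orchestrator.close()

    asyncio.run(_demo())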