Root cause analysis:
- The orchestration saga was failing at the 'fetch_shared_data_snapshot' step
- Lines 350-356 had a logic error: the code tried to import pandas in an exception handler after the pandas import had already failed
- This caused an uncaught exception that propagated up and failed the entire saga

The fix:
- Replaced the pandas DataFrame placeholder with a simple dict for traffic_predictions
- Since traffic predictions are marked as "not yet implemented", pandas is not needed yet
- This eliminates the pandas dependency from the orchestrator service
- When traffic predictions are implemented in Phase 5, the dict can be converted to a DataFrame

Impact:
- The orchestration saga will no longer fail due to missing pandas
- The AI enhancement warning will still appear (adding pandas to requirements, if needed, is a separate fix)
- The traffic predictions placeholder now uses an empty dict instead of an empty DataFrame
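A minimal sketch of the change (the pandas-based "before" line is reconstructed from the description above, not copied from the old source):

    # Before: required pandas, which was unavailable at runtime
    # import pandas as pd
    # context['traffic_predictions'] = pd.DataFrame()

    # After: plain dict placeholder, no pandas dependency
    context['traffic_predictions'] = {}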
"""
|
|
Orchestration Saga Service
|
|
|
|
Implements saga pattern for orchestrator workflow with compensation logic.
|
|
Integrates AI-enhanced orchestration when enabled.
|
|
"""
|
|
|
|
import asyncio
|
|
import uuid
|
|
from datetime import datetime, timezone
|
|
from typing import Dict, Any, Optional
|
|
import logging
|
|
|
|
from shared.utils.saga_pattern import SagaCoordinator
|
|
from shared.clients.forecast_client import ForecastServiceClient
|
|
from shared.clients.production_client import ProductionServiceClient
|
|
from shared.clients.procurement_client import ProcurementServiceClient
|
|
from shared.clients.notification_client import NotificationServiceClient
|
|
from shared.clients.inventory_client import InventoryServiceClient
|
|
from shared.clients.suppliers_client import SuppliersServiceClient
|
|
from shared.clients.recipes_client import RecipesServiceClient
|
|
from shared.clients.ai_insights_client import AIInsightsClient
|
|
from shared.clients.training_client import TrainingServiceClient
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class OrchestrationSaga:
    """
    Saga coordinator for orchestration workflow.

    Workflow Steps:
    0. Fetch shared data snapshot (inventory, suppliers, recipes)
    0.5. Generate AI insights from ML orchestrators
    1. Generate forecasts
    2. Generate production schedule
    3. Generate procurement plan
    4. Send notifications
    5. Validate previous day's forecasts

    Each step has compensation logic to roll back on failure.
    """

    def __init__(
        self,
        forecast_client: ForecastServiceClient,
        production_client: ProductionServiceClient,
        procurement_client: ProcurementServiceClient,
        notification_client: NotificationServiceClient,
        inventory_client: InventoryServiceClient,
        suppliers_client: SuppliersServiceClient,
        recipes_client: RecipesServiceClient,
        ai_insights_client: Optional[AIInsightsClient] = None,
        training_client: Optional[TrainingServiceClient] = None,
        use_ai_enhancement: bool = False,
        ai_insights_base_url: str = "http://ai-insights-service:8000",
        ai_insights_min_confidence: int = 70,
        # Circuit breakers for fault tolerance
        forecast_breaker: Optional['CircuitBreaker'] = None,
        production_breaker: Optional['CircuitBreaker'] = None,
        procurement_breaker: Optional['CircuitBreaker'] = None,
        inventory_breaker: Optional['CircuitBreaker'] = None,
        suppliers_breaker: Optional['CircuitBreaker'] = None,
        recipes_breaker: Optional['CircuitBreaker'] = None
    ):
        """
        Initialize orchestration saga.

        Args:
            forecast_client: Forecast service client
            production_client: Production service client
            procurement_client: Procurement service client
            notification_client: Notification service client
            inventory_client: Inventory service client
            suppliers_client: Suppliers service client
            recipes_client: Recipes service client
            ai_insights_client: AI Insights service client (created from
                ai_insights_base_url if not provided)
            training_client: Training service client
            use_ai_enhancement: Enable AI-enhanced orchestration
            ai_insights_base_url: Base URL for AI Insights Service
            ai_insights_min_confidence: Minimum confidence threshold for applying insights
        """
        self.forecast_client = forecast_client
        self.production_client = production_client
        self.procurement_client = procurement_client
        self.notification_client = notification_client
        self.inventory_client = inventory_client
        self.suppliers_client = suppliers_client
        self.recipes_client = recipes_client
        self.ai_insights_client = ai_insights_client or AIInsightsClient(
            base_url=ai_insights_base_url
        )
        self.training_client = training_client
        self.use_ai_enhancement = use_ai_enhancement

        # Circuit breakers
        self.forecast_breaker = forecast_breaker
        self.production_breaker = production_breaker
        self.procurement_breaker = procurement_breaker
        self.inventory_breaker = inventory_breaker
        self.suppliers_breaker = suppliers_breaker
        self.recipes_breaker = recipes_breaker

        # Initialize AI enhancer if enabled
        self.ai_enhancer = None
        if use_ai_enhancement:
            try:
                from app.ml.ai_enhanced_orchestrator import AIEnhancedOrchestrator
                self.ai_enhancer = AIEnhancedOrchestrator(
                    ai_insights_base_url=ai_insights_base_url,
                    min_confidence_threshold=ai_insights_min_confidence
                )
                logger.info("AI-enhanced orchestration enabled")
            except ImportError as e:
                logger.warning(f"AI enhancement requested but could not be loaded: {e}")
                self.use_ai_enhancement = False
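    # Note (assumed breaker contract, inferred from usage below): each breaker
    # is expected to expose an async `call(fn, *args, **kwargs)` that invokes
    # `fn` with the given arguments and fails fast when the breaker is open.
    # 'CircuitBreaker' appears only as a quoted type hint, so no runtime
    # import is required here.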
    async def execute_orchestration(
        self,
        tenant_id: str,
        orchestration_run_id: str
    ) -> Dict[str, Any]:
        """
        Execute full orchestration workflow with saga pattern.

        Args:
            tenant_id: Tenant ID
            orchestration_run_id: Orchestration run ID

        Returns:
            Dictionary with execution results
        """
        saga = SagaCoordinator(saga_id=f"orchestration_{orchestration_run_id}")

        # Store execution context
        context = {
            'tenant_id': tenant_id,
            'orchestration_run_id': orchestration_run_id,
            'forecast_id': None,
            'production_schedule_id': None,
            'procurement_plan_id': None,
            'notifications_sent': 0,
            # NEW: Cached data snapshots to avoid duplicate fetching
            'inventory_snapshot': None,
            'suppliers_snapshot': None,
            'recipes_snapshot': None,
            'forecast_data': None,
            'production_data': None,
            'procurement_data': None
        }

        # Step 0: Fetch shared data snapshot (NEW)
        saga.add_step(
            name="fetch_shared_data_snapshot",
            action=self._fetch_shared_data_snapshot,
            compensation=None,  # No compensation needed for read-only operations
            action_args=(tenant_id, context)
        )

        # Step 0.5: Generate AI insights (NEW)
        saga.add_step(
            name="generate_ai_insights",
            action=self._generate_ai_insights,
            compensation=None,  # No compensation needed for read-only insight generation
            action_args=(tenant_id, context)
        )

        # Step 1: Generate forecasts
        saga.add_step(
            name="generate_forecasts",
            action=self._generate_forecasts,
            compensation=self._compensate_forecasts,
            action_args=(tenant_id, context)
        )

        # Step 2: Generate production schedule
        saga.add_step(
            name="generate_production_schedule",
            action=self._generate_production_schedule,
            compensation=self._compensate_production_schedule,
            action_args=(tenant_id, context)
        )

        # Step 3: Generate procurement plan
        saga.add_step(
            name="generate_procurement_plan",
            action=self._generate_procurement_plan,
            compensation=self._compensate_procurement_plan,
            action_args=(tenant_id, context)
        )

        # Step 4: Send notifications
        saga.add_step(
            name="send_notifications",
            action=self._send_notifications,
            compensation=None,  # No compensation needed for notifications
            action_args=(tenant_id, context)
        )

        # Step 5: Validate previous day's forecasts
        saga.add_step(
            name="validate_previous_forecasts",
            action=self._validate_previous_forecasts,
            compensation=None,  # No compensation needed for validation
            action_args=(tenant_id, context)
        )

        # Execute saga
        success, final_result, error = await saga.execute()

        if success:
            logger.info(
                f"Orchestration saga completed successfully for tenant {tenant_id}"
            )
            return {
                'success': True,
                'forecast_id': context.get('forecast_id'),
                'production_schedule_id': context.get('production_schedule_id'),
                'procurement_plan_id': context.get('procurement_plan_id'),
                'notifications_sent': context.get('notifications_sent', 0),
                'forecast_data': context.get('forecast_data', {}),
                'production_data': context.get('production_data', {}),
                'procurement_data': context.get('procurement_data', {}),
                'ai_insights_generated': context.get('ai_insights_generated', 0),
                'ai_insights_posted': context.get('ai_insights_posted', 0),
                'ai_insights_errors': context.get('ai_insights_errors', []),
                'saga_summary': saga.get_execution_summary()
            }
        else:
            logger.error(
                f"Orchestration saga failed for tenant {tenant_id}: {error}"
            )
            return {
                'success': False,
                'error': str(error),
                'saga_summary': saga.get_execution_summary()
            }
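    # Note (saga contract as used above, per shared.utils.saga_pattern): steps
    # run sequentially in the order added; `execute()` returns a
    # (success, final_result, error) tuple; and on a step failure the
    # coordinator is expected to run the registered compensations for
    # already-completed steps before returning success=False.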
    # ========================================================================
    # Step 0: Fetch Shared Data Snapshot (NEW)
    # ========================================================================

    async def _fetch_shared_data_snapshot(
        self,
        tenant_id: str,
        context: Dict[str, Any]
    ) -> Dict[str, Any]:
        """
        Fetch shared data snapshot once at the beginning of orchestration.
        This eliminates duplicate API calls to the inventory, suppliers, and recipes services.

        Args:
            tenant_id: Tenant ID
            context: Execution context

        Returns:
            Dictionary with fetched data
        """
        logger.info(f"Fetching shared data snapshot for tenant {tenant_id}")

        try:
            # Fetch data in parallel for optimal performance, with circuit breaker protection
            async def fetch_inventory():
                if self.inventory_breaker:
                    return await self.inventory_breaker.call(
                        self.inventory_client.get_all_ingredients, tenant_id, is_active=True
                    )
                else:
                    return await self.inventory_client.get_all_ingredients(tenant_id, is_active=True)

            async def fetch_suppliers():
                if self.suppliers_breaker:
                    return await self.suppliers_breaker.call(
                        self.suppliers_client.get_all_suppliers, tenant_id, is_active=True
                    )
                else:
                    return await self.suppliers_client.get_all_suppliers(tenant_id, is_active=True)

            async def fetch_recipes():
                if self.recipes_breaker:
                    return await self.recipes_breaker.call(
                        self.recipes_client.get_all_recipes, tenant_id, is_active=True
                    )
                else:
                    return await self.recipes_client.get_all_recipes(tenant_id, is_active=True)

            # Wait for all data to be fetched
            inventory_data, suppliers_data, recipes_data = await asyncio.gather(
                fetch_inventory(),
                fetch_suppliers(),
                fetch_recipes(),
                return_exceptions=True
            )

            # Handle errors for each fetch
            failures = 0
            if isinstance(inventory_data, Exception):
                logger.error(f"Failed to fetch inventory data: {inventory_data}")
                inventory_data = []
                failures += 1

            if isinstance(suppliers_data, Exception):
                logger.error(f"Failed to fetch suppliers data: {suppliers_data}")
                suppliers_data = []
                failures += 1

            if isinstance(recipes_data, Exception):
                logger.error(f"Failed to fetch recipes data: {recipes_data}")
                recipes_data = []
                failures += 1

            # If all three fetches failed, treat it as a critical failure
            if failures >= 3:
                logger.error(f"All shared data fetches failed for tenant {tenant_id}")
                raise Exception("Unable to fetch any shared data (inventory, suppliers, recipes)")

            # Store in context for downstream services
            context['inventory_snapshot'] = {
                'ingredients': inventory_data,
                'fetched_at': datetime.now(timezone.utc).isoformat(),
                'count': len(inventory_data) if inventory_data else 0
            }

            context['suppliers_snapshot'] = {
                'suppliers': suppliers_data,
                'fetched_at': datetime.now(timezone.utc).isoformat(),
                'count': len(suppliers_data) if suppliers_data else 0
            }

            context['recipes_snapshot'] = {
                'recipes': recipes_data,
                'fetched_at': datetime.now(timezone.utc).isoformat(),
                'count': len(recipes_data) if recipes_data else 0
            }

            # NEW: Fetch upcoming events for next 7 days
            # Note: Implement when event calendar service is ready.
            # For now, initialize as empty.
            context['event_calendar'] = []
            logger.info("Event calendar: not yet implemented, using empty list")

            # NEW: Placeholder for traffic predictions (Phase 5)
            # Note: Implement traffic forecasting in Phase 5.
            # For now, initialize as an empty dict (to be converted to a DataFrame when implemented).
            context['traffic_predictions'] = {}
            logger.info("Traffic predictions: not yet implemented, using empty dict")

            logger.info(
                f"Shared data snapshot fetched successfully: "
                f"{len(inventory_data)} ingredients, "
                f"{len(suppliers_data)} suppliers, "
                f"{len(recipes_data)} recipes, "
                f"{len(context.get('event_calendar', []))} events"
            )

            return {
                'success': True,
                'inventory_count': len(inventory_data) if inventory_data else 0,
                'suppliers_count': len(suppliers_data) if suppliers_data else 0,
                'recipes_count': len(recipes_data) if recipes_data else 0,
                'events_count': len(context.get('event_calendar', []))
            }

        except Exception as e:
            logger.error(f"Failed to fetch shared data snapshot for tenant {tenant_id}: {e}")
            raise
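    # Phase 5 sketch (hypothetical): once traffic forecasting exists, the dict
    # placeholder above can be upgraded in place, e.g.:
    #     import pandas as pd  # would require adding pandas to requirements
    #     context['traffic_predictions'] = pd.DataFrame(raw_predictions)
    # where `raw_predictions` stands in for whatever the traffic service returns.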
    # ========================================================================
    # Step 0.5: Generate AI Insights (NEW)
    # ========================================================================

    async def _generate_ai_insights(
        self,
        tenant_id: str,
        context: Dict[str, Any]
    ) -> Dict[str, Any]:
        """
        Generate AI insights using HTTP calls to ML insights endpoints.

        This step runs multiple ML insight generators in parallel via HTTP:
        - Dynamic forecasting rules learning (forecasting service)
        - Safety stock optimization (inventory service)
        - Production yield predictions (production service)
        - Supplier performance analysis (procurement service)
        - Price forecasting (procurement service)

        All insights are posted to the AI Insights Service by the respective services
        and can be consumed by downstream orchestration steps.

        Args:
            tenant_id: Tenant ID
            context: Execution context with cached data snapshots

        Returns:
            Dictionary with insights generation results
        """
        logger.info(f"Generating AI insights for tenant {tenant_id} via HTTP endpoints")

        insights_results = {
            'total_insights_generated': 0,
            'total_insights_posted': 0,
            'insights_by_source': {},
            'errors': []
        }

        try:
            # Prepare async tasks for parallel HTTP calls
            ml_tasks = []

            # Task 1: Safety Stock Optimization (inventory service)
            async def trigger_safety_stock_optimization():
                try:
                    result = await self.inventory_client.trigger_safety_stock_optimization(
                        tenant_id=tenant_id,
                        product_ids=None,  # Analyze all products
                        lookback_days=90,
                        min_history_days=30
                    )
                    if result and result.get('success'):
                        return ('safety_stock', {
                            'insights_posted': result.get('total_insights_posted', 0),
                            'insights_generated': result.get('total_insights_generated', 0),
                            'products_optimized': result.get('products_optimized', 0)
                        })
                    else:
                        return ('safety_stock', {
                            'error': result.get('message', 'Unknown error') if result else 'Service returned None',
                            'insights_posted': 0
                        })
                except Exception as e:
                    logger.error(f"Safety stock optimization failed: {e}")
                    return ('safety_stock', {'error': str(e), 'insights_posted': 0})

            ml_tasks.append(trigger_safety_stock_optimization())

            # Task 2: Production Yield Analysis (production service)
            async def trigger_yield_prediction():
                try:
                    result = await self.production_client.trigger_yield_prediction(
                        tenant_id=tenant_id,
                        recipe_ids=None,  # Analyze all recipes
                        lookback_days=90,
                        min_history_runs=30
                    )
                    if result and result.get('success'):
                        return ('yield_analysis', {
                            'insights_posted': result.get('total_insights_posted', 0),
                            'insights_generated': result.get('total_insights_generated', 0),
                            'recipes_analyzed': result.get('recipes_analyzed', 0)
                        })
                    else:
                        return ('yield_analysis', {
                            'error': result.get('message', 'Unknown error') if result else 'Service returned None',
                            'insights_posted': 0
                        })
                except Exception as e:
                    logger.error(f"Yield prediction failed: {e}")
                    return ('yield_analysis', {'error': str(e), 'insights_posted': 0})

            ml_tasks.append(trigger_yield_prediction())

            # Task 3: Supplier Performance Analysis (procurement service)
            async def trigger_supplier_analysis():
                try:
                    result = await self.procurement_client.trigger_supplier_analysis(
                        tenant_id=tenant_id,
                        supplier_ids=None,  # Analyze all suppliers
                        lookback_days=180,
                        min_orders=10
                    )
                    if result and result.get('success'):
                        return ('supplier_analysis', {
                            'insights_posted': result.get('total_insights_posted', 0),
                            'insights_generated': result.get('total_insights_generated', 0),
                            'suppliers_analyzed': result.get('suppliers_analyzed', 0)
                        })
                    else:
                        return ('supplier_analysis', {
                            'error': result.get('message', 'Unknown error') if result else 'Service returned None',
                            'insights_posted': 0
                        })
                except Exception as e:
                    logger.error(f"Supplier analysis failed: {e}")
                    return ('supplier_analysis', {'error': str(e), 'insights_posted': 0})

            ml_tasks.append(trigger_supplier_analysis())

            # Task 4: Price Forecasting (procurement service)
            async def trigger_price_forecasting():
                try:
                    result = await self.procurement_client.trigger_price_forecasting(
                        tenant_id=tenant_id,
                        ingredient_ids=None,  # Forecast all ingredients
                        lookback_days=180,
                        forecast_horizon_days=30
                    )
                    if result and result.get('success'):
                        return ('price_forecast', {
                            'insights_posted': result.get('total_insights_posted', 0),
                            'insights_generated': result.get('total_insights_generated', 0),
                            'ingredients_forecasted': result.get('ingredients_forecasted', 0),
                            'buy_now_recommendations': result.get('buy_now_recommendations', 0)
                        })
                    else:
                        return ('price_forecast', {
                            'error': result.get('message', 'Unknown error') if result else 'Service returned None',
                            'insights_posted': 0
                        })
                except Exception as e:
                    logger.error(f"Price forecasting failed: {e}")
                    return ('price_forecast', {'error': str(e), 'insights_posted': 0})

            ml_tasks.append(trigger_price_forecasting())

            # Task 5: Dynamic Rules Learning (forecasting service)
            async def trigger_rules_generation():
                try:
                    result = await self.forecast_client.trigger_rules_generation(
                        tenant_id=tenant_id,
                        product_ids=None,  # Analyze all products
                        lookback_days=90,
                        min_samples=10
                    )
                    if result and result.get('success'):
                        return ('rules_learning', {
                            'insights_posted': result.get('total_insights_posted', 0),
                            'insights_generated': result.get('total_insights_generated', 0),
                            'products_analyzed': result.get('products_analyzed', 0)
                        })
                    else:
                        return ('rules_learning', {
                            'error': result.get('message', 'Unknown error') if result else 'Service returned None',
                            'insights_posted': 0
                        })
                except Exception as e:
                    logger.error(f"Rules generation failed: {e}")
                    return ('rules_learning', {'error': str(e), 'insights_posted': 0})

            ml_tasks.append(trigger_rules_generation())

            # Run all ML insight generation tasks in parallel
            logger.info(f"Triggering {len(ml_tasks)} ML insight endpoints in parallel")
            results = await asyncio.gather(*ml_tasks, return_exceptions=True)

            # Process results
            for result in results:
                if isinstance(result, Exception):
                    logger.error(f"ML insight task failed with exception: {result}")
                    insights_results['errors'].append(str(result))
                elif isinstance(result, tuple) and len(result) == 2:
                    source, data = result
                    if 'error' in data:
                        insights_results['errors'].append(f"{source}: {data['error']}")
                    else:
                        posted = data.get('insights_posted', 0)
                        generated = data.get('insights_generated', posted)
                        insights_results['total_insights_posted'] += posted
                        insights_results['total_insights_generated'] += generated
                        insights_results['insights_by_source'][source] = posted
                        logger.info(f"{source}: {posted} insights posted")

            # Store insights count and errors in context
            context['ai_insights_generated'] = insights_results['total_insights_generated']
            context['ai_insights_posted'] = insights_results['total_insights_posted']
            context['ai_insights_errors'] = insights_results['errors']

            logger.info(
                f"AI insights generation complete: "
                f"{insights_results['total_insights_posted']} insights posted from "
                f"{len(insights_results['insights_by_source'])} sources"
            )

            return insights_results

        except Exception as e:
            logger.error(f"Failed to generate AI insights for tenant {tenant_id}: {e}", exc_info=True)
            # Don't fail the orchestration if insights generation fails:
            # log the error and continue.
            insights_results['errors'].append(str(e))
            context['ai_insights_generated'] = 0
            context['ai_insights_posted'] = 0
            context['ai_insights_errors'] = insights_results['errors']
            return insights_results
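    # Note: each trigger coroutine above returns a ('source', payload) tuple
    # and catches its own exceptions, so `asyncio.gather` aggregates partial
    # failures as error entries instead of aborting the whole insights step.
    # A new generator only needs to follow the same tuple shape to be picked
    # up by the result-processing loop.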
    # ========================================================================
    # Step 1: Generate Forecasts
    # ========================================================================

    async def _generate_forecasts(
        self,
        tenant_id: str,
        context: Dict[str, Any]
    ) -> Dict[str, Any]:
        """
        Generate forecasts for tenant.

        Args:
            tenant_id: Tenant ID
            context: Execution context

        Returns:
            Forecast result
        """
        logger.info(f"Generating forecasts for tenant {tenant_id}")

        try:
            # Call forecast service with circuit breaker protection
            if self.forecast_breaker:
                result = await self.forecast_breaker.call(
                    self.forecast_client.generate_forecasts, tenant_id
                )
            else:
                result = await self.forecast_client.generate_forecasts(tenant_id)

            if not result:
                logger.error(f"Forecast service returned None for tenant {tenant_id}")
                raise Exception("Forecast service returned None")

            # Store forecast ID in context
            forecast_id = result.get('forecast_id') or result.get('id')
            context['forecast_id'] = forecast_id
            context['forecast_data'] = result

            logger.info(
                f"Forecasts generated successfully: {forecast_id}, "
                f"{result.get('forecasts_created', 0)} forecasts created"
            )

            # Ensure tenant_id is in result for compensation
            result['tenant_id'] = tenant_id
            return result

        except Exception as e:
            logger.error(f"Failed to generate forecasts for tenant {tenant_id}: {e}")
            raise

    async def _compensate_forecasts(self, forecast_result: Dict[str, Any]):
        """
        Compensate forecast generation (delete generated forecasts).

        Args:
            forecast_result: Result from forecast generation
        """
        forecast_id = forecast_result.get('forecast_id') or forecast_result.get('id')

        if not forecast_id:
            logger.warning("No forecast ID to compensate")
            return

        logger.info(f"Compensating forecasts: {forecast_id}")

        try:
            # Call forecast service to delete the forecast
            tenant_id = forecast_result.get('tenant_id')
            if tenant_id:
                await self.forecast_client.delete_forecast(tenant_id, forecast_id)
                logger.info(f"Successfully deleted forecast {forecast_id} (compensation)")
            else:
                logger.warning(f"Cannot compensate forecast {forecast_id}: no tenant_id in result")

        except Exception as e:
            logger.error(f"Failed to compensate forecasts {forecast_id}: {e}")
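    # Note (compensation data flow): judging by the handler signatures, the
    # saga passes each step's returned dict to its compensation handler, which
    # is why the action methods stash `tenant_id` into their results before
    # returning.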
    # ========================================================================
    # Step 2: Generate Production Schedule
    # ========================================================================

    async def _generate_production_schedule(
        self,
        tenant_id: str,
        context: Dict[str, Any]
    ) -> Dict[str, Any]:
        """
        Generate production schedule for tenant.

        Args:
            tenant_id: Tenant ID
            context: Execution context

        Returns:
            Production schedule result
        """
        logger.info(f"Generating production schedule for tenant {tenant_id}")

        forecast_data = context.get('forecast_data', {})
        inventory_snapshot = context.get('inventory_snapshot', {})
        recipes_snapshot = context.get('recipes_snapshot', {})

        try:
            # Call production service with cached data and circuit breaker protection
            if self.production_breaker:
                result = await self.production_breaker.call(
                    self.production_client.generate_schedule,
                    tenant_id=tenant_id,
                    forecast_data=forecast_data,
                    inventory_data=inventory_snapshot,
                    recipes_data=recipes_snapshot
                )
            else:
                result = await self.production_client.generate_schedule(
                    tenant_id=tenant_id,
                    forecast_data=forecast_data,
                    inventory_data=inventory_snapshot,
                    recipes_data=recipes_snapshot
                )

            if not result:
                logger.error(f"Production service returned None for tenant {tenant_id}")
                raise Exception("Production service returned None")

            # Store schedule ID in context
            schedule_id = result.get('schedule_id') or result.get('id')
            context['production_schedule_id'] = schedule_id
            context['production_data'] = result

            logger.info(
                f"Production schedule generated successfully: {schedule_id}, "
                f"{result.get('batches_created', 0)} batches created"
            )

            # Ensure tenant_id is in result for compensation
            result['tenant_id'] = tenant_id
            return result

        except Exception as e:
            logger.error(
                f"Failed to generate production schedule for tenant {tenant_id}: {e}"
            )
            raise

    async def _compensate_production_schedule(
        self,
        production_result: Dict[str, Any]
    ):
        """
        Compensate production schedule (delete schedule).

        Args:
            production_result: Result from production generation
        """
        schedule_id = production_result.get('schedule_id') or production_result.get('id')

        if not schedule_id:
            logger.warning("No production schedule ID to compensate")
            return

        logger.info(f"Compensating production schedule: {schedule_id}")

        try:
            # Call production service to delete the schedule
            tenant_id = production_result.get('tenant_id')
            if tenant_id:
                await self.production_client.delete_schedule(tenant_id, schedule_id)
                logger.info(
                    f"Successfully deleted production schedule {schedule_id} (compensation)"
                )
            else:
                logger.warning(f"Cannot compensate schedule {schedule_id}: no tenant_id in result")

        except Exception as e:
            logger.error(
                f"Failed to compensate production schedule {schedule_id}: {e}"
            )
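    # Note: the `inventory_data`/`recipes_data` arguments passed above are the
    # snapshot wrapper dicts built in Step 0 ({'ingredients': [...],
    # 'fetched_at': ..., 'count': ...}), not bare lists; the downstream
    # services are presumably expected to unwrap them.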
    # ========================================================================
    # Step 3: Generate Procurement Plan
    # ========================================================================

    async def _generate_procurement_plan(
        self,
        tenant_id: str,
        context: Dict[str, Any]
    ) -> Dict[str, Any]:
        """
        Generate procurement plan for tenant.

        Args:
            tenant_id: Tenant ID
            context: Execution context

        Returns:
            Procurement plan result
        """
        logger.info(f"Generating procurement plan for tenant {tenant_id}")

        forecast_data = context.get('forecast_data', {})
        production_schedule_id = context.get('production_schedule_id')
        inventory_snapshot = context.get('inventory_snapshot', {})
        suppliers_snapshot = context.get('suppliers_snapshot', {})
        recipes_snapshot = context.get('recipes_snapshot', {})

        try:
            # Call procurement service with cached data and circuit breaker protection
            if self.procurement_breaker:
                result = await self.procurement_breaker.call(
                    self.procurement_client.auto_generate_procurement,
                    tenant_id=tenant_id,
                    forecast_data=forecast_data,
                    production_schedule_id=production_schedule_id,
                    inventory_data=inventory_snapshot,
                    suppliers_data=suppliers_snapshot,
                    recipes_data=recipes_snapshot
                )
            else:
                result = await self.procurement_client.auto_generate_procurement(
                    tenant_id=tenant_id,
                    forecast_data=forecast_data,
                    production_schedule_id=production_schedule_id,
                    inventory_data=inventory_snapshot,
                    suppliers_data=suppliers_snapshot,
                    recipes_data=recipes_snapshot
                )

            if not result:
                logger.error(f"Procurement service returned None for tenant {tenant_id}")
                raise Exception("Procurement service returned None")

            # Store plan ID in context
            plan_id = result.get('plan_id') or result.get('id')
            context['procurement_plan_id'] = plan_id
            context['procurement_data'] = result

            logger.info(
                f"Procurement plan generated successfully: {plan_id}, "
                f"{result.get('requirements_created', 0)} requirements, "
                f"{result.get('pos_created', 0)} purchase orders created"
            )

            # Ensure tenant_id is in result for compensation
            result['tenant_id'] = tenant_id
            return result

        except Exception as e:
            logger.error(
                f"Failed to generate procurement plan for tenant {tenant_id}: {e}"
            )
            raise

    async def _compensate_procurement_plan(
        self,
        procurement_result: Dict[str, Any]
    ):
        """
        Compensate procurement plan (delete plan and POs).

        Args:
            procurement_result: Result from procurement generation
        """
        plan_id = procurement_result.get('plan_id') or procurement_result.get('id')

        if not plan_id:
            logger.warning("No procurement plan ID to compensate")
            return

        logger.info(f"Compensating procurement plan: {plan_id}")

        try:
            # Call procurement service to delete the plan (this should cascade delete requirements and POs)
            tenant_id = procurement_result.get('tenant_id')
            if tenant_id:
                await self.procurement_client.delete_plan(tenant_id, plan_id)
                logger.info(
                    f"Successfully deleted procurement plan {plan_id} and associated POs (compensation)"
                )
            else:
                logger.warning(f"Cannot compensate plan {plan_id}: no tenant_id in result")

        except Exception as e:
            logger.error(f"Failed to compensate procurement plan {plan_id}: {e}")
    # ========================================================================
    # Step 4: Send Notifications
    # ========================================================================

    async def _send_notifications(
        self,
        tenant_id: str,
        context: Dict[str, Any]
    ) -> Dict[str, Any]:
        """
        Send workflow completion notifications.

        Args:
            tenant_id: Tenant ID
            context: Execution context

        Returns:
            Notification result
        """
        logger.info(f"Sending notifications for tenant {tenant_id}")

        try:
            # Prepare notification data
            notification_data = {
                'tenant_id': tenant_id,
                'orchestration_run_id': context.get('orchestration_run_id'),
                'forecast_id': context.get('forecast_id'),
                'production_schedule_id': context.get('production_schedule_id'),
                'procurement_plan_id': context.get('procurement_plan_id'),
                'forecasts_created': context.get('forecast_data', {}).get('forecasts_created', 0),
                'batches_created': context.get('production_data', {}).get('batches_created', 0),
                'requirements_created': context.get('procurement_data', {}).get('requirements_created', 0),
                'pos_created': context.get('procurement_data', {}).get('pos_created', 0)
            }

            # Call notification service
            result = await self.notification_client.send_workflow_summary(
                tenant_id=tenant_id,
                notification_data=notification_data
            )

            if result:
                notifications_sent = result.get('notifications_sent', 0)
                context['notifications_sent'] = notifications_sent

                logger.info(f"Notifications sent successfully: {notifications_sent}")

                return result
            else:
                logger.warning(f"Notification service returned None for tenant {tenant_id}")
                return {'notifications_sent': 0, 'error': 'Notification service returned None'}

        except Exception as e:
            # Log the error but don't fail the saga for notification failures
            logger.error(
                f"NOTIFICATION FAILURE: Failed to send notifications for tenant {tenant_id}: {e}",
                exc_info=True
            )
            # Store failure information in context
            context['notification_failed'] = True
            context['notification_error'] = str(e)
            # Return an empty result instead of raising
            return {'notifications_sent': 0, 'error': str(e), 'failed': True}
    # ========================================================================
    # Step 5: Validate Previous Day's Forecasts
    # ========================================================================

    async def _validate_previous_forecasts(
        self,
        tenant_id: str,
        context: Dict[str, Any]
    ) -> Dict[str, Any]:
        """
        Validate yesterday's forecasts against actual sales.
        Calculate accuracy metrics (MAPE, RMSE, MAE) and trigger retraining if needed.

        Args:
            tenant_id: Tenant ID
            context: Execution context

        Returns:
            Validation result with metrics
        """
        from datetime import date, timedelta

        logger.info(f"Validating previous day's forecasts for tenant {tenant_id}")

        try:
            yesterday = date.today() - timedelta(days=1)

            # Call forecasting service validation endpoint
            validation_result = await self.forecast_client.validate_forecasts(
                tenant_id=tenant_id,
                date=yesterday
            )

            if not validation_result:
                logger.warning(f"No validation results returned for tenant {tenant_id}")
                return {'validated': False, 'reason': 'no_data'}

            # Extract metrics
            overall_mape = validation_result.get('overall_mape', 0)
            overall_rmse = validation_result.get('overall_rmse', 0)
            overall_mae = validation_result.get('overall_mae', 0)
            products_validated = validation_result.get('products_validated', 0)
            poor_accuracy_products = validation_result.get('poor_accuracy_products', [])

            context['validation_metrics'] = {
                'mape': overall_mape,
                'rmse': overall_rmse,
                'mae': overall_mae,
                'products_validated': products_validated,
                'validation_date': yesterday.isoformat()
            }

            logger.info(
                f"Validation complete for tenant {tenant_id}: "
                f"MAPE={overall_mape:.2f}%, RMSE={overall_rmse:.2f}, MAE={overall_mae:.2f}, "
                f"Products={products_validated}"
            )

            # Post accuracy insights to AI Insights Service
            try:
                from uuid import UUID
                await self.ai_insights_client.post_accuracy_metrics(
                    tenant_id=UUID(tenant_id),
                    validation_date=datetime.combine(yesterday, datetime.min.time()),
                    metrics={
                        'overall_mape': overall_mape,
                        'overall_rmse': overall_rmse,
                        'overall_mae': overall_mae,
                        'products_validated': products_validated,
                        'poor_accuracy_products': poor_accuracy_products
                    }
                )
                logger.info("Posted accuracy metrics to AI Insights Service")
            except Exception as e:
                logger.warning(f"Could not post accuracy metrics to AI Insights: {e}")

            # Trigger retraining for products with poor accuracy
            if poor_accuracy_products:
                logger.warning(
                    f"Found {len(poor_accuracy_products)} products with MAPE > 30%, "
                    f"triggering retraining"
                )

                retraining_triggered = 0

                # Check if training client is available
                if not self.training_client:
                    logger.warning(
                        f"Training client not available, cannot trigger retraining for "
                        f"{len(poor_accuracy_products)} products"
                    )
                    context['retraining_triggered'] = 0
                else:
                    for product_data in poor_accuracy_products:
                        product_id = product_data.get('product_id')
                        product_mape = product_data.get('mape', 0)

                        if not product_id:
                            continue

                        try:
                            await self.training_client.trigger_retrain(
                                tenant_id=tenant_id,
                                inventory_product_id=product_id,
                                reason='accuracy_degradation',
                                metadata={
                                    'previous_mape': product_mape,
                                    'validation_date': yesterday.isoformat(),
                                    'triggered_by': 'orchestration_validation'
                                }
                            )
                            retraining_triggered += 1
                            logger.info(
                                f"Triggered retraining for product {product_id} "
                                f"(MAPE={product_mape:.2f}%)"
                            )
                        except Exception as e:
                            logger.error(
                                f"Failed to trigger retraining for product {product_id}: {e}"
                            )

                    context['retraining_triggered'] = retraining_triggered
                    logger.info(f"Triggered retraining for {retraining_triggered} products")
            else:
                logger.info("All products have acceptable accuracy (MAPE <= 30%)")
                context['retraining_triggered'] = 0

            return {
                'validated': True,
                'metrics': context['validation_metrics'],
                'retraining_triggered': context.get('retraining_triggered', 0)
            }

        except Exception as e:
            # Don't fail the saga if validation fails, but log prominently
            logger.error(
                f"VALIDATION FAILURE: Forecast validation failed for tenant {tenant_id}: {e}",
                exc_info=True
            )
            # Store failure information in context
            context['validation_failed'] = True
            context['validation_error'] = str(e)
            return {
                'validated': False,
                'error': str(e),
                'retraining_triggered': 0,
                'failed': True
            }
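    # Metric definitions referenced above (standard formulas, for reference):
    #   MAPE = (100 / n) * sum(|actual_i - forecast_i| / |actual_i|)
    #   RMSE = sqrt((1 / n) * sum((actual_i - forecast_i) ** 2))
    #   MAE  = (1 / n) * sum(|actual_i - forecast_i|)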
    # ========================================================================
    # Utility Methods
    # ========================================================================

    async def execute_with_timeout(
        self,
        tenant_id: str,
        orchestration_run_id: str,
        timeout_seconds: int = 600
    ) -> Dict[str, Any]:
        """
        Execute orchestration with timeout.

        Args:
            tenant_id: Tenant ID
            orchestration_run_id: Orchestration run ID
            timeout_seconds: Timeout in seconds

        Returns:
            Execution result
        """
        try:
            result = await asyncio.wait_for(
                self.execute_orchestration(tenant_id, orchestration_run_id),
                timeout=timeout_seconds
            )
            return result

        except asyncio.TimeoutError:
            logger.error(
                f"Orchestration timed out after {timeout_seconds}s for tenant {tenant_id}"
            )
            return {
                'success': False,
                'error': f'Orchestration timed out after {timeout_seconds} seconds',
                'timeout': True
            }
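
# Usage sketch (hypothetical values; the client instances are wired by the
# service's startup code, not shown in this module):
#
#     saga = OrchestrationSaga(
#         forecast_client=forecast_client,
#         production_client=production_client,
#         procurement_client=procurement_client,
#         notification_client=notification_client,
#         inventory_client=inventory_client,
#         suppliers_client=suppliers_client,
#         recipes_client=recipes_client,
#     )
#     result = await saga.execute_with_timeout(
#         tenant_id="tenant-123",
#         orchestration_run_id=str(uuid.uuid4()),
#         timeout_seconds=600,
#     )
#     if not result['success']:
#         logger.error(result.get('error'))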