Merge pull request #4 from ualsweb/claude/audit-orchestration-scheduler-011CUpnzhnQBA2aqEg24omEb
Fix all critical orchestration scheduler issues and add improvements
@@ -39,7 +39,7 @@ class OrchestratorSettings(BaseServiceSettings):

     # Orchestration Settings
     ORCHESTRATION_ENABLED: bool = os.getenv("ORCHESTRATION_ENABLED", "true").lower() == "true"
-    ORCHESTRATION_SCHEDULE: str = os.getenv("ORCHESTRATION_SCHEDULE", "0 5 * * *")  # 5:30 AM daily (cron format)
+    ORCHESTRATION_SCHEDULE: str = os.getenv("ORCHESTRATION_SCHEDULE", "30 5 * * *")  # 5:30 AM daily (cron format)
     ORCHESTRATION_TIMEOUT_SECONDS: int = int(os.getenv("ORCHESTRATION_TIMEOUT_SECONDS", "600"))  # 10 minutes

     # Tenant Processing
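Note on this fix: in five-field cron syntax the first field is minutes and the second hours, so the old default `0 5 * * *` fired at 5:00 AM while its comment claimed 5:30 AM; `30 5 * * *` matches the comment. A quick sanity check, using the `croniter` package purely for illustration (the scheduler library this service actually uses is not shown in the diff):

```python
from datetime import datetime
from croniter import croniter  # illustration only; not necessarily a project dependency

schedule = "30 5 * * *"  # minute=30, hour=5 -> fires daily at 05:30
nxt = croniter(schedule, datetime(2025, 11, 5)).get_next(datetime)
print(nxt)  # 2025-11-05 05:30:00
```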
@@ -65,6 +65,14 @@ class OrchestrationRun(Base):
     notification_status = Column(String(20), nullable=True)  # success, failed, skipped
     notification_error = Column(Text, nullable=True)

+    # AI Insights tracking
+    ai_insights_started_at = Column(DateTime(timezone=True), nullable=True)
+    ai_insights_completed_at = Column(DateTime(timezone=True), nullable=True)
+    ai_insights_status = Column(String(20), nullable=True)  # success, failed, skipped
+    ai_insights_error = Column(Text, nullable=True)
+    ai_insights_generated = Column(Integer, nullable=False, default=0)
+    ai_insights_posted = Column(Integer, nullable=False, default=0)
+
     # Results summary
     forecasts_generated = Column(Integer, nullable=False, default=0)
     production_batches_created = Column(Integer, nullable=False, default=0)
@@ -82,9 +90,14 @@ class OrchestrationRun(Base):
     error_details = Column(JSONB, nullable=True)

     # External references
+    forecast_id = Column(UUID(as_uuid=True), nullable=True)
     production_schedule_id = Column(UUID(as_uuid=True), nullable=True)
     procurement_plan_id = Column(UUID(as_uuid=True), nullable=True)

+    # Saga tracking
+    saga_steps_total = Column(Integer, nullable=False, default=0)
+    saga_steps_completed = Column(Integer, nullable=False, default=0)
+
     # Audit fields
     created_at = Column(DateTime(timezone=True), server_default=func.now(), nullable=False)
     updated_at = Column(DateTime(timezone=True), server_default=func.now(), onupdate=func.now(), nullable=False)
@@ -6,7 +6,7 @@ Orchestration Run Repository - Database operations for orchestration audit trail
 """

 import uuid
-from datetime import datetime, date
+from datetime import datetime, date, timezone
 from typing import List, Optional, Dict, Any
 from sqlalchemy import select, and_, desc, func
 from sqlalchemy.ext.asyncio import AsyncSession
@@ -43,7 +43,7 @@ class OrchestrationRunRepository:
             if hasattr(run, key):
                 setattr(run, key, value)

-        run.updated_at = datetime.utcnow()
+        run.updated_at = datetime.now(timezone.utc)
         await self.db.flush()
         return run
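The `datetime.utcnow()` to `datetime.now(timezone.utc)` swap, repeated throughout this PR, matters because these columns are declared `DateTime(timezone=True)`: `utcnow()` returns a naive datetime with no tzinfo (and is deprecated since Python 3.12), while `now(timezone.utc)` is timezone-aware. A minimal illustration:

```python
from datetime import datetime, timezone

naive = datetime.utcnow()           # no tzinfo attached; deprecated since Python 3.12
aware = datetime.now(timezone.utc)  # carries explicit UTC tzinfo

print(naive.tzinfo)  # None
print(aware.tzinfo)  # UTC
```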
@@ -92,18 +92,36 @@ class OrchestrationRunRepository:
         return result.scalar_one_or_none()

     async def generate_run_number(self) -> str:
-        """Generate unique run number"""
+        """
+        Generate unique run number atomically using database-level counting.
+
+        Uses MAX(run_number) + 1 approach to avoid race conditions
+        between reading count and inserting new record.
+        """
         today = date.today()
         date_str = today.strftime("%Y%m%d")

-        # Count existing runs for today
-        stmt = select(func.count(OrchestrationRun.id)).where(
-            func.date(OrchestrationRun.started_at) == today
+        # Get the highest run number for today atomically
+        # Using MAX on run_number suffix to avoid counting which has race conditions
+        stmt = select(func.max(OrchestrationRun.run_number)).where(
+            OrchestrationRun.run_number.like(f"ORCH-{date_str}-%")
         )
         result = await self.db.execute(stmt)
-        count = result.scalar() or 0
+        max_run_number = result.scalar()

-        return f"ORCH-{date_str}-{count + 1:04d}"
+        if max_run_number:
+            # Extract the numeric suffix and increment it
+            try:
+                suffix = int(max_run_number.split('-')[-1])
+                next_number = suffix + 1
+            except (ValueError, IndexError):
+                # Fallback to 1 if parsing fails
+                next_number = 1
+        else:
+            # No runs for today yet
+            next_number = 1
+
+        return f"ORCH-{date_str}-{next_number:04d}"

     async def get_failed_runs(self, limit: int = 10) -> List[OrchestrationRun]:
         """Get recent failed orchestration runs"""
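One caveat worth noting on this fix: MAX+1 removes the mismatch between COUNT and the actual suffixes, but two workers reading the same MAX concurrently would still compute the same next number. If `run_number` carries a unique constraint (the migration below suggests it does), the second insert fails with an `IntegrityError` that the caller can retry. A sketch of that retry loop, under those assumptions; `create_run` here is a hypothetical repository method, not one shown in this diff:

```python
from sqlalchemy.exc import IntegrityError

async def create_run_with_retry(repo, attrs: dict, attempts: int = 3):
    # Retry on unique-constraint collisions: two workers that read the
    # same MAX(run_number) derive the same next number, and the unique
    # index on run_number rejects the second insert.
    for _ in range(attempts):
        try:
            attrs['run_number'] = await repo.generate_run_number()
            return await repo.create_run(attrs)  # hypothetical repo method
        except IntegrityError:
            await repo.db.rollback()
    raise RuntimeError("could not allocate a unique run number")
```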
@@ -7,7 +7,7 @@ Integrates AI-enhanced orchestration when enabled.

 import asyncio
 import uuid
-from datetime import datetime
+from datetime import datetime, timezone
 from typing import Dict, Any, Optional
 import logging
@@ -53,7 +53,14 @@ class OrchestrationSaga:
         training_client: Optional[TrainingServiceClient] = None,
         use_ai_enhancement: bool = False,
         ai_insights_base_url: str = "http://ai-insights-service:8000",
-        ai_insights_min_confidence: int = 70
+        ai_insights_min_confidence: int = 70,
+        # Circuit breakers for fault tolerance
+        forecast_breaker: Optional['CircuitBreaker'] = None,
+        production_breaker: Optional['CircuitBreaker'] = None,
+        procurement_breaker: Optional['CircuitBreaker'] = None,
+        inventory_breaker: Optional['CircuitBreaker'] = None,
+        suppliers_breaker: Optional['CircuitBreaker'] = None,
+        recipes_breaker: Optional['CircuitBreaker'] = None
     ):
         """
         Initialize orchestration saga.
@@ -80,11 +87,20 @@ class OrchestrationSaga:
         self.suppliers_client = suppliers_client
         self.recipes_client = recipes_client
         self.ai_insights_client = ai_insights_client or AIInsightsClient(
-            base_url=ai_insights_base_url
+            base_url=ai_insights_base_url,
+            calling_service_name="orchestrator-service"
         )
         self.training_client = training_client
         self.use_ai_enhancement = use_ai_enhancement

+        # Circuit breakers
+        self.forecast_breaker = forecast_breaker
+        self.production_breaker = production_breaker
+        self.procurement_breaker = procurement_breaker
+        self.inventory_breaker = inventory_breaker
+        self.suppliers_breaker = suppliers_breaker
+        self.recipes_breaker = recipes_breaker
+
         # Initialize AI enhancer if enabled
         self.ai_enhancer = None
         if use_ai_enhancement:
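The `CircuitBreaker` implementation itself is not part of this diff. Judging from the call sites, it exposes an async `call(func, *args, **kwargs)` plus the constructor arguments seen in the scheduler (`failure_threshold`, `timeout_duration`, `success_threshold`). A minimal sketch of that interface, as an assumption about how the real class behaves, not the project's actual code:

```python
import time

class CircuitBreaker:
    """Sketch of the interface the saga relies on (assumed, not the real class)."""

    def __init__(self, failure_threshold=5, timeout_duration=60, success_threshold=2):
        self.failure_threshold = failure_threshold  # consecutive failures before opening
        self.timeout_duration = timeout_duration    # seconds to stay open before probing
        self.success_threshold = success_threshold  # successes needed to close again
        self.failures = 0
        self.successes = 0
        self.state = "closed"
        self.opened_at = 0.0

    async def call(self, func, *args, **kwargs):
        if self.state == "open":
            if time.monotonic() - self.opened_at < self.timeout_duration:
                raise RuntimeError("circuit open; call rejected")
            self.state = "half-open"  # timeout elapsed: probe with the next call
        try:
            result = await func(*args, **kwargs)
        except Exception:
            self.failures += 1
            self.successes = 0
            if self.state == "half-open" or self.failures >= self.failure_threshold:
                self.state = "open"
                self.opened_at = time.monotonic()
            raise
        if self.state == "half-open":
            self.successes += 1
            if self.successes >= self.success_threshold:
                self.state, self.failures, self.successes = "closed", 0, 0
        else:
            self.failures = 0
        return result
```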
@@ -202,6 +218,12 @@ class OrchestrationSaga:
                 'production_schedule_id': context.get('production_schedule_id'),
                 'procurement_plan_id': context.get('procurement_plan_id'),
                 'notifications_sent': context.get('notifications_sent', 0),
+                'forecast_data': context.get('forecast_data', {}),
+                'production_data': context.get('production_data', {}),
+                'procurement_data': context.get('procurement_data', {}),
+                'ai_insights_generated': context.get('ai_insights_generated', 0),
+                'ai_insights_posted': context.get('ai_insights_posted', 0),
+                'ai_insights_errors': context.get('ai_insights_errors', []),
                 'saga_summary': saga.get_execution_summary()
             }
         else:
@@ -237,48 +259,77 @@ class OrchestrationSaga:
         logger.info(f"Fetching shared data snapshot for tenant {tenant_id}")

         try:
-            # Fetch data in parallel for optimal performance
-            inventory_task = self.inventory_client.get_all_ingredients(tenant_id, is_active=True)
-            suppliers_task = self.suppliers_client.get_all_suppliers(tenant_id, is_active=True)
-            recipes_task = self.recipes_client.get_all_recipes(tenant_id, is_active=True)
+            # Fetch data in parallel for optimal performance, with circuit breaker protection
+            async def fetch_inventory():
+                if self.inventory_breaker:
+                    return await self.inventory_breaker.call(
+                        self.inventory_client.get_all_ingredients, tenant_id, is_active=True
+                    )
+                else:
+                    return await self.inventory_client.get_all_ingredients(tenant_id, is_active=True)
+
+            async def fetch_suppliers():
+                if self.suppliers_breaker:
+                    return await self.suppliers_breaker.call(
+                        self.suppliers_client.get_all_suppliers, tenant_id, is_active=True
+                    )
+                else:
+                    return await self.suppliers_client.get_all_suppliers(tenant_id, is_active=True)
+
+            async def fetch_recipes():
+                if self.recipes_breaker:
+                    return await self.recipes_breaker.call(
+                        self.recipes_client.get_all_recipes, tenant_id, is_active=True
+                    )
+                else:
+                    return await self.recipes_client.get_all_recipes(tenant_id, is_active=True)

             # Wait for all data to be fetched
             inventory_data, suppliers_data, recipes_data = await asyncio.gather(
-                inventory_task,
-                suppliers_task,
-                recipes_task,
+                fetch_inventory(),
+                fetch_suppliers(),
+                fetch_recipes(),
                 return_exceptions=True
             )

             # Handle errors for each fetch
+            failures = 0
             if isinstance(inventory_data, Exception):
                 logger.error(f"Failed to fetch inventory data: {inventory_data}")
                 inventory_data = []
+                failures += 1

             if isinstance(suppliers_data, Exception):
                 logger.error(f"Failed to fetch suppliers data: {suppliers_data}")
                 suppliers_data = []
+                failures += 1

             if isinstance(recipes_data, Exception):
                 logger.error(f"Failed to fetch recipes data: {recipes_data}")
                 recipes_data = []
+                failures += 1
+
+            # If all three fetches failed, treat it as a critical failure
+            if failures >= 3:
+                logger.error(f"All shared data fetches failed for tenant {tenant_id}")
+                raise Exception("Unable to fetch any shared data (inventory, suppliers, recipes)")

             # Store in context for downstream services
             context['inventory_snapshot'] = {
                 'ingredients': inventory_data,
-                'fetched_at': datetime.utcnow().isoformat(),
+                'fetched_at': datetime.now(timezone.utc).isoformat(),
                 'count': len(inventory_data) if inventory_data else 0
             }

             context['suppliers_snapshot'] = {
                 'suppliers': suppliers_data,
-                'fetched_at': datetime.utcnow().isoformat(),
+                'fetched_at': datetime.now(timezone.utc).isoformat(),
                 'count': len(suppliers_data) if suppliers_data else 0
             }

             context['recipes_snapshot'] = {
                 'recipes': recipes_data,
-                'fetched_at': datetime.utcnow().isoformat(),
+                'fetched_at': datetime.now(timezone.utc).isoformat(),
                 'count': len(recipes_data) if recipes_data else 0
             }
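The fetch wrappers are gathered with `return_exceptions=True`, which is what lets one failing snapshot degrade gracefully instead of cancelling its siblings: failures come back as exception objects in the result list rather than propagating. A self-contained illustration of the pattern:

```python
import asyncio

async def ok():
    return ["item"]

async def boom():
    raise RuntimeError("service unavailable")

async def main():
    # Exceptions are returned in-place instead of raised, so each
    # result can be inspected and defaulted individually.
    inventory, suppliers = await asyncio.gather(ok(), boom(), return_exceptions=True)
    if isinstance(suppliers, Exception):
        suppliers = []  # degrade gracefully, as the saga does above
    print(inventory, suppliers)  # ['item'] []

asyncio.run(main())
```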
@@ -504,9 +555,10 @@ class OrchestrationSaga:
                 insights_results['insights_by_source'][source] = posted
                 logger.info(f"{source}: {posted} insights posted")

-            # Store insights count in context
+            # Store insights count and errors in context
             context['ai_insights_generated'] = insights_results['total_insights_generated']
             context['ai_insights_posted'] = insights_results['total_insights_posted']
+            context['ai_insights_errors'] = insights_results['errors']

             logger.info(
                 f"AI insights generation complete: "
@@ -523,6 +575,7 @@ class OrchestrationSaga:
             insights_results['errors'].append(str(e))
             context['ai_insights_generated'] = 0
             context['ai_insights_posted'] = 0
+            context['ai_insights_errors'] = insights_results['errors']
             return insights_results

     # ========================================================================
@@ -547,8 +600,13 @@ class OrchestrationSaga:
         logger.info(f"Generating forecasts for tenant {tenant_id}")

         try:
-            # Call forecast service
-            result = await self.forecast_client.generate_forecasts(tenant_id)
+            # Call forecast service with circuit breaker protection
+            if self.forecast_breaker:
+                result = await self.forecast_breaker.call(
+                    self.forecast_client.generate_forecasts, tenant_id
+                )
+            else:
+                result = await self.forecast_client.generate_forecasts(tenant_id)

             if not result:
                 logger.error(f"Forecast service returned None for tenant {tenant_id}")
@@ -564,6 +622,8 @@ class OrchestrationSaga:
                 f"{result.get('forecasts_created', 0)} forecasts created"
             )

+            # Ensure tenant_id is in result for compensation
+            result['tenant_id'] = tenant_id
             return result

         except Exception as e:
@@ -586,9 +646,13 @@ class OrchestrationSaga:
         logger.info(f"Compensating forecasts: {forecast_id}")

         try:
-            # In a real implementation, call forecast service to delete
-            # For now, just log
-            logger.info(f"Forecast {forecast_id} would be deleted (compensation)")
+            # Call forecast service to delete the forecast
+            tenant_id = forecast_result.get('tenant_id')
+            if tenant_id:
+                await self.forecast_client.delete_forecast(tenant_id, forecast_id)
+                logger.info(f"Successfully deleted forecast {forecast_id} (compensation)")
+            else:
+                logger.warning(f"Cannot compensate forecast {forecast_id}: no tenant_id in result")

         except Exception as e:
             logger.error(f"Failed to compensate forecasts {forecast_id}: {e}")
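This compensation change, and the matching production and procurement ones below, replaces log-only stubs with real delete calls, which is the heart of the saga pattern: each completed forward step registers an undo action, and on failure the undos run in reverse order. Stashing `tenant_id` in each step's result (the `result['tenant_id'] = tenant_id` lines) is what makes each compensation self-sufficient. A stripped-down sketch of that mechanic, not the project's actual `Saga` class:

```python
class MiniSaga:
    """Toy saga: run steps forward, compensate completed ones in reverse."""

    def __init__(self):
        self.completed = []  # (name, compensate-callable, step result)

    async def run(self, steps):
        # steps: iterable of (name, action, compensate) where action() runs
        # the step and compensate(result) undoes it.
        for name, action, compensate in steps:
            try:
                result = await action()
                self.completed.append((name, compensate, result))
            except Exception:
                # Undo everything that already committed, newest first.
                for cname, comp, res in reversed(self.completed):
                    try:
                        await comp(res)
                    except Exception as comp_err:
                        print(f"compensation {cname} failed: {comp_err}")
                raise
```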
@@ -619,13 +683,22 @@ class OrchestrationSaga:
         recipes_snapshot = context.get('recipes_snapshot', {})

         try:
-            # Call production service with cached data (NEW)
-            result = await self.production_client.generate_schedule(
-                tenant_id=tenant_id,
-                forecast_data=forecast_data,
-                inventory_data=inventory_snapshot,  # NEW: Pass cached inventory
-                recipes_data=recipes_snapshot  # NEW: Pass cached recipes
-            )
+            # Call production service with cached data and circuit breaker protection
+            if self.production_breaker:
+                result = await self.production_breaker.call(
+                    self.production_client.generate_schedule,
+                    tenant_id=tenant_id,
+                    forecast_data=forecast_data,
+                    inventory_data=inventory_snapshot,
+                    recipes_data=recipes_snapshot
+                )
+            else:
+                result = await self.production_client.generate_schedule(
+                    tenant_id=tenant_id,
+                    forecast_data=forecast_data,
+                    inventory_data=inventory_snapshot,
+                    recipes_data=recipes_snapshot
+                )

             if not result:
                 logger.error(f"Production service returned None for tenant {tenant_id}")
@@ -641,6 +714,8 @@ class OrchestrationSaga:
                 f"{result.get('batches_created', 0)} batches created"
             )

+            # Ensure tenant_id is in result for compensation
+            result['tenant_id'] = tenant_id
             return result

         except Exception as e:
@@ -668,11 +743,15 @@ class OrchestrationSaga:
         logger.info(f"Compensating production schedule: {schedule_id}")

         try:
-            # In a real implementation, call production service to delete
-            # For now, just log
-            logger.info(
-                f"Production schedule {schedule_id} would be deleted (compensation)"
-            )
+            # Call production service to delete the schedule
+            tenant_id = production_result.get('tenant_id')
+            if tenant_id:
+                await self.production_client.delete_schedule(tenant_id, schedule_id)
+                logger.info(
+                    f"Successfully deleted production schedule {schedule_id} (compensation)"
+                )
+            else:
+                logger.warning(f"Cannot compensate schedule {schedule_id}: no tenant_id in result")

         except Exception as e:
             logger.error(
@@ -707,15 +786,26 @@ class OrchestrationSaga:
         recipes_snapshot = context.get('recipes_snapshot', {})

         try:
-            # Call procurement service with cached data (NEW)
-            result = await self.procurement_client.auto_generate_procurement(
-                tenant_id=tenant_id,
-                forecast_data=forecast_data,
-                production_schedule_id=production_schedule_id,
-                inventory_data=inventory_snapshot,  # NEW: Pass cached inventory
-                suppliers_data=suppliers_snapshot,  # NEW: Pass cached suppliers
-                recipes_data=recipes_snapshot  # NEW: Pass cached recipes
-            )
+            # Call procurement service with cached data and circuit breaker protection
+            if self.procurement_breaker:
+                result = await self.procurement_breaker.call(
+                    self.procurement_client.auto_generate_procurement,
+                    tenant_id=tenant_id,
+                    forecast_data=forecast_data,
+                    production_schedule_id=production_schedule_id,
+                    inventory_data=inventory_snapshot,
+                    suppliers_data=suppliers_snapshot,
+                    recipes_data=recipes_snapshot
+                )
+            else:
+                result = await self.procurement_client.auto_generate_procurement(
+                    tenant_id=tenant_id,
+                    forecast_data=forecast_data,
+                    production_schedule_id=production_schedule_id,
+                    inventory_data=inventory_snapshot,
+                    suppliers_data=suppliers_snapshot,
+                    recipes_data=recipes_snapshot
+                )

             if not result:
                 logger.error(f"Procurement service returned None for tenant {tenant_id}")
@@ -732,6 +822,8 @@ class OrchestrationSaga:
                 f"{result.get('pos_created', 0)} purchase orders created"
             )

+            # Ensure tenant_id is in result for compensation
+            result['tenant_id'] = tenant_id
             return result

         except Exception as e:
@@ -759,11 +851,15 @@ class OrchestrationSaga:
         logger.info(f"Compensating procurement plan: {plan_id}")

         try:
-            # In a real implementation, call procurement service to delete plan
-            # This should also cascade delete requirements and POs
-            logger.info(
-                f"Procurement plan {plan_id} would be deleted (compensation)"
-            )
+            # Call procurement service to delete plan (this should cascade delete requirements and POs)
+            tenant_id = procurement_result.get('tenant_id')
+            if tenant_id:
+                await self.procurement_client.delete_plan(tenant_id, plan_id)
+                logger.info(
+                    f"Successfully deleted procurement plan {plan_id} and associated POs (compensation)"
+                )
+            else:
+                logger.warning(f"Cannot compensate plan {plan_id}: no tenant_id in result")

         except Exception as e:
             logger.error(f"Failed to compensate procurement plan {plan_id}: {e}")
@@ -822,9 +918,15 @@ class OrchestrationSaga:

         except Exception as e:
             # Log error but don't fail the saga for notification failures
-            logger.error(f"Failed to send notifications for tenant {tenant_id}: {e}")
+            logger.error(
+                f"NOTIFICATION FAILURE: Failed to send notifications for tenant {tenant_id}: {e}",
+                exc_info=True
+            )
+            # Store failure information in context
+            context['notification_failed'] = True
+            context['notification_error'] = str(e)
             # Return empty result instead of raising
-            return {'notifications_sent': 0, 'error': str(e)}
+            return {'notifications_sent': 0, 'error': str(e), 'failed': True}

     # ========================================================================
     # Step 5: Validate Previous Day's Forecasts
@@ -911,36 +1013,45 @@ class OrchestrationSaga:
             )

             retraining_triggered = 0
-            for product_data in poor_accuracy_products:
-                product_id = product_data.get('product_id')
-                product_mape = product_data.get('mape', 0)
-
-                if not product_id:
-                    continue
-
-                try:
-                    await self.training_client.trigger_retrain(
-                        tenant_id=tenant_id,
-                        inventory_product_id=product_id,
-                        reason='accuracy_degradation',
-                        metadata={
-                            'previous_mape': product_mape,
-                            'validation_date': yesterday.isoformat(),
-                            'triggered_by': 'orchestration_validation'
-                        }
-                    )
-                    retraining_triggered += 1
-                    logger.info(
-                        f"Triggered retraining for product {product_id} "
-                        f"(MAPE={product_mape:.2f}%)"
-                    )
-                except Exception as e:
-                    logger.error(
-                        f"Failed to trigger retraining for product {product_id}: {e}"
-                    )
-
-            context['retraining_triggered'] = retraining_triggered
-            logger.info(f"Triggered retraining for {retraining_triggered} products")
+            # Check if training client is available
+            if not self.training_client:
+                logger.warning(
+                    f"Training client not available, cannot trigger retraining for "
+                    f"{len(poor_accuracy_products)} products"
+                )
+                context['retraining_triggered'] = 0
+            else:
+                for product_data in poor_accuracy_products:
+                    product_id = product_data.get('product_id')
+                    product_mape = product_data.get('mape', 0)
+
+                    if not product_id:
+                        continue
+
+                    try:
+                        await self.training_client.trigger_retrain(
+                            tenant_id=tenant_id,
+                            inventory_product_id=product_id,
+                            reason='accuracy_degradation',
+                            metadata={
+                                'previous_mape': product_mape,
+                                'validation_date': yesterday.isoformat(),
+                                'triggered_by': 'orchestration_validation'
+                            }
+                        )
+                        retraining_triggered += 1
+                        logger.info(
+                            f"Triggered retraining for product {product_id} "
+                            f"(MAPE={product_mape:.2f}%)"
+                        )
+                    except Exception as e:
+                        logger.error(
+                            f"Failed to trigger retraining for product {product_id}: {e}"
+                        )
+
+                context['retraining_triggered'] = retraining_triggered
+                logger.info(f"Triggered retraining for {retraining_triggered} products")
         else:
             logger.info("All products have acceptable accuracy (MAPE <= 30%)")
             context['retraining_triggered'] = 0
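For reference, MAPE (mean absolute percentage error) is the accuracy metric behind the 30% threshold used here: the mean of |actual − forecast| / |actual|, expressed as a percentage, so lower is better. A small function matching that standard definition (the validation service's exact formula isn't shown in this diff):

```python
def mape(actuals, forecasts):
    # Mean absolute percentage error; skips zero actuals to avoid division by zero.
    pairs = [(a, f) for a, f in zip(actuals, forecasts) if a != 0]
    return 100 * sum(abs(a - f) / abs(a) for a, f in pairs) / len(pairs)

print(mape([100, 80], [90, 100]))  # 17.5 -> acceptable under the 30% threshold
```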
@@ -952,12 +1063,19 @@ class OrchestrationSaga:
             }

         except Exception as e:
-            # Don't fail the saga if validation fails
-            logger.warning(f"Forecast validation failed for tenant {tenant_id}: {e}")
+            # Don't fail the saga if validation fails, but log prominently
+            logger.error(
+                f"VALIDATION FAILURE: Forecast validation failed for tenant {tenant_id}: {e}",
+                exc_info=True
+            )
+            # Store failure information in context
+            context['validation_failed'] = True
+            context['validation_error'] = str(e)
             return {
                 'validated': False,
                 'error': str(e),
-                'retraining_triggered': 0
+                'retraining_triggered': 0,
+                'failed': True
             }

     # ========================================================================
@@ -82,6 +82,21 @@ class OrchestratorSchedulerService(BaseAlertService):
             timeout_duration=30,
             success_threshold=2
         )
+        self.inventory_breaker = CircuitBreaker(
+            failure_threshold=5,
+            timeout_duration=60,
+            success_threshold=2
+        )
+        self.suppliers_breaker = CircuitBreaker(
+            failure_threshold=5,
+            timeout_duration=60,
+            success_threshold=2
+        )
+        self.recipes_breaker = CircuitBreaker(
+            failure_threshold=5,
+            timeout_duration=60,
+            success_threshold=2
+        )

     def setup_scheduled_checks(self):
         """
@@ -204,7 +219,14 @@ class OrchestratorSchedulerService(BaseAlertService):
             training_client=self.training_client,
             use_ai_enhancement=settings.ORCHESTRATION_USE_AI_INSIGHTS,
             ai_insights_base_url=settings.AI_INSIGHTS_SERVICE_URL,
-            ai_insights_min_confidence=settings.AI_INSIGHTS_MIN_CONFIDENCE
+            ai_insights_min_confidence=settings.AI_INSIGHTS_MIN_CONFIDENCE,
+            # Pass circuit breakers to saga for fault tolerance
+            forecast_breaker=self.forecast_breaker,
+            production_breaker=self.production_breaker,
+            procurement_breaker=self.procurement_breaker,
+            inventory_breaker=self.inventory_breaker,
+            suppliers_breaker=self.suppliers_breaker,
+            recipes_breaker=self.recipes_breaker
         )

         result = await saga.execute_orchestration(
@@ -316,6 +338,20 @@ class OrchestratorSchedulerService(BaseAlertService):
         total_steps = saga_summary.get('total_steps', 0)
         completed_steps = saga_summary.get('completed_steps', 0)

+        # Extract actual counts from saga result (no placeholders)
+        forecast_data = saga_result.get('forecast_data', {})
+        production_data = saga_result.get('production_data', {})
+        procurement_data = saga_result.get('procurement_data', {})
+
+        forecasts_generated = forecast_data.get('forecasts_created', 0)
+        production_batches_created = production_data.get('batches_created', 0)
+        purchase_orders_created = procurement_data.get('pos_created', 0)
+
+        # Extract AI insights tracking
+        ai_insights_generated = saga_result.get('ai_insights_generated', 0)
+        ai_insights_posted = saga_result.get('ai_insights_posted', 0)
+        ai_insights_errors = saga_result.get('ai_insights_errors', [])
+
         await repo.update_run(run_id, {
             'status': OrchestrationStatus.completed,
             'completed_at': completed_at,
@@ -323,19 +359,23 @@ class OrchestratorSchedulerService(BaseAlertService):
             'forecast_id': forecast_id,
             'forecasting_status': 'success',
             'forecasting_completed_at': completed_at,
-            'forecasts_generated': 1,  # Placeholder
+            'forecasts_generated': forecasts_generated,
             'production_schedule_id': production_schedule_id,
             'production_status': 'success',
             'production_completed_at': completed_at,
-            'production_batches_created': 0,  # Placeholder
+            'production_batches_created': production_batches_created,
             'procurement_plan_id': procurement_plan_id,
             'procurement_status': 'success',
             'procurement_completed_at': completed_at,
-            'procurement_plans_created': 1,
-            'purchase_orders_created': 0,  # Placeholder
+            'procurement_plans_created': 1,  # Always 1 plan per orchestration
+            'purchase_orders_created': purchase_orders_created,
             'notification_status': 'success',
             'notification_completed_at': completed_at,
             'notifications_sent': notifications_sent,
+            'ai_insights_status': 'success' if not ai_insights_errors else 'partial',
+            'ai_insights_generated': ai_insights_generated,
+            'ai_insights_posted': ai_insights_posted,
+            'ai_insights_completed_at': completed_at,
             'saga_steps_total': total_steps,
             'saga_steps_completed': completed_steps
         })
@@ -395,5 +435,8 @@ class OrchestratorSchedulerService(BaseAlertService):
             'forecast_service': self.forecast_breaker.get_stats(),
             'production_service': self.production_breaker.get_stats(),
             'procurement_service': self.procurement_breaker.get_stats(),
-            'tenant_service': self.tenant_breaker.get_stats()
+            'tenant_service': self.tenant_breaker.get_stats(),
+            'inventory_service': self.inventory_breaker.get_stats(),
+            'suppliers_service': self.suppliers_breaker.get_stats(),
+            'recipes_service': self.recipes_breaker.get_stats()
         }
@@ -0,0 +1,88 @@
+"""Add AI insights tracking and indexes
+
+Revision ID: 20251105_add_ai_insights
+Revises: 20251029_1700_add_orchestration_runs
+Create Date: 2025-11-05 12:00:00.000000
+
+"""
+from alembic import op
+import sqlalchemy as sa
+from sqlalchemy.dialects import postgresql
+
+# revision identifiers, used by Alembic.
+revision = '20251105_add_ai_insights'
+down_revision = '20251029_1700_add_orchestration_runs'
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+    """Add AI insights tracking columns, saga tracking, and performance indexes"""
+
+    # Add AI Insights tracking columns
+    op.add_column('orchestration_runs',
+        sa.Column('ai_insights_started_at', sa.DateTime(timezone=True), nullable=True))
+    op.add_column('orchestration_runs',
+        sa.Column('ai_insights_completed_at', sa.DateTime(timezone=True), nullable=True))
+    op.add_column('orchestration_runs',
+        sa.Column('ai_insights_status', sa.String(20), nullable=True))
+    op.add_column('orchestration_runs',
+        sa.Column('ai_insights_error', sa.Text(), nullable=True))
+    op.add_column('orchestration_runs',
+        sa.Column('ai_insights_generated', sa.Integer(), nullable=False, server_default='0'))
+    op.add_column('orchestration_runs',
+        sa.Column('ai_insights_posted', sa.Integer(), nullable=False, server_default='0'))
+
+    # Add forecast_id reference (was missing)
+    op.add_column('orchestration_runs',
+        sa.Column('forecast_id', postgresql.UUID(as_uuid=True), nullable=True))
+
+    # Add saga tracking columns
+    op.add_column('orchestration_runs',
+        sa.Column('saga_steps_total', sa.Integer(), nullable=False, server_default='0'))
+    op.add_column('orchestration_runs',
+        sa.Column('saga_steps_completed', sa.Integer(), nullable=False, server_default='0'))
+
+    # Add performance indexes
+    # Index for querying by tenant_id and date range
+    op.create_index(
+        'ix_orchestration_runs_tenant_started',
+        'orchestration_runs',
+        ['tenant_id', 'started_at'],
+        unique=False
+    )
+
+    # Index for querying by status and date
+    op.create_index(
+        'ix_orchestration_runs_status_started',
+        'orchestration_runs',
+        ['status', 'started_at'],
+        unique=False
+    )
+
+    # run_number already has an index from its unique constraint,
+    # so no separate index is created for it here.
+
+
+def downgrade():
+    """Remove AI insights tracking columns, saga tracking, and indexes"""
+
+    # Remove indexes
+    op.drop_index('ix_orchestration_runs_status_started', table_name='orchestration_runs')
+    op.drop_index('ix_orchestration_runs_tenant_started', table_name='orchestration_runs')
+
+    # Remove saga tracking columns
+    op.drop_column('orchestration_runs', 'saga_steps_completed')
+    op.drop_column('orchestration_runs', 'saga_steps_total')
+
+    # Remove forecast_id reference
+    op.drop_column('orchestration_runs', 'forecast_id')
+
+    # Remove AI insights tracking columns
+    op.drop_column('orchestration_runs', 'ai_insights_posted')
+    op.drop_column('orchestration_runs', 'ai_insights_generated')
+    op.drop_column('orchestration_runs', 'ai_insights_error')
+    op.drop_column('orchestration_runs', 'ai_insights_status')
+    op.drop_column('orchestration_runs', 'ai_insights_completed_at')
+    op.drop_column('orchestration_runs', 'ai_insights_started_at')
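One detail in this migration worth calling out: the new NOT NULL integer columns use `server_default='0'` rather than only the ORM-level `default=0` seen in the model. That is what lets the NOT NULL constraint be applied to a table that already has rows, since the database itself must backfill a value for existing records; the ORM `default` only applies to inserts made through SQLAlchemy. The two declarations complement each other:

```python
import sqlalchemy as sa

# Migration side: the database backfills existing rows with 0,
# so the NOT NULL constraint can be added in one step.
sa.Column('ai_insights_generated', sa.Integer(), nullable=False, server_default='0')

# Model side: new ORM inserts also get 0 without naming the column.
sa.Column('ai_insights_generated', sa.Integer(), nullable=False, default=0)
```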