Add forecasting demand insights trigger + fix RabbitMQ cleanup
Issue 1: Forecasting demand insights not triggered in demo workflow - Created internal ML endpoint: /forecasting/internal/ml/generate-demand-insights - Added trigger_demand_insights_internal() to ForecastServiceClient - Integrated forecasting insights into demo session post-clone workflow - Now triggers 4 AI insight types: price, safety stock, yield, + demand Issue 2: RabbitMQ client cleanup error in procurement service - Fixed: rabbitmq_client.close() → rabbitmq_client.disconnect() - Added proper cleanup in exception handler - Error: "'RabbitMQClient' object has no attribute 'close'" Files modified: - services/forecasting/app/api/ml_insights.py (new internal_router) - services/forecasting/app/main.py (register internal router) - shared/clients/forecast_client.py (new trigger method) - services/demo_session/app/services/clone_orchestrator.py (+ demand insights) - services/procurement/app/api/internal_demo.py (fix disconnect) Expected impact: - Demo sessions will now generate demand forecasting insights - No more RabbitMQ cleanup errors in logs - AI insights count should increase from 1 to 2-3 per session 🤖 Generated with Claude Code Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
This commit is contained in:
@@ -968,6 +968,8 @@ class CloneOrchestrator:
|
||||
inventory_client = InventoryServiceClient(config, calling_service_name="demo-session")
|
||||
production_client = ProductionServiceClient(config, calling_service_name="demo-session")
|
||||
procurement_client = ProcurementServiceClient(config, service_name="demo-session")
|
||||
from shared.clients.forecast_client import ForecastServiceClient
|
||||
forecasting_client = ForecastServiceClient(config, calling_service_name="demo-session")
|
||||
|
||||
# For professional/enterprise demos, trigger all AI insights
|
||||
if demo_account_type in ["professional", "enterprise"]:
|
||||
@@ -1026,6 +1028,24 @@ class CloneOrchestrator:
|
||||
logger.error("Failed to trigger yield insights", tenant_id=virtual_tenant_id, error=str(e))
|
||||
results["yield_insights"] = {"error": str(e)}
|
||||
|
||||
# 4. Trigger demand forecasting insights
|
||||
try:
|
||||
logger.info("Triggering demand forecasting insights", tenant_id=virtual_tenant_id)
|
||||
result = await forecasting_client.trigger_demand_insights_internal(virtual_tenant_id)
|
||||
if result:
|
||||
results["demand_insights"] = result
|
||||
total_insights += result.get("insights_posted", 0)
|
||||
logger.info(
|
||||
"Demand insights generated",
|
||||
tenant_id=virtual_tenant_id,
|
||||
insights_posted=result.get("insights_posted", 0)
|
||||
)
|
||||
else:
|
||||
results["demand_insights"] = {"error": "No response from service"}
|
||||
except Exception as e:
|
||||
logger.error("Failed to trigger demand insights", tenant_id=virtual_tenant_id, error=str(e))
|
||||
results["demand_insights"] = {"error": str(e)}
|
||||
|
||||
# Wait 2s for insights to be processed
|
||||
await asyncio.sleep(2.0)
|
||||
|
||||
|
||||
@@ -767,3 +767,172 @@ async def ml_insights_health():
|
||||
"POST /ml/insights/analyze-business-rules"
|
||||
]
|
||||
}
|
||||
|
||||
|
||||
# ================================================================
|
||||
# INTERNAL ML INSIGHTS ENDPOINTS (for demo session service)
|
||||
# ================================================================
|
||||
|
||||
internal_router = APIRouter(tags=["Internal ML"])
|
||||
|
||||
|
||||
@internal_router.post("/api/v1/tenants/{tenant_id}/forecasting/internal/ml/generate-demand-insights")
|
||||
async def trigger_demand_insights_internal(
|
||||
tenant_id: str,
|
||||
request: Request,
|
||||
db: AsyncSession = Depends(get_db)
|
||||
):
|
||||
"""
|
||||
Internal endpoint to trigger demand forecasting insights for a tenant.
|
||||
|
||||
This endpoint is called by the demo-session service after cloning to generate
|
||||
AI insights from the seeded forecast data.
|
||||
|
||||
Args:
|
||||
tenant_id: Tenant UUID
|
||||
request: FastAPI request object to access app state
|
||||
db: Database session
|
||||
|
||||
Returns:
|
||||
Dict with insights generation results
|
||||
"""
|
||||
logger.info(
|
||||
"Internal demand insights generation triggered",
|
||||
tenant_id=tenant_id
|
||||
)
|
||||
|
||||
try:
|
||||
# Import ML orchestrator and clients
|
||||
from app.ml.demand_insights_orchestrator import DemandInsightsOrchestrator
|
||||
from shared.clients.sales_client import SalesServiceClient
|
||||
from shared.clients.inventory_client import InventoryServiceClient
|
||||
from app.core.config import settings
|
||||
|
||||
# Get event publisher from app state
|
||||
event_publisher = getattr(request.app.state, 'event_publisher', None)
|
||||
|
||||
# Initialize orchestrator and clients
|
||||
orchestrator = DemandInsightsOrchestrator(event_publisher=event_publisher)
|
||||
inventory_client = InventoryServiceClient(settings)
|
||||
|
||||
# Get all products for tenant (limit to 10 for performance)
|
||||
all_products = await inventory_client.get_all_ingredients(tenant_id=tenant_id)
|
||||
products = all_products[:10] if all_products else []
|
||||
|
||||
logger.info(
|
||||
"Retrieved products from inventory service",
|
||||
tenant_id=tenant_id,
|
||||
product_count=len(products)
|
||||
)
|
||||
|
||||
if not products:
|
||||
return {
|
||||
"success": False,
|
||||
"message": "No products found for analysis",
|
||||
"tenant_id": tenant_id,
|
||||
"products_analyzed": 0,
|
||||
"insights_posted": 0
|
||||
}
|
||||
|
||||
# Initialize sales client
|
||||
sales_client = SalesServiceClient(config=settings, calling_service_name="forecasting")
|
||||
|
||||
# Calculate date range (90 days lookback)
|
||||
end_date = datetime.utcnow()
|
||||
start_date = end_date - timedelta(days=90)
|
||||
|
||||
# Process each product
|
||||
total_insights_generated = 0
|
||||
total_insights_posted = 0
|
||||
|
||||
for product in products:
|
||||
try:
|
||||
product_id = str(product['id'])
|
||||
product_name = product.get('name', 'Unknown Product')
|
||||
|
||||
logger.debug(
|
||||
"Analyzing demand for product",
|
||||
tenant_id=tenant_id,
|
||||
product_id=product_id,
|
||||
product_name=product_name
|
||||
)
|
||||
|
||||
# Fetch historical sales data
|
||||
sales_data_raw = await sales_client.get_product_sales(
|
||||
tenant_id=tenant_id,
|
||||
product_id=product_id,
|
||||
start_date=start_date,
|
||||
end_date=end_date
|
||||
)
|
||||
|
||||
if not sales_data_raw or len(sales_data_raw) < 10:
|
||||
logger.debug(
|
||||
"Insufficient sales data for product",
|
||||
product_id=product_id,
|
||||
sales_records=len(sales_data_raw) if sales_data_raw else 0
|
||||
)
|
||||
continue
|
||||
|
||||
# Convert to DataFrame
|
||||
sales_df = pd.DataFrame(sales_data_raw)
|
||||
|
||||
# Run demand insights orchestrator
|
||||
insights = await orchestrator.analyze_and_generate_insights(
|
||||
tenant_id=tenant_id,
|
||||
product_id=product_id,
|
||||
product_name=product_name,
|
||||
sales_data=sales_df,
|
||||
lookback_days=90,
|
||||
db=db
|
||||
)
|
||||
|
||||
if insights:
|
||||
total_insights_generated += len(insights)
|
||||
total_insights_posted += len(insights)
|
||||
|
||||
logger.info(
|
||||
"Demand insights generated for product",
|
||||
tenant_id=tenant_id,
|
||||
product_id=product_id,
|
||||
insights_count=len(insights)
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
logger.warning(
|
||||
"Failed to analyze product demand (non-fatal)",
|
||||
tenant_id=tenant_id,
|
||||
product_id=product_id,
|
||||
error=str(e)
|
||||
)
|
||||
continue
|
||||
|
||||
logger.info(
|
||||
"Internal demand insights generation complete",
|
||||
tenant_id=tenant_id,
|
||||
products_analyzed=len(products),
|
||||
insights_generated=total_insights_generated,
|
||||
insights_posted=total_insights_posted
|
||||
)
|
||||
|
||||
return {
|
||||
"success": True,
|
||||
"message": f"Generated {total_insights_posted} demand forecasting insights",
|
||||
"tenant_id": tenant_id,
|
||||
"products_analyzed": len(products),
|
||||
"insights_posted": total_insights_posted
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
"Internal demand insights generation failed",
|
||||
tenant_id=tenant_id,
|
||||
error=str(e),
|
||||
exc_info=True
|
||||
)
|
||||
return {
|
||||
"success": False,
|
||||
"message": f"Demand insights generation failed: {str(e)}",
|
||||
"tenant_id": tenant_id,
|
||||
"products_analyzed": 0,
|
||||
"insights_posted": 0
|
||||
}
|
||||
|
||||
@@ -193,6 +193,7 @@ service.add_router(analytics.router)
|
||||
service.add_router(scenario_operations.router)
|
||||
service.add_router(internal_demo.router, tags=["internal-demo"])
|
||||
service.add_router(ml_insights.router) # ML insights endpoint
|
||||
service.add_router(ml_insights.internal_router) # Internal ML insights endpoint
|
||||
service.add_router(validation.router) # Validation endpoint
|
||||
service.add_router(historical_validation.router) # Historical validation endpoint
|
||||
service.add_router(webhooks.router) # Webhooks endpoint
|
||||
|
||||
@@ -171,7 +171,7 @@ async def _emit_po_approval_alerts_for_demo(
|
||||
# Continue with other POs
|
||||
|
||||
# Close RabbitMQ connection
|
||||
await rabbitmq_client.close()
|
||||
await rabbitmq_client.disconnect()
|
||||
|
||||
logger.info(
|
||||
"PO approval alerts emission completed",
|
||||
@@ -188,7 +188,12 @@ async def _emit_po_approval_alerts_for_demo(
|
||||
virtual_tenant_id=str(virtual_tenant_id),
|
||||
exc_info=True
|
||||
)
|
||||
# Don't fail the cloning process
|
||||
# Don't fail the cloning process - ensure we try to disconnect if connected
|
||||
try:
|
||||
if 'rabbitmq_client' in locals():
|
||||
await rabbitmq_client.disconnect()
|
||||
except:
|
||||
pass # Suppress cleanup errors
|
||||
return alerts_emitted
|
||||
|
||||
|
||||
|
||||
@@ -341,6 +341,53 @@ class ForecastServiceClient(BaseServiceClient):
|
||||
}
|
||||
return await self.post("forecasting/ml/insights/generate-rules", data=data, tenant_id=tenant_id)
|
||||
|
||||
async def trigger_demand_insights_internal(
|
||||
self,
|
||||
tenant_id: str
|
||||
) -> Optional[Dict[str, Any]]:
|
||||
"""
|
||||
Trigger demand forecasting insights for a tenant (internal service use only).
|
||||
|
||||
This method calls the internal endpoint which is protected by X-Internal-Service header.
|
||||
Used by demo-session service after cloning to generate AI insights from seeded data.
|
||||
|
||||
Args:
|
||||
tenant_id: Tenant ID to trigger insights for
|
||||
|
||||
Returns:
|
||||
Dict with trigger results or None if failed
|
||||
"""
|
||||
try:
|
||||
result = await self._make_request(
|
||||
method="POST",
|
||||
endpoint=f"forecasting/internal/ml/generate-demand-insights",
|
||||
tenant_id=tenant_id,
|
||||
data={"tenant_id": tenant_id},
|
||||
headers={"X-Internal-Service": "demo-session"}
|
||||
)
|
||||
|
||||
if result:
|
||||
self.logger.info(
|
||||
"Demand insights triggered successfully via internal endpoint",
|
||||
tenant_id=tenant_id,
|
||||
insights_posted=result.get("insights_posted", 0)
|
||||
)
|
||||
else:
|
||||
self.logger.warning(
|
||||
"Demand insights internal endpoint returned no result",
|
||||
tenant_id=tenant_id
|
||||
)
|
||||
|
||||
return result
|
||||
|
||||
except Exception as e:
|
||||
self.logger.error(
|
||||
"Error triggering demand insights via internal endpoint",
|
||||
tenant_id=tenant_id,
|
||||
error=str(e)
|
||||
)
|
||||
return None
|
||||
|
||||
# ================================================================
|
||||
# Legacy/Compatibility Methods (deprecated)
|
||||
# ================================================================
|
||||
|
||||
Reference in New Issue
Block a user