From 4418ff08768078e58b9be15961d912d49f249899 Mon Sep 17 00:00:00 2001 From: Urtzi Alfaro Date: Tue, 16 Dec 2025 11:28:04 +0100 Subject: [PATCH] Add forecasting demand insights trigger + fix RabbitMQ cleanup MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Issue 1: Forecasting demand insights not triggered in demo workflow - Created internal ML endpoint: /forecasting/internal/ml/generate-demand-insights - Added trigger_demand_insights_internal() to ForecastServiceClient - Integrated forecasting insights into demo session post-clone workflow - Now triggers 4 AI insight types: price, safety stock, yield, + demand Issue 2: RabbitMQ client cleanup error in procurement service - Fixed: rabbitmq_client.close() → rabbitmq_client.disconnect() - Added proper cleanup in exception handler - Error: "'RabbitMQClient' object has no attribute 'close'" Files modified: - services/forecasting/app/api/ml_insights.py (new internal_router) - services/forecasting/app/main.py (register internal router) - shared/clients/forecast_client.py (new trigger method) - services/demo_session/app/services/clone_orchestrator.py (+ demand insights) - services/procurement/app/api/internal_demo.py (fix disconnect) Expected impact: - Demo sessions will now generate demand forecasting insights - No more RabbitMQ cleanup errors in logs - AI insights count should increase from 1 to 2-3 per session 🤖 Generated with Claude Code Co-Authored-By: Claude Sonnet 4.5 --- .../app/services/clone_orchestrator.py | 20 +++ services/forecasting/app/api/ml_insights.py | 169 ++++++++++++++++++ services/forecasting/app/main.py | 1 + services/procurement/app/api/internal_demo.py | 9 +- shared/clients/forecast_client.py | 47 +++++ 5 files changed, 244 insertions(+), 2 deletions(-) diff --git a/services/demo_session/app/services/clone_orchestrator.py b/services/demo_session/app/services/clone_orchestrator.py index a82c80c5..d8a67369 100644 --- a/services/demo_session/app/services/clone_orchestrator.py +++ b/services/demo_session/app/services/clone_orchestrator.py @@ -968,6 +968,8 @@ class CloneOrchestrator: inventory_client = InventoryServiceClient(config, calling_service_name="demo-session") production_client = ProductionServiceClient(config, calling_service_name="demo-session") procurement_client = ProcurementServiceClient(config, service_name="demo-session") + from shared.clients.forecast_client import ForecastServiceClient + forecasting_client = ForecastServiceClient(config, calling_service_name="demo-session") # For professional/enterprise demos, trigger all AI insights if demo_account_type in ["professional", "enterprise"]: @@ -1026,6 +1028,24 @@ class CloneOrchestrator: logger.error("Failed to trigger yield insights", tenant_id=virtual_tenant_id, error=str(e)) results["yield_insights"] = {"error": str(e)} + # 4. Trigger demand forecasting insights + try: + logger.info("Triggering demand forecasting insights", tenant_id=virtual_tenant_id) + result = await forecasting_client.trigger_demand_insights_internal(virtual_tenant_id) + if result: + results["demand_insights"] = result + total_insights += result.get("insights_posted", 0) + logger.info( + "Demand insights generated", + tenant_id=virtual_tenant_id, + insights_posted=result.get("insights_posted", 0) + ) + else: + results["demand_insights"] = {"error": "No response from service"} + except Exception as e: + logger.error("Failed to trigger demand insights", tenant_id=virtual_tenant_id, error=str(e)) + results["demand_insights"] = {"error": str(e)} + # Wait 2s for insights to be processed await asyncio.sleep(2.0) diff --git a/services/forecasting/app/api/ml_insights.py b/services/forecasting/app/api/ml_insights.py index 076cd94a..d201ad87 100644 --- a/services/forecasting/app/api/ml_insights.py +++ b/services/forecasting/app/api/ml_insights.py @@ -767,3 +767,172 @@ async def ml_insights_health(): "POST /ml/insights/analyze-business-rules" ] } + + +# ================================================================ +# INTERNAL ML INSIGHTS ENDPOINTS (for demo session service) +# ================================================================ + +internal_router = APIRouter(tags=["Internal ML"]) + + +@internal_router.post("/api/v1/tenants/{tenant_id}/forecasting/internal/ml/generate-demand-insights") +async def trigger_demand_insights_internal( + tenant_id: str, + request: Request, + db: AsyncSession = Depends(get_db) +): + """ + Internal endpoint to trigger demand forecasting insights for a tenant. + + This endpoint is called by the demo-session service after cloning to generate + AI insights from the seeded forecast data. + + Args: + tenant_id: Tenant UUID + request: FastAPI request object to access app state + db: Database session + + Returns: + Dict with insights generation results + """ + logger.info( + "Internal demand insights generation triggered", + tenant_id=tenant_id + ) + + try: + # Import ML orchestrator and clients + from app.ml.demand_insights_orchestrator import DemandInsightsOrchestrator + from shared.clients.sales_client import SalesServiceClient + from shared.clients.inventory_client import InventoryServiceClient + from app.core.config import settings + + # Get event publisher from app state + event_publisher = getattr(request.app.state, 'event_publisher', None) + + # Initialize orchestrator and clients + orchestrator = DemandInsightsOrchestrator(event_publisher=event_publisher) + inventory_client = InventoryServiceClient(settings) + + # Get all products for tenant (limit to 10 for performance) + all_products = await inventory_client.get_all_ingredients(tenant_id=tenant_id) + products = all_products[:10] if all_products else [] + + logger.info( + "Retrieved products from inventory service", + tenant_id=tenant_id, + product_count=len(products) + ) + + if not products: + return { + "success": False, + "message": "No products found for analysis", + "tenant_id": tenant_id, + "products_analyzed": 0, + "insights_posted": 0 + } + + # Initialize sales client + sales_client = SalesServiceClient(config=settings, calling_service_name="forecasting") + + # Calculate date range (90 days lookback) + end_date = datetime.utcnow() + start_date = end_date - timedelta(days=90) + + # Process each product + total_insights_generated = 0 + total_insights_posted = 0 + + for product in products: + try: + product_id = str(product['id']) + product_name = product.get('name', 'Unknown Product') + + logger.debug( + "Analyzing demand for product", + tenant_id=tenant_id, + product_id=product_id, + product_name=product_name + ) + + # Fetch historical sales data + sales_data_raw = await sales_client.get_product_sales( + tenant_id=tenant_id, + product_id=product_id, + start_date=start_date, + end_date=end_date + ) + + if not sales_data_raw or len(sales_data_raw) < 10: + logger.debug( + "Insufficient sales data for product", + product_id=product_id, + sales_records=len(sales_data_raw) if sales_data_raw else 0 + ) + continue + + # Convert to DataFrame + sales_df = pd.DataFrame(sales_data_raw) + + # Run demand insights orchestrator + insights = await orchestrator.analyze_and_generate_insights( + tenant_id=tenant_id, + product_id=product_id, + product_name=product_name, + sales_data=sales_df, + lookback_days=90, + db=db + ) + + if insights: + total_insights_generated += len(insights) + total_insights_posted += len(insights) + + logger.info( + "Demand insights generated for product", + tenant_id=tenant_id, + product_id=product_id, + insights_count=len(insights) + ) + + except Exception as e: + logger.warning( + "Failed to analyze product demand (non-fatal)", + tenant_id=tenant_id, + product_id=product_id, + error=str(e) + ) + continue + + logger.info( + "Internal demand insights generation complete", + tenant_id=tenant_id, + products_analyzed=len(products), + insights_generated=total_insights_generated, + insights_posted=total_insights_posted + ) + + return { + "success": True, + "message": f"Generated {total_insights_posted} demand forecasting insights", + "tenant_id": tenant_id, + "products_analyzed": len(products), + "insights_posted": total_insights_posted + } + + except Exception as e: + logger.error( + "Internal demand insights generation failed", + tenant_id=tenant_id, + error=str(e), + exc_info=True + ) + return { + "success": False, + "message": f"Demand insights generation failed: {str(e)}", + "tenant_id": tenant_id, + "products_analyzed": 0, + "insights_posted": 0 + } diff --git a/services/forecasting/app/main.py b/services/forecasting/app/main.py index d7a25306..20b735ae 100644 --- a/services/forecasting/app/main.py +++ b/services/forecasting/app/main.py @@ -193,6 +193,7 @@ service.add_router(analytics.router) service.add_router(scenario_operations.router) service.add_router(internal_demo.router, tags=["internal-demo"]) service.add_router(ml_insights.router) # ML insights endpoint +service.add_router(ml_insights.internal_router) # Internal ML insights endpoint service.add_router(validation.router) # Validation endpoint service.add_router(historical_validation.router) # Historical validation endpoint service.add_router(webhooks.router) # Webhooks endpoint diff --git a/services/procurement/app/api/internal_demo.py b/services/procurement/app/api/internal_demo.py index e6f97fe7..426177bb 100644 --- a/services/procurement/app/api/internal_demo.py +++ b/services/procurement/app/api/internal_demo.py @@ -171,7 +171,7 @@ async def _emit_po_approval_alerts_for_demo( # Continue with other POs # Close RabbitMQ connection - await rabbitmq_client.close() + await rabbitmq_client.disconnect() logger.info( "PO approval alerts emission completed", @@ -188,7 +188,12 @@ async def _emit_po_approval_alerts_for_demo( virtual_tenant_id=str(virtual_tenant_id), exc_info=True ) - # Don't fail the cloning process + # Don't fail the cloning process - ensure we try to disconnect if connected + try: + if 'rabbitmq_client' in locals(): + await rabbitmq_client.disconnect() + except: + pass # Suppress cleanup errors return alerts_emitted diff --git a/shared/clients/forecast_client.py b/shared/clients/forecast_client.py index 46eff0dc..829a4046 100755 --- a/shared/clients/forecast_client.py +++ b/shared/clients/forecast_client.py @@ -341,6 +341,53 @@ class ForecastServiceClient(BaseServiceClient): } return await self.post("forecasting/ml/insights/generate-rules", data=data, tenant_id=tenant_id) + async def trigger_demand_insights_internal( + self, + tenant_id: str + ) -> Optional[Dict[str, Any]]: + """ + Trigger demand forecasting insights for a tenant (internal service use only). + + This method calls the internal endpoint which is protected by X-Internal-Service header. + Used by demo-session service after cloning to generate AI insights from seeded data. + + Args: + tenant_id: Tenant ID to trigger insights for + + Returns: + Dict with trigger results or None if failed + """ + try: + result = await self._make_request( + method="POST", + endpoint=f"forecasting/internal/ml/generate-demand-insights", + tenant_id=tenant_id, + data={"tenant_id": tenant_id}, + headers={"X-Internal-Service": "demo-session"} + ) + + if result: + self.logger.info( + "Demand insights triggered successfully via internal endpoint", + tenant_id=tenant_id, + insights_posted=result.get("insights_posted", 0) + ) + else: + self.logger.warning( + "Demand insights internal endpoint returned no result", + tenant_id=tenant_id + ) + + return result + + except Exception as e: + self.logger.error( + "Error triggering demand insights via internal endpoint", + tenant_id=tenant_id, + error=str(e) + ) + return None + # ================================================================ # Legacy/Compatibility Methods (deprecated) # ================================================================