diff --git a/frontend/nginx.conf b/frontend/nginx.conf index ab72c3fd..7402f45a 100644 --- a/frontend/nginx.conf +++ b/frontend/nginx.conf @@ -34,8 +34,8 @@ server { # Note: API routing is handled by ingress, not by this nginx # The frontend makes requests to /api which are routed by the ingress controller - # Static assets with aggressive caching - location ~* \.(js|css|png|jpg|jpeg|gif|ico|svg|woff|woff2|ttf|eot)$ { + # Static assets with aggressive caching (including source maps for debugging) + location ~* \.(js|css|png|jpg|jpeg|gif|ico|svg|woff|woff2|ttf|eot|map)$ { expires 1y; add_header Cache-Control "public, immutable"; add_header Vary Accept-Encoding; diff --git a/frontend/src/api/client/apiClient.ts b/frontend/src/api/client/apiClient.ts index 54cf9ea1..0942c796 100644 --- a/frontend/src/api/client/apiClient.ts +++ b/frontend/src/api/client/apiClient.ts @@ -99,11 +99,16 @@ class ApiClient { config.url?.includes(endpoint) ); + // Check demo session ID from memory OR localStorage + const demoSessionId = this.demoSessionId || localStorage.getItem('demo_session_id'); + const isDemoMode = !!demoSessionId; + // Only add auth token for non-public endpoints if (this.authToken && !isPublicEndpoint) { config.headers.Authorization = `Bearer ${this.authToken}`; console.log('🔑 [API Client] Adding Authorization header for:', config.url); - } else if (!isPublicEndpoint) { + } else if (!isPublicEndpoint && !isDemoMode) { + // Only warn if NOT in demo mode - demo mode uses X-Demo-Session-Id header instead console.warn('⚠️ [API Client] No auth token available for:', config.url, 'authToken:', this.authToken ? 'exists' : 'missing'); } @@ -115,8 +120,7 @@ class ApiClient { console.warn('⚠️ [API Client] No tenant ID set for endpoint:', config.url); } - // Check demo session ID from memory OR localStorage - const demoSessionId = this.demoSessionId || localStorage.getItem('demo_session_id'); + // Add demo session ID header if in demo mode if (demoSessionId) { config.headers['X-Demo-Session-Id'] = demoSessionId; console.log('🔍 [API Client] Adding X-Demo-Session-Id header:', demoSessionId); diff --git a/frontend/src/api/hooks/useControlPanelData.ts b/frontend/src/api/hooks/useControlPanelData.ts index 61fa3d62..12d382c1 100644 --- a/frontend/src/api/hooks/useControlPanelData.ts +++ b/frontend/src/api/hooks/useControlPanelData.ts @@ -6,7 +6,7 @@ */ import { useQuery, useQueryClient } from '@tanstack/react-query'; -import { useEffect, useState, useCallback } from 'react'; +import { useEffect, useState, useCallback, useRef } from 'react'; import { alertService } from '../services/alertService'; import { getPendingApprovalPurchaseOrders } from '../services/purchase_orders'; import { productionService } from '../services/production'; @@ -17,6 +17,9 @@ import { aiInsightsService } from '../services/aiInsights'; import { useSSEEvents } from '../../hooks/useSSE'; import { parseISO } from 'date-fns'; +// Debounce delay for SSE-triggered query invalidations (ms) +const SSE_INVALIDATION_DEBOUNCE_MS = 500; + // ============================================================ // Types // ============================================================ @@ -228,13 +231,15 @@ export function useControlPanelData(tenantId: string) { supplierMap.set(supplier.id, supplier.name || supplier.supplier_name); }); - // Merge SSE events with API data - const allAlerts = [...alerts]; + // Merge SSE events with API data (deduplicate by ID, prioritizing SSE events as they're newer) + let allAlerts: any[]; if (sseEvents.length > 0) { - // Merge SSE events, prioritizing newer events const sseEventIds = new Set(sseEvents.map(e => e.id)); - const mergedAlerts = alerts.filter(alert => !sseEventIds.has(alert.id)); - allAlerts.push(...sseEvents); + // Filter out API alerts that also exist in SSE (SSE has newer data) + const uniqueApiAlerts = alerts.filter((alert: any) => !sseEventIds.has(alert.id)); + allAlerts = [...uniqueApiAlerts, ...sseEvents]; + } else { + allAlerts = [...alerts]; } // Apply data priority rules for POs @@ -327,6 +332,32 @@ export function useControlPanelData(tenantId: string) { !a.hidden_from_ui && a.status === 'active' ); + + // Debug: Log alert counts by type_class + console.log('📊 [useControlPanelData] Alert analysis:', { + totalAlerts: allAlerts.length, + fromAPI: alerts.length, + fromSSE: sseEvents.length, + preventedIssuesCount: preventedIssues.length, + actionNeededCount: actionNeededAlerts.length, + typeClassBreakdown: allAlerts.reduce((acc: Record, a: any) => { + const typeClass = a.type_class || 'unknown'; + acc[typeClass] = (acc[typeClass] || 0) + 1; + return acc; + }, {}), + apiAlertsSample: alerts.slice(0, 3).map((a: any) => ({ + id: a.id, + event_type: a.event_type, + type_class: a.type_class, + status: a.status, + })), + sseEventsSample: sseEvents.slice(0, 3).map((a: any) => ({ + id: a.id, + event_type: a.event_type, + type_class: a.type_class, + status: a.status, + })), + }); // Calculate total issues requiring action: // 1. Action needed alerts @@ -387,24 +418,51 @@ export function useControlPanelData(tenantId: string) { retry: 2, }); - // SSE integration - invalidate query on relevant events + // Ref for debouncing SSE-triggered invalidations + const invalidationTimeoutRef = useRef(null); + const lastEventCountRef = useRef(0); + + // SSE integration - invalidate query on relevant events (debounced) useEffect(() => { - if (sseAlerts.length > 0 && tenantId) { - const relevantEvents = sseAlerts.filter(event => - event.event_type.includes('production.') || - event.event_type.includes('batch_') || - event.event_type.includes('delivery') || - event.event_type.includes('purchase_order') || - event.event_type.includes('equipment_') - ); - - if (relevantEvents.length > 0) { + // Skip if no new events since last check + if (sseAlerts.length === 0 || !tenantId || sseAlerts.length === lastEventCountRef.current) { + return; + } + + const relevantEvents = sseAlerts.filter(event => + event.event_type?.includes('production.') || + event.event_type?.includes('batch_') || + event.event_type?.includes('delivery') || + event.event_type?.includes('purchase_order') || + event.event_type?.includes('equipment_') || + event.event_type?.includes('insight') || + event.event_type?.includes('recommendation') || + event.event_type?.includes('ai_') || // Match ai_yield_prediction, ai_*, etc. + event.event_class === 'recommendation' + ); + + if (relevantEvents.length > 0) { + // Clear existing timeout to debounce rapid events + if (invalidationTimeoutRef.current) { + clearTimeout(invalidationTimeoutRef.current); + } + + // Debounce the invalidation to prevent multiple rapid refetches + invalidationTimeoutRef.current = setTimeout(() => { + lastEventCountRef.current = sseAlerts.length; queryClient.invalidateQueries({ queryKey: ['control-panel-data', tenantId], refetchType: 'active', }); - } + }, SSE_INVALIDATION_DEBOUNCE_MS); } + + // Cleanup timeout on unmount or dependency change + return () => { + if (invalidationTimeoutRef.current) { + clearTimeout(invalidationTimeoutRef.current); + } + }; }, [sseAlerts, tenantId, queryClient]); return query; diff --git a/frontend/src/contexts/SSEContext.tsx b/frontend/src/contexts/SSEContext.tsx index 07f9e483..d5a591da 100644 --- a/frontend/src/contexts/SSEContext.tsx +++ b/frontend/src/contexts/SSEContext.tsx @@ -1,4 +1,4 @@ -import React, { createContext, useContext, useEffect, useRef, useState, ReactNode } from 'react'; +import React, { createContext, useContext, useEffect, useRef, useState, ReactNode, useCallback } from 'react'; import { useAuthStore } from '../stores/auth.store'; import { useCurrentTenant } from '../stores/tenant.store'; import { showToast } from '../utils/toast'; @@ -103,6 +103,11 @@ export const SSEProvider: React.FC = ({ children }) => { setIsConnected(true); reconnectAttempts.current = 0; + // Clear processed event IDs on new connection to allow fresh state from server + // This ensures events are processed again after reconnection or navigation + processedEventIdsRef.current.clear(); + console.log('🔄 [SSE] Cleared processed event IDs cache on connection open'); + if (reconnectTimeoutRef.current) { clearTimeout(reconnectTimeoutRef.current); reconnectTimeoutRef.current = undefined; @@ -214,6 +219,11 @@ export const SSEProvider: React.FC = ({ children }) => { // Trigger listeners with enriched alert data // Wrap in queueMicrotask to prevent setState during render warnings const listeners = eventListenersRef.current.get('alert'); + console.log('📤 [SSEContext] Notifying alert listeners:', { + listenerCount: listeners?.size || 0, + eventId: data.id, + eventType: data.event_type || data.type, + }); if (listeners) { listeners.forEach(callback => { queueMicrotask(() => callback(data)); @@ -347,6 +357,22 @@ export const SSEProvider: React.FC = ({ children }) => { eventSource.addEventListener('recommendation', (event) => { try { const data = JSON.parse(event.data); + + // GLOBAL DEDUPLICATION: Skip if this event was already processed + if (data.id && processedEventIdsRef.current.has(data.id)) { + console.log('⏭️ [SSE] Skipping duplicate recommendation:', data.id); + return; + } + + // Mark event as processed + if (data.id) { + processedEventIdsRef.current.add(data.id); + if (processedEventIdsRef.current.size > 1000) { + const firstId = Array.from(processedEventIdsRef.current)[0]; + processedEventIdsRef.current.delete(firstId); + } + } + const sseEvent: SSEEvent = { type: 'recommendation', data, @@ -433,6 +459,11 @@ export const SSEProvider: React.FC = ({ children }) => { // Trigger listeners with recommendation data // Wrap in queueMicrotask to prevent setState during render warnings const listeners = eventListenersRef.current.get('recommendation'); + console.log('📤 [SSEContext] Notifying recommendation listeners:', { + listenerCount: listeners?.size || 0, + eventId: data.id, + eventType: data.event_type || data.type, + }); if (listeners) { listeners.forEach(callback => { queueMicrotask(() => callback(data)); @@ -483,13 +514,14 @@ export const SSEProvider: React.FC = ({ children }) => { reconnectAttempts.current = 0; }; - const addEventListener = (eventType: string, callback: (data: any) => void) => { + // Memoize addEventListener to prevent unnecessary effect re-runs in consumers + const addEventListener = useCallback((eventType: string, callback: (data: any) => void) => { if (!eventListenersRef.current.has(eventType)) { eventListenersRef.current.set(eventType, new Set()); } - + eventListenersRef.current.get(eventType)!.add(callback); - + // Return cleanup function return () => { const listeners = eventListenersRef.current.get(eventType); @@ -500,7 +532,7 @@ export const SSEProvider: React.FC = ({ children }) => { } } }; - }; + }, []); // No dependencies - uses only refs // Connect when authenticated, disconnect when not or when tenant changes useEffect(() => { @@ -531,6 +563,8 @@ export const SSEProvider: React.FC = ({ children }) => { }; }, []); + // Context value - consumers should extract only what they need + // addEventListener is now stable (wrapped in useCallback) const contextValue: SSEContextType = { isConnected, lastEvent, diff --git a/frontend/src/hooks/useSSE.ts b/frontend/src/hooks/useSSE.ts index f011af2a..4ab8272e 100644 --- a/frontend/src/hooks/useSSE.ts +++ b/frontend/src/hooks/useSSE.ts @@ -4,13 +4,18 @@ * Wrapper around SSEContext that collects and manages events. * Provides a clean interface for subscribing to SSE events with channel filtering. * + * Features: + * - Event batching to prevent race conditions when multiple events arrive rapidly + * - Deduplication by event ID + * - Configurable channel filtering + * * Examples: * const { events } = useSSE(); // All events * const { events } = useSSE({ channels: ['inventory.alerts'] }); * const { events } = useSSE({ channels: ['*.notifications'] }); */ -import { useContext, useEffect, useState, useCallback } from 'react'; +import { useContext, useEffect, useState, useCallback, useRef, useMemo } from 'react'; import { SSEContext } from '../contexts/SSEContext'; import type { Event, Alert, Notification, Recommendation } from '../api/types/events'; import { convertLegacyAlert } from '../api/types/events'; @@ -26,162 +31,172 @@ interface UseSSEReturn { } const MAX_EVENTS = 200; // Keep last 200 events in memory +const BATCH_DELAY_MS = 50; // Batch events for 50ms to prevent race conditions + +/** + * Normalize event data to consistent Event format + */ +function normalizeEvent(eventType: string, data: any): Event { + // Check if it's new format (has event_class) or legacy format + if (data.event_class === 'alert' || data.event_class === 'notification' || data.event_class === 'recommendation') { + // Ensure event_type is set even for new format events + return { + ...data, + event_type: data.event_type || data.type || eventType, + } as Event; + } else if (data.item_type === 'alert' || data.item_type === 'recommendation') { + return convertLegacyAlert(data); + } else if (eventType === 'recommendation') { + return { + ...data, + event_class: 'recommendation', + event_domain: data.event_domain || 'ai_insights', + event_type: data.event_type || data.type || 'recommendation', + } as Recommendation; + } else if (eventType === 'notification') { + return { + ...data, + event_type: data.event_type || data.type || 'notification', + } as Notification; + } else { + return { + ...data, + event_class: 'alert', + event_domain: data.event_domain || 'operations', + event_type: data.event_type || data.type || 'alert', + } as Alert; + } +} + +/** + * Deduplicate events by ID, keeping the newest version + */ +function deduplicateEvents(events: Event[]): Event[] { + const seen = new Map(); + for (const event of events) { + if (!seen.has(event.id)) { + seen.set(event.id, event); + } + } + return Array.from(seen.values()); +} export function useSSEEvents(config: UseSSEConfig = {}): UseSSEReturn { - const context = useContext(SSEContext); + const context = useContext(SSEContext); const [events, setEvents] = useState([]); + // Refs for batching - these persist across renders without causing re-renders + const eventBufferRef = useRef([]); + const flushTimeoutRef = useRef(null); + const processedIdsRef = useRef>(new Set()); + if (!context) { throw new Error('useSSE must be used within SSEProvider'); } - // Create a stable key for the config channels to avoid unnecessary re-renders - // Use JSON.stringify for reliable comparison of channel arrays - const channelsKey = JSON.stringify(config.channels || []); + // Memoize channels key to prevent unnecessary effect re-runs + const channelsKey = useMemo(() => + JSON.stringify(config.channels?.slice().sort() || []), + [config.channels] + ); + + // Flush buffered events to state - batches multiple events into single state update + // IMPORTANT: No dependencies - this function's identity must be stable + const flushEvents = useCallback(() => { + if (eventBufferRef.current.length === 0) return; + + const bufferedEvents = eventBufferRef.current; + eventBufferRef.current = []; + + setEvents(prev => { + // Combine buffered events with existing events + const combined = [...bufferedEvents, ...prev]; + // Deduplicate and limit + const deduplicated = deduplicateEvents(combined); + return deduplicated.slice(0, MAX_EVENTS); + }); + }, []); // No dependencies - stable identity + + // Add event to buffer and schedule flush + // IMPORTANT: Only depends on flushEvents which is now stable + const bufferEvent = useCallback((event: Event) => { + // Skip if already processed recently (extra safety) + if (processedIdsRef.current.has(event.id)) { + return; + } + + // Mark as processed + processedIdsRef.current.add(event.id); + + // Limit processed IDs cache size + if (processedIdsRef.current.size > 1000) { + const firstId = Array.from(processedIdsRef.current)[0]; + processedIdsRef.current.delete(firstId); + } + + // Add to buffer + eventBufferRef.current.push(event); + + // Clear existing timeout and set new one + if (flushTimeoutRef.current) { + clearTimeout(flushTimeoutRef.current); + } + + // Flush after delay to batch rapid events + flushTimeoutRef.current = setTimeout(flushEvents, BATCH_DELAY_MS); + }, [flushEvents]); // flushEvents is now stable (no deps) + + // Extract addEventListener to avoid re-running effect when other context values change + const { addEventListener } = context; useEffect(() => { const unsubscribers: (() => void)[] = []; - // Listen to 'alert' events (can be Alert or legacy format) - const handleAlert = (data: any) => { - console.log('🟢 [useSSE] handleAlert triggered', { data }); - let event: Event; - - // Check if it's new format (has event_class) or legacy format - if (data.event_class === 'alert' || data.event_class === 'notification' || data.event_class === 'recommendation') { - // New format - event = data as Event; - } else if (data.item_type === 'alert' || data.item_type === 'recommendation') { - // Legacy format - convert - event = convertLegacyAlert(data); - } else { - // Assume it's an alert if no clear classification - event = { ...data, event_class: 'alert', event_domain: 'operations' } as Alert; - } - - console.log('🟢 [useSSE] Setting events state with new alert', { - eventId: event.id, - eventClass: event.event_class, - eventDomain: event.event_domain, - }); - - setEvents(prev => { - // Check if this event already exists to prevent duplicate processing - const existingIndex = prev.findIndex(e => e.id === event.id); - if (existingIndex !== -1) { - // Update existing event instead of adding duplicate - const newEvents = [...prev]; - newEvents[existingIndex] = event; - return newEvents.slice(0, MAX_EVENTS); - } - - // Add new event if not duplicate - const filtered = prev.filter(e => e.id !== event.id); - const newEvents = [event, ...filtered].slice(0, MAX_EVENTS); - console.log('🟢 [useSSE] Events array updated', { - prevCount: prev.length, - newCount: newEvents.length, - newEventIds: newEvents.map(e => e.id).join(','), - }); - return newEvents; - }); + // Unified event handler for all event types + const createHandler = (eventType: string) => (data: any) => { + const event = normalizeEvent(eventType, data); + bufferEvent(event); }; - unsubscribers.push(context.addEventListener('alert', handleAlert)); - - // Listen to 'notification' events - const handleNotification = (data: Notification) => { - setEvents(prev => { - // Check if this notification already exists to prevent duplicate processing - const existingIndex = prev.findIndex(e => e.id === data.id); - if (existingIndex !== -1) { - // Update existing notification instead of adding duplicate - const newEvents = [...prev]; - newEvents[existingIndex] = data; - return newEvents.slice(0, MAX_EVENTS); - } - - // Add new notification if not duplicate - const filtered = prev.filter(e => e.id !== data.id); - return [data, ...filtered].slice(0, MAX_EVENTS); - }); - }; - - unsubscribers.push(context.addEventListener('notification', handleNotification)); - - // Listen to 'recommendation' events - const handleRecommendation = (data: any) => { - let event: Recommendation; - - // Handle both new and legacy formats - if (data.event_class === 'recommendation') { - event = data as Recommendation; - } else if (data.item_type === 'recommendation') { - event = convertLegacyAlert(data) as Recommendation; - } else { - event = { ...data, event_class: 'recommendation', event_domain: 'operations' } as Recommendation; - } - - setEvents(prev => { - // Check if this recommendation already exists to prevent duplicate processing - const existingIndex = prev.findIndex(e => e.id === event.id); - if (existingIndex !== -1) { - // Update existing recommendation instead of adding duplicate - const newEvents = [...prev]; - newEvents[existingIndex] = event; - return newEvents.slice(0, MAX_EVENTS); - } - - // Add new recommendation if not duplicate - const filtered = prev.filter(e => e.id !== event.id); - return [event, ...filtered].slice(0, MAX_EVENTS); - }); - }; - - unsubscribers.push(context.addEventListener('recommendation', handleRecommendation)); + // Subscribe to all event types + unsubscribers.push(addEventListener('alert', createHandler('alert'))); + unsubscribers.push(addEventListener('notification', createHandler('notification'))); + unsubscribers.push(addEventListener('recommendation', createHandler('recommendation'))); // Listen to 'initial_state' event (batch load on connection) const handleInitialState = (data: any) => { if (Array.isArray(data)) { - // Convert each event to proper format - const initialEvents = data.map(item => { - if (item.event_class) { - return item as Event; - } else if (item.item_type) { - return convertLegacyAlert(item); - } else { - return { ...item, event_class: 'alert', event_domain: 'operations' } as Event; - } - }); - setEvents(initialEvents.slice(0, MAX_EVENTS)); + const initialEvents = data.map(item => normalizeEvent('alert', item)); + // For initial state, set directly without buffering + setEvents(deduplicateEvents(initialEvents).slice(0, MAX_EVENTS)); + // Mark all as processed + initialEvents.forEach(e => processedIdsRef.current.add(e.id)); } }; - unsubscribers.push(context.addEventListener('initial_state', handleInitialState)); - - // Also listen to legacy 'initial_items' event - const handleInitialItems = (data: any) => { - if (Array.isArray(data)) { - const initialEvents = data.map(item => { - if (item.event_class) { - return item as Event; - } else { - return convertLegacyAlert(item); - } - }); - setEvents(initialEvents.slice(0, MAX_EVENTS)); - } - }; - - unsubscribers.push(context.addEventListener('initial_items', handleInitialItems)); + unsubscribers.push(addEventListener('initial_state', handleInitialState)); + unsubscribers.push(addEventListener('initial_items', handleInitialState)); return () => { + // Cleanup subscriptions only - do NOT clear flush timeout here + // The flush timeout should complete even during cleanup unsubscribers.forEach(unsub => unsub()); }; - }, [context, channelsKey]); // Fixed: Added channelsKey dependency + }, [addEventListener, channelsKey, bufferEvent]); + + // Separate cleanup effect for flush timeout on unmount only + useEffect(() => { + return () => { + if (flushTimeoutRef.current) { + clearTimeout(flushTimeoutRef.current); + } + }; + }, []); // Empty deps = only runs on unmount const clearEvents = useCallback(() => { setEvents([]); + eventBufferRef.current = []; + processedIdsRef.current.clear(); }, []); return { diff --git a/frontend/src/pages/app/EnterpriseDashboardPage.tsx b/frontend/src/pages/app/EnterpriseDashboardPage.tsx index 3d122de1..d07c8685 100644 --- a/frontend/src/pages/app/EnterpriseDashboardPage.tsx +++ b/frontend/src/pages/app/EnterpriseDashboardPage.tsx @@ -3,7 +3,7 @@ * Main dashboard for enterprise parent tenants showing network-wide metrics */ -import React, { useState, useEffect } from 'react'; +import React, { useState, useEffect, useRef } from 'react'; import { useNavigate, useParams } from 'react-router-dom'; import { useNetworkSummary, @@ -72,15 +72,21 @@ const EnterpriseDashboardPage: React.FC = ({ tenan const queryClient = useQueryClient(); // SSE Integration for real-time updates - const { events: sseEvents } = useSSEEvents({ - channels: ['*.alerts', '*.notifications', 'recommendations'] + const { events: sseEvents } = useSSEEvents({ + channels: ['*.alerts', '*.notifications', 'recommendations'] }); - // Invalidate enterprise data on relevant SSE events - useEffect(() => { - if (sseEvents.length === 0 || !tenantId) return; + // Refs for debouncing SSE-triggered invalidations + const invalidationTimeoutRef = useRef(null); + const lastEventCountRef = useRef(0); + + // Invalidate enterprise data on relevant SSE events (debounced) + useEffect(() => { + // Skip if no new events since last check + if (sseEvents.length === 0 || !tenantId || sseEvents.length === lastEventCountRef.current) { + return; + } - const latest = sseEvents[0]; const relevantEventTypes = [ 'batch_completed', 'batch_started', 'batch_state_changed', 'delivery_received', 'delivery_overdue', 'delivery_arriving_soon', @@ -88,30 +94,49 @@ const EnterpriseDashboardPage: React.FC = ({ tenan 'production_delay', 'batch_start_delayed', 'equipment_maintenance', 'network_alert', 'outlet_performance_update', 'distribution_route_update' ]; - - if (relevantEventTypes.includes(latest.event_type)) { - // Invalidate all enterprise dashboard queries - queryClient.invalidateQueries({ - queryKey: ['enterprise', 'network-summary', tenantId], - refetchType: 'active', - }); - queryClient.invalidateQueries({ - queryKey: ['enterprise', 'children-performance', tenantId], - refetchType: 'active', - }); - queryClient.invalidateQueries({ - queryKey: ['enterprise', 'distribution-overview', tenantId], - refetchType: 'active', - }); - queryClient.invalidateQueries({ - queryKey: ['enterprise', 'forecast-summary', tenantId], - refetchType: 'active', - }); - queryClient.invalidateQueries({ - queryKey: ['control-panel-data', tenantId], - refetchType: 'active', - }); + + // Check if any event is relevant + const hasRelevantEvent = sseEvents.some(event => + relevantEventTypes.includes(event.event_type) + ); + + if (hasRelevantEvent) { + // Clear existing timeout to debounce rapid events + if (invalidationTimeoutRef.current) { + clearTimeout(invalidationTimeoutRef.current); + } + + // Debounce the invalidation to prevent multiple rapid refetches + invalidationTimeoutRef.current = setTimeout(() => { + lastEventCountRef.current = sseEvents.length; + + // Invalidate all enterprise dashboard queries in a single batch + queryClient.invalidateQueries({ + queryKey: ['enterprise', 'network-summary', tenantId], + refetchType: 'active', + }); + queryClient.invalidateQueries({ + queryKey: ['enterprise', 'children-performance', tenantId], + refetchType: 'active', + }); + queryClient.invalidateQueries({ + queryKey: ['enterprise', 'distribution-overview', tenantId], + refetchType: 'active', + }); + queryClient.invalidateQueries({ + queryKey: ['enterprise', 'forecast-summary', tenantId], + refetchType: 'active', + }); + // Note: control-panel-data has its own debounced invalidation in useControlPanelData + }, 500); // 500ms debounce } + + // Cleanup timeout on unmount or dependency change + return () => { + if (invalidationTimeoutRef.current) { + clearTimeout(invalidationTimeoutRef.current); + } + }; }, [sseEvents, tenantId, queryClient]); // Check if tenantId is available at the start diff --git a/services/alert_processor/app/services/enrichment_orchestrator.py b/services/alert_processor/app/services/enrichment_orchestrator.py index e622b71f..a6834939 100644 --- a/services/alert_processor/app/services/enrichment_orchestrator.py +++ b/services/alert_processor/app/services/enrichment_orchestrator.py @@ -69,6 +69,19 @@ class EnrichmentOrchestrator: metadata=event.metadata ) + # Fallback: If orchestrator service didn't return context with already_addressed, + # check if the event metadata contains orchestrator_context (e.g., from demo seeder) + if not orchestrator_context_dict.get("already_addressed"): + metadata_context = event.metadata.get("orchestrator_context") + if metadata_context and isinstance(metadata_context, dict): + # Merge metadata context into orchestrator context + orchestrator_context_dict.update(metadata_context) + logger.debug( + "using_metadata_orchestrator_context", + event_type=event.event_type, + already_addressed=metadata_context.get("already_addressed") + ) + # Convert to OrchestratorContext if data exists orchestrator_context = None if orchestrator_context_dict: @@ -115,7 +128,7 @@ class EnrichmentOrchestrator: ) # 9. Determine type class - type_class = self._determine_type_class(orchestrator_context_dict) + type_class = self._determine_type_class(orchestrator_context_dict, event.metadata) # 10. Extract AI reasoning from metadata (if present) reasoning_data = event.metadata.get('reasoning_data') @@ -184,13 +197,25 @@ class EnrichmentOrchestrator: else: return "info" - def _determine_type_class(self, orchestrator_context: dict) -> str: + def _determine_type_class(self, orchestrator_context: dict, metadata: dict = None) -> str: """ - Determine type class based on orchestrator context. + Determine type class based on orchestrator context or metadata override. + + Priority order: + 1. Explicit type_class in metadata (e.g., from demo seeder) + 2. orchestrator_context.already_addressed = True -> "prevented_issue" + 3. Default: "action_needed" - prevented_issue: AI already handled it - action_needed: User action required """ + # Check for explicit type_class in metadata (allows demo seeder override) + if metadata: + explicit_type_class = metadata.get("type_class") + if explicit_type_class in ("prevented_issue", "action_needed"): + return explicit_type_class + + # Determine from orchestrator context if orchestrator_context and orchestrator_context.get("already_addressed"): return "prevented_issue" return "action_needed"