Add frontend alerts imporvements
This commit is contained in:
@@ -34,8 +34,8 @@ server {
|
|||||||
# Note: API routing is handled by ingress, not by this nginx
|
# Note: API routing is handled by ingress, not by this nginx
|
||||||
# The frontend makes requests to /api which are routed by the ingress controller
|
# The frontend makes requests to /api which are routed by the ingress controller
|
||||||
|
|
||||||
# Static assets with aggressive caching
|
# Static assets with aggressive caching (including source maps for debugging)
|
||||||
location ~* \.(js|css|png|jpg|jpeg|gif|ico|svg|woff|woff2|ttf|eot)$ {
|
location ~* \.(js|css|png|jpg|jpeg|gif|ico|svg|woff|woff2|ttf|eot|map)$ {
|
||||||
expires 1y;
|
expires 1y;
|
||||||
add_header Cache-Control "public, immutable";
|
add_header Cache-Control "public, immutable";
|
||||||
add_header Vary Accept-Encoding;
|
add_header Vary Accept-Encoding;
|
||||||
|
|||||||
@@ -99,11 +99,16 @@ class ApiClient {
|
|||||||
config.url?.includes(endpoint)
|
config.url?.includes(endpoint)
|
||||||
);
|
);
|
||||||
|
|
||||||
|
// Check demo session ID from memory OR localStorage
|
||||||
|
const demoSessionId = this.demoSessionId || localStorage.getItem('demo_session_id');
|
||||||
|
const isDemoMode = !!demoSessionId;
|
||||||
|
|
||||||
// Only add auth token for non-public endpoints
|
// Only add auth token for non-public endpoints
|
||||||
if (this.authToken && !isPublicEndpoint) {
|
if (this.authToken && !isPublicEndpoint) {
|
||||||
config.headers.Authorization = `Bearer ${this.authToken}`;
|
config.headers.Authorization = `Bearer ${this.authToken}`;
|
||||||
console.log('🔑 [API Client] Adding Authorization header for:', config.url);
|
console.log('🔑 [API Client] Adding Authorization header for:', config.url);
|
||||||
} else if (!isPublicEndpoint) {
|
} else if (!isPublicEndpoint && !isDemoMode) {
|
||||||
|
// Only warn if NOT in demo mode - demo mode uses X-Demo-Session-Id header instead
|
||||||
console.warn('⚠️ [API Client] No auth token available for:', config.url, 'authToken:', this.authToken ? 'exists' : 'missing');
|
console.warn('⚠️ [API Client] No auth token available for:', config.url, 'authToken:', this.authToken ? 'exists' : 'missing');
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -115,8 +120,7 @@ class ApiClient {
|
|||||||
console.warn('⚠️ [API Client] No tenant ID set for endpoint:', config.url);
|
console.warn('⚠️ [API Client] No tenant ID set for endpoint:', config.url);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check demo session ID from memory OR localStorage
|
// Add demo session ID header if in demo mode
|
||||||
const demoSessionId = this.demoSessionId || localStorage.getItem('demo_session_id');
|
|
||||||
if (demoSessionId) {
|
if (demoSessionId) {
|
||||||
config.headers['X-Demo-Session-Id'] = demoSessionId;
|
config.headers['X-Demo-Session-Id'] = demoSessionId;
|
||||||
console.log('🔍 [API Client] Adding X-Demo-Session-Id header:', demoSessionId);
|
console.log('🔍 [API Client] Adding X-Demo-Session-Id header:', demoSessionId);
|
||||||
|
|||||||
@@ -6,7 +6,7 @@
|
|||||||
*/
|
*/
|
||||||
|
|
||||||
import { useQuery, useQueryClient } from '@tanstack/react-query';
|
import { useQuery, useQueryClient } from '@tanstack/react-query';
|
||||||
import { useEffect, useState, useCallback } from 'react';
|
import { useEffect, useState, useCallback, useRef } from 'react';
|
||||||
import { alertService } from '../services/alertService';
|
import { alertService } from '../services/alertService';
|
||||||
import { getPendingApprovalPurchaseOrders } from '../services/purchase_orders';
|
import { getPendingApprovalPurchaseOrders } from '../services/purchase_orders';
|
||||||
import { productionService } from '../services/production';
|
import { productionService } from '../services/production';
|
||||||
@@ -17,6 +17,9 @@ import { aiInsightsService } from '../services/aiInsights';
|
|||||||
import { useSSEEvents } from '../../hooks/useSSE';
|
import { useSSEEvents } from '../../hooks/useSSE';
|
||||||
import { parseISO } from 'date-fns';
|
import { parseISO } from 'date-fns';
|
||||||
|
|
||||||
|
// Debounce delay for SSE-triggered query invalidations (ms)
|
||||||
|
const SSE_INVALIDATION_DEBOUNCE_MS = 500;
|
||||||
|
|
||||||
// ============================================================
|
// ============================================================
|
||||||
// Types
|
// Types
|
||||||
// ============================================================
|
// ============================================================
|
||||||
@@ -228,13 +231,15 @@ export function useControlPanelData(tenantId: string) {
|
|||||||
supplierMap.set(supplier.id, supplier.name || supplier.supplier_name);
|
supplierMap.set(supplier.id, supplier.name || supplier.supplier_name);
|
||||||
});
|
});
|
||||||
|
|
||||||
// Merge SSE events with API data
|
// Merge SSE events with API data (deduplicate by ID, prioritizing SSE events as they're newer)
|
||||||
const allAlerts = [...alerts];
|
let allAlerts: any[];
|
||||||
if (sseEvents.length > 0) {
|
if (sseEvents.length > 0) {
|
||||||
// Merge SSE events, prioritizing newer events
|
|
||||||
const sseEventIds = new Set(sseEvents.map(e => e.id));
|
const sseEventIds = new Set(sseEvents.map(e => e.id));
|
||||||
const mergedAlerts = alerts.filter(alert => !sseEventIds.has(alert.id));
|
// Filter out API alerts that also exist in SSE (SSE has newer data)
|
||||||
allAlerts.push(...sseEvents);
|
const uniqueApiAlerts = alerts.filter((alert: any) => !sseEventIds.has(alert.id));
|
||||||
|
allAlerts = [...uniqueApiAlerts, ...sseEvents];
|
||||||
|
} else {
|
||||||
|
allAlerts = [...alerts];
|
||||||
}
|
}
|
||||||
|
|
||||||
// Apply data priority rules for POs
|
// Apply data priority rules for POs
|
||||||
@@ -328,6 +333,32 @@ export function useControlPanelData(tenantId: string) {
|
|||||||
a.status === 'active'
|
a.status === 'active'
|
||||||
);
|
);
|
||||||
|
|
||||||
|
// Debug: Log alert counts by type_class
|
||||||
|
console.log('📊 [useControlPanelData] Alert analysis:', {
|
||||||
|
totalAlerts: allAlerts.length,
|
||||||
|
fromAPI: alerts.length,
|
||||||
|
fromSSE: sseEvents.length,
|
||||||
|
preventedIssuesCount: preventedIssues.length,
|
||||||
|
actionNeededCount: actionNeededAlerts.length,
|
||||||
|
typeClassBreakdown: allAlerts.reduce((acc: Record<string, number>, a: any) => {
|
||||||
|
const typeClass = a.type_class || 'unknown';
|
||||||
|
acc[typeClass] = (acc[typeClass] || 0) + 1;
|
||||||
|
return acc;
|
||||||
|
}, {}),
|
||||||
|
apiAlertsSample: alerts.slice(0, 3).map((a: any) => ({
|
||||||
|
id: a.id,
|
||||||
|
event_type: a.event_type,
|
||||||
|
type_class: a.type_class,
|
||||||
|
status: a.status,
|
||||||
|
})),
|
||||||
|
sseEventsSample: sseEvents.slice(0, 3).map((a: any) => ({
|
||||||
|
id: a.id,
|
||||||
|
event_type: a.event_type,
|
||||||
|
type_class: a.type_class,
|
||||||
|
status: a.status,
|
||||||
|
})),
|
||||||
|
});
|
||||||
|
|
||||||
// Calculate total issues requiring action:
|
// Calculate total issues requiring action:
|
||||||
// 1. Action needed alerts
|
// 1. Action needed alerts
|
||||||
// 2. Pending PO approvals (each PO requires approval action)
|
// 2. Pending PO approvals (each PO requires approval action)
|
||||||
@@ -387,24 +418,51 @@ export function useControlPanelData(tenantId: string) {
|
|||||||
retry: 2,
|
retry: 2,
|
||||||
});
|
});
|
||||||
|
|
||||||
// SSE integration - invalidate query on relevant events
|
// Ref for debouncing SSE-triggered invalidations
|
||||||
useEffect(() => {
|
const invalidationTimeoutRef = useRef<NodeJS.Timeout | null>(null);
|
||||||
if (sseAlerts.length > 0 && tenantId) {
|
const lastEventCountRef = useRef<number>(0);
|
||||||
const relevantEvents = sseAlerts.filter(event =>
|
|
||||||
event.event_type.includes('production.') ||
|
|
||||||
event.event_type.includes('batch_') ||
|
|
||||||
event.event_type.includes('delivery') ||
|
|
||||||
event.event_type.includes('purchase_order') ||
|
|
||||||
event.event_type.includes('equipment_')
|
|
||||||
);
|
|
||||||
|
|
||||||
if (relevantEvents.length > 0) {
|
// SSE integration - invalidate query on relevant events (debounced)
|
||||||
|
useEffect(() => {
|
||||||
|
// Skip if no new events since last check
|
||||||
|
if (sseAlerts.length === 0 || !tenantId || sseAlerts.length === lastEventCountRef.current) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const relevantEvents = sseAlerts.filter(event =>
|
||||||
|
event.event_type?.includes('production.') ||
|
||||||
|
event.event_type?.includes('batch_') ||
|
||||||
|
event.event_type?.includes('delivery') ||
|
||||||
|
event.event_type?.includes('purchase_order') ||
|
||||||
|
event.event_type?.includes('equipment_') ||
|
||||||
|
event.event_type?.includes('insight') ||
|
||||||
|
event.event_type?.includes('recommendation') ||
|
||||||
|
event.event_type?.includes('ai_') || // Match ai_yield_prediction, ai_*, etc.
|
||||||
|
event.event_class === 'recommendation'
|
||||||
|
);
|
||||||
|
|
||||||
|
if (relevantEvents.length > 0) {
|
||||||
|
// Clear existing timeout to debounce rapid events
|
||||||
|
if (invalidationTimeoutRef.current) {
|
||||||
|
clearTimeout(invalidationTimeoutRef.current);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Debounce the invalidation to prevent multiple rapid refetches
|
||||||
|
invalidationTimeoutRef.current = setTimeout(() => {
|
||||||
|
lastEventCountRef.current = sseAlerts.length;
|
||||||
queryClient.invalidateQueries({
|
queryClient.invalidateQueries({
|
||||||
queryKey: ['control-panel-data', tenantId],
|
queryKey: ['control-panel-data', tenantId],
|
||||||
refetchType: 'active',
|
refetchType: 'active',
|
||||||
});
|
});
|
||||||
}
|
}, SSE_INVALIDATION_DEBOUNCE_MS);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Cleanup timeout on unmount or dependency change
|
||||||
|
return () => {
|
||||||
|
if (invalidationTimeoutRef.current) {
|
||||||
|
clearTimeout(invalidationTimeoutRef.current);
|
||||||
|
}
|
||||||
|
};
|
||||||
}, [sseAlerts, tenantId, queryClient]);
|
}, [sseAlerts, tenantId, queryClient]);
|
||||||
|
|
||||||
return query;
|
return query;
|
||||||
|
|||||||
@@ -1,4 +1,4 @@
|
|||||||
import React, { createContext, useContext, useEffect, useRef, useState, ReactNode } from 'react';
|
import React, { createContext, useContext, useEffect, useRef, useState, ReactNode, useCallback } from 'react';
|
||||||
import { useAuthStore } from '../stores/auth.store';
|
import { useAuthStore } from '../stores/auth.store';
|
||||||
import { useCurrentTenant } from '../stores/tenant.store';
|
import { useCurrentTenant } from '../stores/tenant.store';
|
||||||
import { showToast } from '../utils/toast';
|
import { showToast } from '../utils/toast';
|
||||||
@@ -103,6 +103,11 @@ export const SSEProvider: React.FC<SSEProviderProps> = ({ children }) => {
|
|||||||
setIsConnected(true);
|
setIsConnected(true);
|
||||||
reconnectAttempts.current = 0;
|
reconnectAttempts.current = 0;
|
||||||
|
|
||||||
|
// Clear processed event IDs on new connection to allow fresh state from server
|
||||||
|
// This ensures events are processed again after reconnection or navigation
|
||||||
|
processedEventIdsRef.current.clear();
|
||||||
|
console.log('🔄 [SSE] Cleared processed event IDs cache on connection open');
|
||||||
|
|
||||||
if (reconnectTimeoutRef.current) {
|
if (reconnectTimeoutRef.current) {
|
||||||
clearTimeout(reconnectTimeoutRef.current);
|
clearTimeout(reconnectTimeoutRef.current);
|
||||||
reconnectTimeoutRef.current = undefined;
|
reconnectTimeoutRef.current = undefined;
|
||||||
@@ -214,6 +219,11 @@ export const SSEProvider: React.FC<SSEProviderProps> = ({ children }) => {
|
|||||||
// Trigger listeners with enriched alert data
|
// Trigger listeners with enriched alert data
|
||||||
// Wrap in queueMicrotask to prevent setState during render warnings
|
// Wrap in queueMicrotask to prevent setState during render warnings
|
||||||
const listeners = eventListenersRef.current.get('alert');
|
const listeners = eventListenersRef.current.get('alert');
|
||||||
|
console.log('📤 [SSEContext] Notifying alert listeners:', {
|
||||||
|
listenerCount: listeners?.size || 0,
|
||||||
|
eventId: data.id,
|
||||||
|
eventType: data.event_type || data.type,
|
||||||
|
});
|
||||||
if (listeners) {
|
if (listeners) {
|
||||||
listeners.forEach(callback => {
|
listeners.forEach(callback => {
|
||||||
queueMicrotask(() => callback(data));
|
queueMicrotask(() => callback(data));
|
||||||
@@ -347,6 +357,22 @@ export const SSEProvider: React.FC<SSEProviderProps> = ({ children }) => {
|
|||||||
eventSource.addEventListener('recommendation', (event) => {
|
eventSource.addEventListener('recommendation', (event) => {
|
||||||
try {
|
try {
|
||||||
const data = JSON.parse(event.data);
|
const data = JSON.parse(event.data);
|
||||||
|
|
||||||
|
// GLOBAL DEDUPLICATION: Skip if this event was already processed
|
||||||
|
if (data.id && processedEventIdsRef.current.has(data.id)) {
|
||||||
|
console.log('⏭️ [SSE] Skipping duplicate recommendation:', data.id);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Mark event as processed
|
||||||
|
if (data.id) {
|
||||||
|
processedEventIdsRef.current.add(data.id);
|
||||||
|
if (processedEventIdsRef.current.size > 1000) {
|
||||||
|
const firstId = Array.from(processedEventIdsRef.current)[0];
|
||||||
|
processedEventIdsRef.current.delete(firstId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
const sseEvent: SSEEvent = {
|
const sseEvent: SSEEvent = {
|
||||||
type: 'recommendation',
|
type: 'recommendation',
|
||||||
data,
|
data,
|
||||||
@@ -433,6 +459,11 @@ export const SSEProvider: React.FC<SSEProviderProps> = ({ children }) => {
|
|||||||
// Trigger listeners with recommendation data
|
// Trigger listeners with recommendation data
|
||||||
// Wrap in queueMicrotask to prevent setState during render warnings
|
// Wrap in queueMicrotask to prevent setState during render warnings
|
||||||
const listeners = eventListenersRef.current.get('recommendation');
|
const listeners = eventListenersRef.current.get('recommendation');
|
||||||
|
console.log('📤 [SSEContext] Notifying recommendation listeners:', {
|
||||||
|
listenerCount: listeners?.size || 0,
|
||||||
|
eventId: data.id,
|
||||||
|
eventType: data.event_type || data.type,
|
||||||
|
});
|
||||||
if (listeners) {
|
if (listeners) {
|
||||||
listeners.forEach(callback => {
|
listeners.forEach(callback => {
|
||||||
queueMicrotask(() => callback(data));
|
queueMicrotask(() => callback(data));
|
||||||
@@ -483,7 +514,8 @@ export const SSEProvider: React.FC<SSEProviderProps> = ({ children }) => {
|
|||||||
reconnectAttempts.current = 0;
|
reconnectAttempts.current = 0;
|
||||||
};
|
};
|
||||||
|
|
||||||
const addEventListener = (eventType: string, callback: (data: any) => void) => {
|
// Memoize addEventListener to prevent unnecessary effect re-runs in consumers
|
||||||
|
const addEventListener = useCallback((eventType: string, callback: (data: any) => void) => {
|
||||||
if (!eventListenersRef.current.has(eventType)) {
|
if (!eventListenersRef.current.has(eventType)) {
|
||||||
eventListenersRef.current.set(eventType, new Set());
|
eventListenersRef.current.set(eventType, new Set());
|
||||||
}
|
}
|
||||||
@@ -500,7 +532,7 @@ export const SSEProvider: React.FC<SSEProviderProps> = ({ children }) => {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
};
|
}, []); // No dependencies - uses only refs
|
||||||
|
|
||||||
// Connect when authenticated, disconnect when not or when tenant changes
|
// Connect when authenticated, disconnect when not or when tenant changes
|
||||||
useEffect(() => {
|
useEffect(() => {
|
||||||
@@ -531,6 +563,8 @@ export const SSEProvider: React.FC<SSEProviderProps> = ({ children }) => {
|
|||||||
};
|
};
|
||||||
}, []);
|
}, []);
|
||||||
|
|
||||||
|
// Context value - consumers should extract only what they need
|
||||||
|
// addEventListener is now stable (wrapped in useCallback)
|
||||||
const contextValue: SSEContextType = {
|
const contextValue: SSEContextType = {
|
||||||
isConnected,
|
isConnected,
|
||||||
lastEvent,
|
lastEvent,
|
||||||
|
|||||||
@@ -4,13 +4,18 @@
|
|||||||
* Wrapper around SSEContext that collects and manages events.
|
* Wrapper around SSEContext that collects and manages events.
|
||||||
* Provides a clean interface for subscribing to SSE events with channel filtering.
|
* Provides a clean interface for subscribing to SSE events with channel filtering.
|
||||||
*
|
*
|
||||||
|
* Features:
|
||||||
|
* - Event batching to prevent race conditions when multiple events arrive rapidly
|
||||||
|
* - Deduplication by event ID
|
||||||
|
* - Configurable channel filtering
|
||||||
|
*
|
||||||
* Examples:
|
* Examples:
|
||||||
* const { events } = useSSE(); // All events
|
* const { events } = useSSE(); // All events
|
||||||
* const { events } = useSSE({ channels: ['inventory.alerts'] });
|
* const { events } = useSSE({ channels: ['inventory.alerts'] });
|
||||||
* const { events } = useSSE({ channels: ['*.notifications'] });
|
* const { events } = useSSE({ channels: ['*.notifications'] });
|
||||||
*/
|
*/
|
||||||
|
|
||||||
import { useContext, useEffect, useState, useCallback } from 'react';
|
import { useContext, useEffect, useState, useCallback, useRef, useMemo } from 'react';
|
||||||
import { SSEContext } from '../contexts/SSEContext';
|
import { SSEContext } from '../contexts/SSEContext';
|
||||||
import type { Event, Alert, Notification, Recommendation } from '../api/types/events';
|
import type { Event, Alert, Notification, Recommendation } from '../api/types/events';
|
||||||
import { convertLegacyAlert } from '../api/types/events';
|
import { convertLegacyAlert } from '../api/types/events';
|
||||||
@@ -26,162 +31,172 @@ interface UseSSEReturn {
|
|||||||
}
|
}
|
||||||
|
|
||||||
const MAX_EVENTS = 200; // Keep last 200 events in memory
|
const MAX_EVENTS = 200; // Keep last 200 events in memory
|
||||||
|
const BATCH_DELAY_MS = 50; // Batch events for 50ms to prevent race conditions
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Normalize event data to consistent Event format
|
||||||
|
*/
|
||||||
|
function normalizeEvent(eventType: string, data: any): Event {
|
||||||
|
// Check if it's new format (has event_class) or legacy format
|
||||||
|
if (data.event_class === 'alert' || data.event_class === 'notification' || data.event_class === 'recommendation') {
|
||||||
|
// Ensure event_type is set even for new format events
|
||||||
|
return {
|
||||||
|
...data,
|
||||||
|
event_type: data.event_type || data.type || eventType,
|
||||||
|
} as Event;
|
||||||
|
} else if (data.item_type === 'alert' || data.item_type === 'recommendation') {
|
||||||
|
return convertLegacyAlert(data);
|
||||||
|
} else if (eventType === 'recommendation') {
|
||||||
|
return {
|
||||||
|
...data,
|
||||||
|
event_class: 'recommendation',
|
||||||
|
event_domain: data.event_domain || 'ai_insights',
|
||||||
|
event_type: data.event_type || data.type || 'recommendation',
|
||||||
|
} as Recommendation;
|
||||||
|
} else if (eventType === 'notification') {
|
||||||
|
return {
|
||||||
|
...data,
|
||||||
|
event_type: data.event_type || data.type || 'notification',
|
||||||
|
} as Notification;
|
||||||
|
} else {
|
||||||
|
return {
|
||||||
|
...data,
|
||||||
|
event_class: 'alert',
|
||||||
|
event_domain: data.event_domain || 'operations',
|
||||||
|
event_type: data.event_type || data.type || 'alert',
|
||||||
|
} as Alert;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Deduplicate events by ID, keeping the newest version
|
||||||
|
*/
|
||||||
|
function deduplicateEvents(events: Event[]): Event[] {
|
||||||
|
const seen = new Map<string, Event>();
|
||||||
|
for (const event of events) {
|
||||||
|
if (!seen.has(event.id)) {
|
||||||
|
seen.set(event.id, event);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return Array.from(seen.values());
|
||||||
|
}
|
||||||
|
|
||||||
export function useSSEEvents(config: UseSSEConfig = {}): UseSSEReturn {
|
export function useSSEEvents(config: UseSSEConfig = {}): UseSSEReturn {
|
||||||
const context = useContext(SSEContext);
|
const context = useContext(SSEContext);
|
||||||
const [events, setEvents] = useState<Event[]>([]);
|
const [events, setEvents] = useState<Event[]>([]);
|
||||||
|
|
||||||
|
// Refs for batching - these persist across renders without causing re-renders
|
||||||
|
const eventBufferRef = useRef<Event[]>([]);
|
||||||
|
const flushTimeoutRef = useRef<NodeJS.Timeout | null>(null);
|
||||||
|
const processedIdsRef = useRef<Set<string>>(new Set());
|
||||||
|
|
||||||
if (!context) {
|
if (!context) {
|
||||||
throw new Error('useSSE must be used within SSEProvider');
|
throw new Error('useSSE must be used within SSEProvider');
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create a stable key for the config channels to avoid unnecessary re-renders
|
// Memoize channels key to prevent unnecessary effect re-runs
|
||||||
// Use JSON.stringify for reliable comparison of channel arrays
|
const channelsKey = useMemo(() =>
|
||||||
const channelsKey = JSON.stringify(config.channels || []);
|
JSON.stringify(config.channels?.slice().sort() || []),
|
||||||
|
[config.channels]
|
||||||
|
);
|
||||||
|
|
||||||
|
// Flush buffered events to state - batches multiple events into single state update
|
||||||
|
// IMPORTANT: No dependencies - this function's identity must be stable
|
||||||
|
const flushEvents = useCallback(() => {
|
||||||
|
if (eventBufferRef.current.length === 0) return;
|
||||||
|
|
||||||
|
const bufferedEvents = eventBufferRef.current;
|
||||||
|
eventBufferRef.current = [];
|
||||||
|
|
||||||
|
setEvents(prev => {
|
||||||
|
// Combine buffered events with existing events
|
||||||
|
const combined = [...bufferedEvents, ...prev];
|
||||||
|
// Deduplicate and limit
|
||||||
|
const deduplicated = deduplicateEvents(combined);
|
||||||
|
return deduplicated.slice(0, MAX_EVENTS);
|
||||||
|
});
|
||||||
|
}, []); // No dependencies - stable identity
|
||||||
|
|
||||||
|
// Add event to buffer and schedule flush
|
||||||
|
// IMPORTANT: Only depends on flushEvents which is now stable
|
||||||
|
const bufferEvent = useCallback((event: Event) => {
|
||||||
|
// Skip if already processed recently (extra safety)
|
||||||
|
if (processedIdsRef.current.has(event.id)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Mark as processed
|
||||||
|
processedIdsRef.current.add(event.id);
|
||||||
|
|
||||||
|
// Limit processed IDs cache size
|
||||||
|
if (processedIdsRef.current.size > 1000) {
|
||||||
|
const firstId = Array.from(processedIdsRef.current)[0];
|
||||||
|
processedIdsRef.current.delete(firstId);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add to buffer
|
||||||
|
eventBufferRef.current.push(event);
|
||||||
|
|
||||||
|
// Clear existing timeout and set new one
|
||||||
|
if (flushTimeoutRef.current) {
|
||||||
|
clearTimeout(flushTimeoutRef.current);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Flush after delay to batch rapid events
|
||||||
|
flushTimeoutRef.current = setTimeout(flushEvents, BATCH_DELAY_MS);
|
||||||
|
}, [flushEvents]); // flushEvents is now stable (no deps)
|
||||||
|
|
||||||
|
// Extract addEventListener to avoid re-running effect when other context values change
|
||||||
|
const { addEventListener } = context;
|
||||||
|
|
||||||
useEffect(() => {
|
useEffect(() => {
|
||||||
const unsubscribers: (() => void)[] = [];
|
const unsubscribers: (() => void)[] = [];
|
||||||
|
|
||||||
// Listen to 'alert' events (can be Alert or legacy format)
|
// Unified event handler for all event types
|
||||||
const handleAlert = (data: any) => {
|
const createHandler = (eventType: string) => (data: any) => {
|
||||||
console.log('🟢 [useSSE] handleAlert triggered', { data });
|
const event = normalizeEvent(eventType, data);
|
||||||
let event: Event;
|
bufferEvent(event);
|
||||||
|
|
||||||
// Check if it's new format (has event_class) or legacy format
|
|
||||||
if (data.event_class === 'alert' || data.event_class === 'notification' || data.event_class === 'recommendation') {
|
|
||||||
// New format
|
|
||||||
event = data as Event;
|
|
||||||
} else if (data.item_type === 'alert' || data.item_type === 'recommendation') {
|
|
||||||
// Legacy format - convert
|
|
||||||
event = convertLegacyAlert(data);
|
|
||||||
} else {
|
|
||||||
// Assume it's an alert if no clear classification
|
|
||||||
event = { ...data, event_class: 'alert', event_domain: 'operations' } as Alert;
|
|
||||||
}
|
|
||||||
|
|
||||||
console.log('🟢 [useSSE] Setting events state with new alert', {
|
|
||||||
eventId: event.id,
|
|
||||||
eventClass: event.event_class,
|
|
||||||
eventDomain: event.event_domain,
|
|
||||||
});
|
|
||||||
|
|
||||||
setEvents(prev => {
|
|
||||||
// Check if this event already exists to prevent duplicate processing
|
|
||||||
const existingIndex = prev.findIndex(e => e.id === event.id);
|
|
||||||
if (existingIndex !== -1) {
|
|
||||||
// Update existing event instead of adding duplicate
|
|
||||||
const newEvents = [...prev];
|
|
||||||
newEvents[existingIndex] = event;
|
|
||||||
return newEvents.slice(0, MAX_EVENTS);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Add new event if not duplicate
|
|
||||||
const filtered = prev.filter(e => e.id !== event.id);
|
|
||||||
const newEvents = [event, ...filtered].slice(0, MAX_EVENTS);
|
|
||||||
console.log('🟢 [useSSE] Events array updated', {
|
|
||||||
prevCount: prev.length,
|
|
||||||
newCount: newEvents.length,
|
|
||||||
newEventIds: newEvents.map(e => e.id).join(','),
|
|
||||||
});
|
|
||||||
return newEvents;
|
|
||||||
});
|
|
||||||
};
|
};
|
||||||
|
|
||||||
unsubscribers.push(context.addEventListener('alert', handleAlert));
|
// Subscribe to all event types
|
||||||
|
unsubscribers.push(addEventListener('alert', createHandler('alert')));
|
||||||
// Listen to 'notification' events
|
unsubscribers.push(addEventListener('notification', createHandler('notification')));
|
||||||
const handleNotification = (data: Notification) => {
|
unsubscribers.push(addEventListener('recommendation', createHandler('recommendation')));
|
||||||
setEvents(prev => {
|
|
||||||
// Check if this notification already exists to prevent duplicate processing
|
|
||||||
const existingIndex = prev.findIndex(e => e.id === data.id);
|
|
||||||
if (existingIndex !== -1) {
|
|
||||||
// Update existing notification instead of adding duplicate
|
|
||||||
const newEvents = [...prev];
|
|
||||||
newEvents[existingIndex] = data;
|
|
||||||
return newEvents.slice(0, MAX_EVENTS);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Add new notification if not duplicate
|
|
||||||
const filtered = prev.filter(e => e.id !== data.id);
|
|
||||||
return [data, ...filtered].slice(0, MAX_EVENTS);
|
|
||||||
});
|
|
||||||
};
|
|
||||||
|
|
||||||
unsubscribers.push(context.addEventListener('notification', handleNotification));
|
|
||||||
|
|
||||||
// Listen to 'recommendation' events
|
|
||||||
const handleRecommendation = (data: any) => {
|
|
||||||
let event: Recommendation;
|
|
||||||
|
|
||||||
// Handle both new and legacy formats
|
|
||||||
if (data.event_class === 'recommendation') {
|
|
||||||
event = data as Recommendation;
|
|
||||||
} else if (data.item_type === 'recommendation') {
|
|
||||||
event = convertLegacyAlert(data) as Recommendation;
|
|
||||||
} else {
|
|
||||||
event = { ...data, event_class: 'recommendation', event_domain: 'operations' } as Recommendation;
|
|
||||||
}
|
|
||||||
|
|
||||||
setEvents(prev => {
|
|
||||||
// Check if this recommendation already exists to prevent duplicate processing
|
|
||||||
const existingIndex = prev.findIndex(e => e.id === event.id);
|
|
||||||
if (existingIndex !== -1) {
|
|
||||||
// Update existing recommendation instead of adding duplicate
|
|
||||||
const newEvents = [...prev];
|
|
||||||
newEvents[existingIndex] = event;
|
|
||||||
return newEvents.slice(0, MAX_EVENTS);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Add new recommendation if not duplicate
|
|
||||||
const filtered = prev.filter(e => e.id !== event.id);
|
|
||||||
return [event, ...filtered].slice(0, MAX_EVENTS);
|
|
||||||
});
|
|
||||||
};
|
|
||||||
|
|
||||||
unsubscribers.push(context.addEventListener('recommendation', handleRecommendation));
|
|
||||||
|
|
||||||
// Listen to 'initial_state' event (batch load on connection)
|
// Listen to 'initial_state' event (batch load on connection)
|
||||||
const handleInitialState = (data: any) => {
|
const handleInitialState = (data: any) => {
|
||||||
if (Array.isArray(data)) {
|
if (Array.isArray(data)) {
|
||||||
// Convert each event to proper format
|
const initialEvents = data.map(item => normalizeEvent('alert', item));
|
||||||
const initialEvents = data.map(item => {
|
// For initial state, set directly without buffering
|
||||||
if (item.event_class) {
|
setEvents(deduplicateEvents(initialEvents).slice(0, MAX_EVENTS));
|
||||||
return item as Event;
|
// Mark all as processed
|
||||||
} else if (item.item_type) {
|
initialEvents.forEach(e => processedIdsRef.current.add(e.id));
|
||||||
return convertLegacyAlert(item);
|
|
||||||
} else {
|
|
||||||
return { ...item, event_class: 'alert', event_domain: 'operations' } as Event;
|
|
||||||
}
|
|
||||||
});
|
|
||||||
setEvents(initialEvents.slice(0, MAX_EVENTS));
|
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
unsubscribers.push(context.addEventListener('initial_state', handleInitialState));
|
unsubscribers.push(addEventListener('initial_state', handleInitialState));
|
||||||
|
unsubscribers.push(addEventListener('initial_items', handleInitialState));
|
||||||
// Also listen to legacy 'initial_items' event
|
|
||||||
const handleInitialItems = (data: any) => {
|
|
||||||
if (Array.isArray(data)) {
|
|
||||||
const initialEvents = data.map(item => {
|
|
||||||
if (item.event_class) {
|
|
||||||
return item as Event;
|
|
||||||
} else {
|
|
||||||
return convertLegacyAlert(item);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
setEvents(initialEvents.slice(0, MAX_EVENTS));
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
unsubscribers.push(context.addEventListener('initial_items', handleInitialItems));
|
|
||||||
|
|
||||||
return () => {
|
return () => {
|
||||||
|
// Cleanup subscriptions only - do NOT clear flush timeout here
|
||||||
|
// The flush timeout should complete even during cleanup
|
||||||
unsubscribers.forEach(unsub => unsub());
|
unsubscribers.forEach(unsub => unsub());
|
||||||
};
|
};
|
||||||
}, [context, channelsKey]); // Fixed: Added channelsKey dependency
|
}, [addEventListener, channelsKey, bufferEvent]);
|
||||||
|
|
||||||
|
// Separate cleanup effect for flush timeout on unmount only
|
||||||
|
useEffect(() => {
|
||||||
|
return () => {
|
||||||
|
if (flushTimeoutRef.current) {
|
||||||
|
clearTimeout(flushTimeoutRef.current);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}, []); // Empty deps = only runs on unmount
|
||||||
|
|
||||||
const clearEvents = useCallback(() => {
|
const clearEvents = useCallback(() => {
|
||||||
setEvents([]);
|
setEvents([]);
|
||||||
|
eventBufferRef.current = [];
|
||||||
|
processedIdsRef.current.clear();
|
||||||
}, []);
|
}, []);
|
||||||
|
|
||||||
return {
|
return {
|
||||||
|
|||||||
@@ -3,7 +3,7 @@
|
|||||||
* Main dashboard for enterprise parent tenants showing network-wide metrics
|
* Main dashboard for enterprise parent tenants showing network-wide metrics
|
||||||
*/
|
*/
|
||||||
|
|
||||||
import React, { useState, useEffect } from 'react';
|
import React, { useState, useEffect, useRef } from 'react';
|
||||||
import { useNavigate, useParams } from 'react-router-dom';
|
import { useNavigate, useParams } from 'react-router-dom';
|
||||||
import {
|
import {
|
||||||
useNetworkSummary,
|
useNetworkSummary,
|
||||||
@@ -76,11 +76,17 @@ const EnterpriseDashboardPage: React.FC<EnterpriseDashboardPageProps> = ({ tenan
|
|||||||
channels: ['*.alerts', '*.notifications', 'recommendations']
|
channels: ['*.alerts', '*.notifications', 'recommendations']
|
||||||
});
|
});
|
||||||
|
|
||||||
// Invalidate enterprise data on relevant SSE events
|
// Refs for debouncing SSE-triggered invalidations
|
||||||
useEffect(() => {
|
const invalidationTimeoutRef = useRef<NodeJS.Timeout | null>(null);
|
||||||
if (sseEvents.length === 0 || !tenantId) return;
|
const lastEventCountRef = useRef<number>(0);
|
||||||
|
|
||||||
|
// Invalidate enterprise data on relevant SSE events (debounced)
|
||||||
|
useEffect(() => {
|
||||||
|
// Skip if no new events since last check
|
||||||
|
if (sseEvents.length === 0 || !tenantId || sseEvents.length === lastEventCountRef.current) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
const latest = sseEvents[0];
|
|
||||||
const relevantEventTypes = [
|
const relevantEventTypes = [
|
||||||
'batch_completed', 'batch_started', 'batch_state_changed',
|
'batch_completed', 'batch_started', 'batch_state_changed',
|
||||||
'delivery_received', 'delivery_overdue', 'delivery_arriving_soon',
|
'delivery_received', 'delivery_overdue', 'delivery_arriving_soon',
|
||||||
@@ -89,29 +95,48 @@ const EnterpriseDashboardPage: React.FC<EnterpriseDashboardPageProps> = ({ tenan
|
|||||||
'network_alert', 'outlet_performance_update', 'distribution_route_update'
|
'network_alert', 'outlet_performance_update', 'distribution_route_update'
|
||||||
];
|
];
|
||||||
|
|
||||||
if (relevantEventTypes.includes(latest.event_type)) {
|
// Check if any event is relevant
|
||||||
// Invalidate all enterprise dashboard queries
|
const hasRelevantEvent = sseEvents.some(event =>
|
||||||
queryClient.invalidateQueries({
|
relevantEventTypes.includes(event.event_type)
|
||||||
queryKey: ['enterprise', 'network-summary', tenantId],
|
);
|
||||||
refetchType: 'active',
|
|
||||||
});
|
if (hasRelevantEvent) {
|
||||||
queryClient.invalidateQueries({
|
// Clear existing timeout to debounce rapid events
|
||||||
queryKey: ['enterprise', 'children-performance', tenantId],
|
if (invalidationTimeoutRef.current) {
|
||||||
refetchType: 'active',
|
clearTimeout(invalidationTimeoutRef.current);
|
||||||
});
|
}
|
||||||
queryClient.invalidateQueries({
|
|
||||||
queryKey: ['enterprise', 'distribution-overview', tenantId],
|
// Debounce the invalidation to prevent multiple rapid refetches
|
||||||
refetchType: 'active',
|
invalidationTimeoutRef.current = setTimeout(() => {
|
||||||
});
|
lastEventCountRef.current = sseEvents.length;
|
||||||
queryClient.invalidateQueries({
|
|
||||||
queryKey: ['enterprise', 'forecast-summary', tenantId],
|
// Invalidate all enterprise dashboard queries in a single batch
|
||||||
refetchType: 'active',
|
queryClient.invalidateQueries({
|
||||||
});
|
queryKey: ['enterprise', 'network-summary', tenantId],
|
||||||
queryClient.invalidateQueries({
|
refetchType: 'active',
|
||||||
queryKey: ['control-panel-data', tenantId],
|
});
|
||||||
refetchType: 'active',
|
queryClient.invalidateQueries({
|
||||||
});
|
queryKey: ['enterprise', 'children-performance', tenantId],
|
||||||
|
refetchType: 'active',
|
||||||
|
});
|
||||||
|
queryClient.invalidateQueries({
|
||||||
|
queryKey: ['enterprise', 'distribution-overview', tenantId],
|
||||||
|
refetchType: 'active',
|
||||||
|
});
|
||||||
|
queryClient.invalidateQueries({
|
||||||
|
queryKey: ['enterprise', 'forecast-summary', tenantId],
|
||||||
|
refetchType: 'active',
|
||||||
|
});
|
||||||
|
// Note: control-panel-data has its own debounced invalidation in useControlPanelData
|
||||||
|
}, 500); // 500ms debounce
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Cleanup timeout on unmount or dependency change
|
||||||
|
return () => {
|
||||||
|
if (invalidationTimeoutRef.current) {
|
||||||
|
clearTimeout(invalidationTimeoutRef.current);
|
||||||
|
}
|
||||||
|
};
|
||||||
}, [sseEvents, tenantId, queryClient]);
|
}, [sseEvents, tenantId, queryClient]);
|
||||||
|
|
||||||
// Check if tenantId is available at the start
|
// Check if tenantId is available at the start
|
||||||
|
|||||||
@@ -69,6 +69,19 @@ class EnrichmentOrchestrator:
|
|||||||
metadata=event.metadata
|
metadata=event.metadata
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# Fallback: If orchestrator service didn't return context with already_addressed,
|
||||||
|
# check if the event metadata contains orchestrator_context (e.g., from demo seeder)
|
||||||
|
if not orchestrator_context_dict.get("already_addressed"):
|
||||||
|
metadata_context = event.metadata.get("orchestrator_context")
|
||||||
|
if metadata_context and isinstance(metadata_context, dict):
|
||||||
|
# Merge metadata context into orchestrator context
|
||||||
|
orchestrator_context_dict.update(metadata_context)
|
||||||
|
logger.debug(
|
||||||
|
"using_metadata_orchestrator_context",
|
||||||
|
event_type=event.event_type,
|
||||||
|
already_addressed=metadata_context.get("already_addressed")
|
||||||
|
)
|
||||||
|
|
||||||
# Convert to OrchestratorContext if data exists
|
# Convert to OrchestratorContext if data exists
|
||||||
orchestrator_context = None
|
orchestrator_context = None
|
||||||
if orchestrator_context_dict:
|
if orchestrator_context_dict:
|
||||||
@@ -115,7 +128,7 @@ class EnrichmentOrchestrator:
|
|||||||
)
|
)
|
||||||
|
|
||||||
# 9. Determine type class
|
# 9. Determine type class
|
||||||
type_class = self._determine_type_class(orchestrator_context_dict)
|
type_class = self._determine_type_class(orchestrator_context_dict, event.metadata)
|
||||||
|
|
||||||
# 10. Extract AI reasoning from metadata (if present)
|
# 10. Extract AI reasoning from metadata (if present)
|
||||||
reasoning_data = event.metadata.get('reasoning_data')
|
reasoning_data = event.metadata.get('reasoning_data')
|
||||||
@@ -184,13 +197,25 @@ class EnrichmentOrchestrator:
|
|||||||
else:
|
else:
|
||||||
return "info"
|
return "info"
|
||||||
|
|
||||||
def _determine_type_class(self, orchestrator_context: dict) -> str:
|
def _determine_type_class(self, orchestrator_context: dict, metadata: dict = None) -> str:
|
||||||
"""
|
"""
|
||||||
Determine type class based on orchestrator context.
|
Determine type class based on orchestrator context or metadata override.
|
||||||
|
|
||||||
|
Priority order:
|
||||||
|
1. Explicit type_class in metadata (e.g., from demo seeder)
|
||||||
|
2. orchestrator_context.already_addressed = True -> "prevented_issue"
|
||||||
|
3. Default: "action_needed"
|
||||||
|
|
||||||
- prevented_issue: AI already handled it
|
- prevented_issue: AI already handled it
|
||||||
- action_needed: User action required
|
- action_needed: User action required
|
||||||
"""
|
"""
|
||||||
|
# Check for explicit type_class in metadata (allows demo seeder override)
|
||||||
|
if metadata:
|
||||||
|
explicit_type_class = metadata.get("type_class")
|
||||||
|
if explicit_type_class in ("prevented_issue", "action_needed"):
|
||||||
|
return explicit_type_class
|
||||||
|
|
||||||
|
# Determine from orchestrator context
|
||||||
if orchestrator_context and orchestrator_context.get("already_addressed"):
|
if orchestrator_context and orchestrator_context.get("already_addressed"):
|
||||||
return "prevented_issue"
|
return "prevented_issue"
|
||||||
return "action_needed"
|
return "action_needed"
|
||||||
|
|||||||
Reference in New Issue
Block a user