demo seed change
This commit is contained in:
@@ -7,6 +7,7 @@ the enrichment pipeline.
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
from datetime import datetime, timezone
|
||||
from aio_pika import connect_robust, IncomingMessage, Connection, Channel
|
||||
import structlog
|
||||
|
||||
@@ -112,9 +113,64 @@ class EventConsumer:
|
||||
# Enrich the event
|
||||
enriched_event = await self.enricher.enrich_event(event)
|
||||
|
||||
# Store in database
|
||||
# Check for duplicate alerts before storing
|
||||
async with AsyncSessionLocal() as session:
|
||||
repo = EventRepository(session)
|
||||
|
||||
# Check for duplicate if it's an alert
|
||||
if event.event_class == "alert":
|
||||
from uuid import UUID
|
||||
duplicate_event = await repo.check_duplicate_alert(
|
||||
tenant_id=UUID(event.tenant_id),
|
||||
event_type=event.event_type,
|
||||
entity_links=enriched_event.entity_links,
|
||||
event_metadata=enriched_event.event_metadata,
|
||||
time_window_hours=24 # Check for duplicates in last 24 hours
|
||||
)
|
||||
|
||||
if duplicate_event:
|
||||
logger.info(
|
||||
"Duplicate alert detected, skipping",
|
||||
event_type=event.event_type,
|
||||
tenant_id=event.tenant_id,
|
||||
duplicate_event_id=str(duplicate_event.id)
|
||||
)
|
||||
# Update the existing event's metadata instead of creating a new one
|
||||
# This could include updating delay times, affected orders, etc.
|
||||
duplicate_event.event_metadata = enriched_event.event_metadata
|
||||
duplicate_event.updated_at = datetime.now(timezone.utc)
|
||||
duplicate_event.priority_score = enriched_event.priority_score
|
||||
duplicate_event.priority_level = enriched_event.priority_level
|
||||
|
||||
# Update other relevant fields that might have changed
|
||||
duplicate_event.urgency = enriched_event.urgency.dict() if enriched_event.urgency else None
|
||||
duplicate_event.business_impact = enriched_event.business_impact.dict() if enriched_event.business_impact else None
|
||||
|
||||
await session.commit()
|
||||
await session.refresh(duplicate_event)
|
||||
|
||||
# Send notification for updated event
|
||||
await self._send_notification(duplicate_event)
|
||||
|
||||
# Publish to SSE
|
||||
await self.sse_svc.publish_event(duplicate_event)
|
||||
|
||||
logger.info(
|
||||
"Duplicate alert updated",
|
||||
event_id=str(duplicate_event.id),
|
||||
event_type=event.event_type,
|
||||
priority_level=duplicate_event.priority_level,
|
||||
priority_score=duplicate_event.priority_score
|
||||
)
|
||||
return # Exit early since we handled the duplicate
|
||||
else:
|
||||
logger.info(
|
||||
"New unique alert, proceeding with creation",
|
||||
event_type=event.event_type,
|
||||
tenant_id=event.tenant_id
|
||||
)
|
||||
|
||||
# Store in database (if not a duplicate)
|
||||
stored_event = await repo.create_event(enriched_event)
|
||||
|
||||
# Send to notification service (if alert)
|
||||
|
||||
@@ -148,6 +148,107 @@ class EventRepository:
|
||||
result = await self.session.execute(query)
|
||||
return result.scalar_one_or_none()
|
||||
|
||||
async def check_duplicate_alert(self, tenant_id: UUID, event_type: str, entity_links: Dict, event_metadata: Dict, time_window_hours: int = 24) -> Optional[Event]:
    """
    Look for an existing active alert that semantically duplicates a new one.

    Args:
        tenant_id: Tenant UUID
        event_type: Type of event (e.g., 'production_delay', 'critical_stock_shortage')
        entity_links: Entity references (e.g., batch_id, po_id, ingredient_id)
        event_metadata: Event metadata used for the similarity comparison
        time_window_hours: How far back (in hours) to search for duplicates

    Returns:
        Existing event if duplicate found, None otherwise
    """
    from datetime import datetime, timedelta, timezone

    # Only events created after this instant count as potential duplicates.
    cutoff = datetime.now(timezone.utc) - timedelta(hours=time_window_hours)

    # Candidate set: same tenant and type, still active, inside the window.
    stmt = select(Event).where(
        and_(
            Event.tenant_id == tenant_id,
            Event.event_type == event_type,
            Event.status == "active",  # Only check active alerts
            Event.created_at >= cutoff,
        )
    )
    result = await self.session.execute(stmt)
    candidates = result.scalars().all()

    # Per-type comparison strategies; any other type falls back to the
    # generic metadata comparison.
    matchers = {
        "production_delay": self._production_delay_match,
        "critical_stock_shortage": self._stock_shortage_match,
        "delivery_overdue": self._delivery_overdue_match,
    }
    compare = matchers.get(event_type, self._metadata_match)

    for candidate in candidates:
        # Both events must reference the same entity (batch, PO, ingredient,
        # supplier, equipment) before metadata is compared at all.
        if not self._entities_match(candidate.entity_links, entity_links):
            continue
        if compare(candidate.event_metadata, event_metadata):
            return candidate

    return None
||||
|
||||
def _entities_match(self, existing_links: Dict, new_links: Dict) -> bool:
    """Return True when both events reference at least one identical entity."""
    # Nothing to compare when either side carries no entity links.
    if not existing_links or not new_links:
        return False

    # A single shared entity key with an equal identifier is sufficient.
    shared_kinds = ('production_batch', 'purchase_order', 'ingredient', 'supplier', 'equipment')
    return any(
        kind in existing_links
        and kind in new_links
        and existing_links[kind] == new_links[kind]
        for kind in shared_kinds
    )
|
||||
|
||||
def _production_delay_match(self, existing_meta: Dict, new_meta: Dict) -> bool:
    """Return True when two production-delay alerts describe the same issue."""
    # The (batch_id, product_name) pair identifies one production problem.
    identity_keys = ('batch_id', 'product_name')
    return all(existing_meta.get(key) == new_meta.get(key) for key in identity_keys)
||||
|
||||
def _stock_shortage_match(self, existing_meta: Dict, new_meta: Dict) -> bool:
    """Return True when two stock-shortage alerts concern the same ingredient."""
    # A shortage is identified solely by its ingredient_id.
    previous_ingredient = existing_meta.get('ingredient_id')
    incoming_ingredient = new_meta.get('ingredient_id')
    return previous_ingredient == incoming_ingredient
||||
|
||||
def _delivery_overdue_match(self, existing_meta: Dict, new_meta: Dict) -> bool:
    """Return True when two delivery-overdue alerts concern the same PO."""
    # The purchase order id is the sole identity of a delivery issue.
    return new_meta.get('po_id') == existing_meta.get('po_id')
|
||||
|
||||
def _metadata_match(self, existing_meta: Dict, new_meta: Dict) -> bool:
    """Fallback duplicate check for alert types without a dedicated matcher."""
    # Any identifying field present in both payloads with an equal value
    # counts as a duplicate.
    identifying_fields = ('batch_id', 'po_id', 'ingredient_id', 'supplier_id', 'equipment_id')
    return any(
        field in existing_meta
        and field in new_meta
        and existing_meta[field] == new_meta[field]
        for field in identifying_fields
    )
|
||||
|
||||
async def get_summary(self, tenant_id: UUID) -> EventSummary:
|
||||
"""
|
||||
Get summary statistics for dashboard.
|
||||
|
||||
Reference in New Issue
Block a user