Alert Processor Service v2.0

A clean, well-structured event processing and alert management system with a sophisticated enrichment pipeline.

Overview

The Alert Processor Service receives minimal events from other services (inventory, production, procurement, etc.) and enriches them with:

  • i18n message generation - Parameterized titles and messages for frontend
  • Multi-factor priority scoring - Business impact (40%), Urgency (30%), User agency (20%), Confidence (10%)
  • Business impact analysis - Financial impact, affected orders, customer impact
  • Urgency assessment - Time until consequence, deadlines, escalation
  • User agency analysis - Can user fix? External dependencies? Blockers?
  • AI orchestrator context - Query for AI actions already taken
  • Smart action generation - Contextual buttons with deep links
  • Entity linking - References to related entities (POs, batches, orders)

Architecture

Services → RabbitMQ → [Alert Processor] → PostgreSQL
                            ↓
                      Notification Service
                            ↓
                      Redis (SSE Pub/Sub)
                            ↓
                         Frontend

Enrichment Pipeline

  1. Duplicate Detection: Checks for duplicate alerts within a 24-hour window
  2. Message Generator: Creates i18n keys and parameters from metadata
  3. Orchestrator Client: Queries AI orchestrator for context
  4. AI Reasoning Extractor: Extracts AI reasoning details and confidence scores
  5. Business Impact Analyzer: Calculates financial and operational impact
  6. Urgency Analyzer: Assesses time sensitivity and deadlines
  7. User Agency Analyzer: Determines user's ability to act
  8. Priority Scorer: Calculates weighted priority score (0-100)
  9. Type Classifier: Determines if action needed or issue prevented
  10. Smart Action Generator: Creates contextual action buttons
  11. Entity Link Extractor: Maps metadata to entity references
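
The ordered steps above can be pictured as a chain of functions that each add fields to a shared context. The following is only a minimal sketch of that idea, not the code in enrichment_orchestrator.py: the step functions, the `Context` alias, and `run_pipeline` are hypothetical names chosen for illustration.

```python
from typing import Any, Callable, Dict, List

Context = Dict[str, Any]
Step = Callable[[Context], Context]


def generate_message(ctx: Context) -> Context:
    # Hypothetical step: derive an i18n title key from the event type.
    ctx["i18n"] = {"title_key": f"alerts.{ctx['event_type']}.title"}
    return ctx


def classify_type(ctx: Context) -> Context:
    # Hypothetical step: default every event to "action_needed".
    ctx["type_class"] = "action_needed"
    return ctx


def run_pipeline(event: Context, steps: List[Step]) -> Context:
    """Apply each enrichment step in order, passing the context along."""
    ctx = dict(event)  # copy so the caller's event is not mutated
    for step in steps:
        ctx = step(ctx)
    return ctx


enriched = run_pipeline(
    {"event_type": "critical_stock_shortage"},
    [generate_message, classify_type],
)
```

Keeping each step as a plain function of the context makes the order explicit and lets individual steps be tested in isolation.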

Service Structure

alert_processor_v2/
├── app/
│   ├── main.py                    # FastAPI app + lifecycle
│   ├── core/
│   │   ├── config.py              # Settings
│   │   └── database.py            # Database session management
│   ├── models/
│   │   └── events.py              # SQLAlchemy Event model
│   ├── schemas/
│   │   └── events.py              # Pydantic schemas
│   ├── api/
│   │   ├── alerts.py              # Alert endpoints
│   │   └── sse.py                 # SSE streaming
│   ├── consumer/
│   │   └── event_consumer.py     # RabbitMQ consumer
│   ├── enrichment/
│   │   ├── message_generator.py  # i18n generation
│   │   ├── priority_scorer.py    # Priority calculation
│   │   ├── orchestrator_client.py # AI context
│   │   ├── smart_actions.py      # Action buttons
│   │   ├── business_impact.py    # Impact analysis
│   │   ├── urgency_analyzer.py   # Urgency assessment
│   │   └── user_agency.py        # Agency analysis
│   ├── repositories/
│   │   └── event_repository.py   # Database queries
│   ├── services/
│   │   ├── enrichment_orchestrator.py  # Pipeline coordinator
│   │   └── sse_service.py              # SSE pub/sub
│   └── utils/
│       └── message_templates.py   # Alert type mappings
├── migrations/
│   └── versions/
│       └── 20251205_clean_unified_schema.py
└── requirements.txt

Environment Variables

# Service
SERVICE_NAME=alert-processor
VERSION=2.0.0
DEBUG=false

# Database
DATABASE_URL=postgresql+asyncpg://user:pass@localhost/db

# RabbitMQ
RABBITMQ_URL=amqp://guest:guest@localhost/
RABBITMQ_EXCHANGE=events.exchange
RABBITMQ_QUEUE=alert_processor.queue

# Redis
REDIS_URL=redis://localhost:6379/0
REDIS_SSE_PREFIX=alerts

# Orchestrator Service
ORCHESTRATOR_URL=http://orchestrator:8000
ORCHESTRATOR_TIMEOUT=10

# Notification Service
NOTIFICATION_URL=http://notification:8000
NOTIFICATION_TIMEOUT=5

# Cache
CACHE_ENABLED=true
CACHE_TTL_SECONDS=300
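
As an illustration of how these variables might be read, here is a minimal sketch using only the standard library. It is not the contents of app/core/config.py: the `Settings` dataclass, `load_settings`, and `_as_bool` are assumed names, and only a subset of the variables is shown. The defaults mirror the values listed above.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class Settings:
    service_name: str
    debug: bool
    orchestrator_timeout: int
    cache_enabled: bool
    cache_ttl_seconds: int


def _as_bool(value: str) -> bool:
    # Treat common truthy spellings as True; everything else is False.
    return value.strip().lower() in {"1", "true", "yes", "on"}


def load_settings(env: Optional[Mapping[str, str]] = None) -> Settings:
    """Build Settings from a mapping, defaulting to the process environment."""
    env = os.environ if env is None else env
    return Settings(
        service_name=env.get("SERVICE_NAME", "alert-processor"),
        debug=_as_bool(env.get("DEBUG", "false")),
        orchestrator_timeout=int(env.get("ORCHESTRATOR_TIMEOUT", "10")),
        cache_enabled=_as_bool(env.get("CACHE_ENABLED", "true")),
        cache_ttl_seconds=int(env.get("CACHE_TTL_SECONDS", "300")),
    )


# Passing an explicit mapping keeps the example independent of the real environment.
settings = load_settings({"DEBUG": "true", "CACHE_TTL_SECONDS": "60"})
```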

Running the Service

Local Development

# Install dependencies
pip install -r requirements.txt

# Run database migrations
alembic upgrade head

# Start service
python -m app.main
# or
uvicorn app.main:app --reload

Docker

docker build -t alert-processor:2.0 .
docker run -p 8000:8000 --env-file .env alert-processor:2.0

API Endpoints

Alert Management

  • GET /api/v1/tenants/{tenant_id}/alerts - List alerts with filters
  • GET /api/v1/tenants/{tenant_id}/alerts/summary - Get dashboard summary
  • GET /api/v1/tenants/{tenant_id}/alerts/{alert_id} - Get single alert
  • POST /api/v1/tenants/{tenant_id}/alerts/{alert_id}/acknowledge - Acknowledge alert
  • POST /api/v1/tenants/{tenant_id}/alerts/{alert_id}/resolve - Resolve alert
  • POST /api/v1/tenants/{tenant_id}/alerts/{alert_id}/dismiss - Dismiss alert
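
The three POST endpoints line up with the status values listed under Database Schema (acknowledged, resolved, dismissed). The sketch below assumes that each action sets the matching status; the mapping and the `action_path` helper are illustrative and not taken from app/api/alerts.py.

```python
# Assumed mapping from endpoint action to the resulting `status` value.
ACTION_TO_STATUS = {
    "acknowledge": "acknowledged",
    "resolve": "resolved",
    "dismiss": "dismissed",
}


def action_path(tenant_id: str, alert_id: str, action: str) -> str:
    """Build the request path for one of the documented POST actions."""
    if action not in ACTION_TO_STATUS:
        raise ValueError(f"unknown action: {action!r}")
    return f"/api/v1/tenants/{tenant_id}/alerts/{alert_id}/{action}"


path = action_path("tenant-1", "alert-42", "resolve")
```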

Real-Time Streaming

  • GET /api/v1/sse/alerts/{tenant_id} - SSE stream for real-time alerts
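
For readers unfamiliar with Server-Sent Events, each message on the stream is a small text frame terminated by a blank line. The sketch below shows that wire format; the `format_sse` helper and the `alert` event name are assumptions, not something this service is documented to use.

```python
import json


def format_sse(data: dict, event: str = "alert") -> str:
    """Serialize a payload as a single SSE frame (event name is assumed)."""
    # Compact JSON stays on one line, so one `data:` field is enough.
    payload = json.dumps(data, separators=(",", ":"))
    return f"event: {event}\ndata: {payload}\n\n"


frame = format_sse({"id": "a1", "priority_level": "important"})
```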

Health Check

  • GET /health - Service health status

Event Flow

1. Service Emits Minimal Event

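from shared.messaging.event_publisher import EventPublisher

# `publisher` is an already-initialized EventPublisher instance (setup not shown).
# The call must run inside an async function.
await publisher.publish_alert(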
    tenant_id=tenant_id,
    event_type="critical_stock_shortage",
    event_domain="inventory",
    severity="urgent",
    metadata={
        "ingredient_id": "...",
        "ingredient_name": "Flour",
        "current_stock": 10.5,
        "required_stock": 50.0,
        "shortage_amount": 39.5
    }
)

2. Alert Processor Enriches

  • Checks for duplicates: Searches 24-hour window for similar alerts
  • Generates i18n: alerts.critical_stock_shortage.title with params
  • Queries orchestrator for AI context
  • Extracts AI reasoning and confidence scores (if available)
  • Analyzes business impact: €197.50 financial impact
  • Assesses urgency: 12 hours until consequence
  • Determines user agency: Can create PO, requires supplier
  • Calculates priority: Score 78 → "important"
  • Classifies type: action_needed or prevented_issue
  • Generates smart actions: [Create PO, Call Supplier, Dismiss]
  • Extracts entity links: {ingredient: "..."}

3. Stores Enriched Event

{
  "id": "...",
  "event_type": "critical_stock_shortage",
  "event_domain": "inventory",
  "severity": "urgent",
  "type_class": "action_needed",
  "priority_score": 78,
  "priority_level": "important",
  "confidence_score": 95,
  "i18n": {
    "title_key": "alerts.critical_stock_shortage.title",
    "title_params": {"ingredient_name": "Flour"},
    "message_key": "alerts.critical_stock_shortage.message_generic",
    "message_params": {
      "current_stock_kg": 10.5,
      "required_stock_kg": 50.0
    }
  },
  "business_impact": {...},
  "urgency": {...},
  "user_agency": {...},
  "ai_reasoning_details": {...},
  "orchestrator_context": {...},
  "smart_actions": [...],
  "entity_links": {"ingredient": "..."}
}
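
The i18n block in the stored event carries keys and parameters rather than finished text, so the consumer resolves them against its own translation catalog. A minimal sketch of that lookup follows; the `CATALOG` strings and the `render` function are invented for illustration and do not come from the project's translation files.

```python
from typing import Any, Dict

# Hypothetical translation catalog keyed by locale, then by i18n key.
CATALOG: Dict[str, Dict[str, str]] = {
    "en": {
        "alerts.critical_stock_shortage.title": "Critical stock shortage: {ingredient_name}",
    },
    "es": {
        "alerts.critical_stock_shortage.title": "Falta crítica de stock: {ingredient_name}",
    },
}


def render(key: str, params: Dict[str, Any], locale: str = "en") -> str:
    """Look up a template by key and fill in its parameters."""
    # Fall back to English, then to the raw key if no template exists.
    template = CATALOG.get(locale, {}).get(key) or CATALOG["en"].get(key, key)
    return template.format(**params)


title = render(
    "alerts.critical_stock_shortage.title",
    {"ingredient_name": "Flour"},
)
```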

4. Sends Notification

Calls the notification service with the event details for delivery via WhatsApp, email, push notification, etc.

5. Publishes to SSE

Publishes to Redis channel alerts:{tenant_id} for real-time frontend updates.
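
The channel name combines the REDIS_SSE_PREFIX value with the tenant ID. The sketch below illustrates that naming with an in-memory stand-in for the Redis client so it runs without a server; `InMemoryPublisher` and `publish_alert_event` are hypothetical names, and a real client would expose the same `publish(channel, message)` call.

```python
import json
from typing import Any, Dict, List, Tuple


class InMemoryPublisher:
    """Stand-in for a Redis client; records (channel, message) pairs."""

    def __init__(self) -> None:
        self.published: List[Tuple[str, str]] = []

    def publish(self, channel: str, message: str) -> int:
        self.published.append((channel, message))
        return 1  # pretend one subscriber received it


def publish_alert_event(client: Any, tenant_id: str, event: Dict[str, Any],
                        prefix: str = "alerts") -> str:
    """Publish the event as JSON on the tenant's channel and return its name."""
    channel = f"{prefix}:{tenant_id}"
    client.publish(channel, json.dumps(event))
    return channel


client = InMemoryPublisher()
channel = publish_alert_event(client, "tenant-1", {"id": "a1", "priority_score": 78})
```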

Priority Scoring Algorithm

Formula: Total = (Impact × 0.4) + (Urgency × 0.3) + (Agency × 0.2) + (Confidence × 0.1)
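
As a worked illustration of the formula, the sketch below computes the weighted total from four component scores. The function name, the clamping of inputs to 0-100, and the rounding are assumptions; the actual priority_scorer.py may differ.

```python
# Weights taken from the formula above.
WEIGHTS = {"impact": 0.4, "urgency": 0.3, "agency": 0.2, "confidence": 0.1}


def _clamp(value: float) -> float:
    # Assumed: component scores are kept within the documented 0-100 range.
    return max(0.0, min(100.0, value))


def priority_score(impact: float, urgency: float, agency: float,
                   confidence: float) -> float:
    """Weighted sum of the four component scores, rounded to 2 decimals."""
    components = {
        "impact": impact,
        "urgency": urgency,
        "agency": agency,
        "confidence": confidence,
    }
    total = sum(WEIGHTS[name] * _clamp(value) for name, value in components.items())
    return round(total, 2)


# 80*0.4 + 60*0.3 + 30*0.2 + 95*0.1 = 32 + 18 + 6 + 9.5
score = priority_score(impact=80, urgency=60, agency=30, confidence=95)
```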

Business Impact Score (0-100):

  • Financial impact > €1000: +30
  • Affected orders > 10: +15
  • High customer impact: +15
  • Production delay > 4h: +10
  • Revenue loss > €500: +10
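
The increments above can be read as additive rules. The sketch below applies only those listed rules, starting from zero; this README does not state a base score or any further rules, so the starting value, the `ImpactInputs` fields, and the function name are all assumptions.

```python
from dataclasses import dataclass


@dataclass
class ImpactInputs:
    # Hypothetical input fields, named after the rules listed above.
    financial_impact_eur: float = 0.0
    affected_orders: int = 0
    high_customer_impact: bool = False
    production_delay_hours: float = 0.0
    revenue_loss_eur: float = 0.0


def business_impact_score(x: ImpactInputs) -> int:
    """Sum the documented increments and keep the result within 0-100."""
    score = 0  # assumed starting point; not stated in the source
    if x.financial_impact_eur > 1000:
        score += 30
    if x.affected_orders > 10:
        score += 15
    if x.high_customer_impact:
        score += 15
    if x.production_delay_hours > 4:
        score += 10
    if x.revenue_loss_eur > 500:
        score += 10
    return max(0, min(100, score))


impact = business_impact_score(
    ImpactInputs(financial_impact_eur=1500, affected_orders=12, high_customer_impact=True)
)
```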

Urgency Score (0-100):

  • Time until consequence < 2h: +40
  • Deadline present: +5
  • Can't wait until tomorrow: +10
  • Peak hour relevant: +5

User Agency Score (0-100):

  • User can fix: +30
  • Requires external party: -10
  • Has blockers: -5 per blocker
  • Has workaround: +5

Escalation Boost (up to +30):

  • Pending > 72h: +20
  • Deadline < 6h: +30
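
One way to read the escalation rules is that both conditions can contribute, with the result capped at the stated maximum of +30. The sketch below takes that reading; whether the real scorer stacks the boosts or picks the larger one is not specified here, and `escalation_boost` is a hypothetical name.

```python
from typing import Optional

MAX_BOOST = 30  # "up to +30" from the heading above


def escalation_boost(hours_pending: float,
                     hours_to_deadline: Optional[float] = None) -> int:
    """Return the escalation boost for an alert, capped at MAX_BOOST."""
    boost = 0
    if hours_pending > 72:
        boost += 20
    if hours_to_deadline is not None and hours_to_deadline < 6:
        boost += 30
    return min(boost, MAX_BOOST)


stale_alert = escalation_boost(hours_pending=80)
stale_and_due = escalation_boost(hours_pending=80, hours_to_deadline=4)
```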

Alert Types

See app/utils/message_templates.py for complete list.

Standard Alerts

  • critical_stock_shortage - Urgent stock shortages
  • low_stock_warning - Stock running low
  • production_delay - Production behind schedule
  • equipment_failure - Equipment issues
  • po_approval_needed - Purchase order approval required
  • temperature_breach - Temperature control violations
  • delivery_overdue - Late deliveries
  • expired_products - Product expiration warnings

AI Recommendations

  • ai_yield_prediction - AI-predicted production yields
  • ai_safety_stock_optimization - AI stock level recommendations
  • ai_supplier_recommendation - AI supplier suggestions
  • ai_price_forecast - AI price predictions
  • ai_demand_forecast - AI demand forecasts
  • ai_business_rule - AI-suggested business rules

Database Schema

events table with JSONB enrichment:

  • Core: id, tenant_id, created_at, event_type, event_domain, severity
  • i18n: i18n_title_key, i18n_title_params, i18n_message_key, i18n_message_params
  • Priority: priority_score (0-100), priority_level (critical/important/standard/info)
  • Enrichment: orchestrator_context, business_impact, urgency, user_agency (JSONB)
  • AI Fields: ai_reasoning_details, confidence_score, ai_reasoning_summary_key, ai_reasoning_summary_params
  • Classification: type_class (action_needed/prevented_issue)
  • Actions: smart_actions (JSONB array)
  • Entities: entity_links (JSONB)
  • Status: status (active/acknowledged/resolved/dismissed)
  • Metadata: raw_metadata (JSONB)

Key Features

Duplicate Alert Detection

The service automatically detects and prevents duplicate alerts:

  • 24-hour window: Checks for similar alerts in the past 24 hours
  • Smart matching: Compares tenant_id, event_type, and key metadata fields
  • Update strategy: Updates existing alert instead of creating duplicates
  • Metadata preservation: Keeps enriched data while preventing alert fatigue
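
The matching and update strategy described above can be sketched with an in-memory store. This is an illustration only: the real service queries PostgreSQL through event_repository.py, and the choice of `ingredient_id` as the "key metadata field", the class name, and the return values are assumptions.

```python
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, Tuple

WINDOW = timedelta(hours=24)
KEY_FIELDS = ("ingredient_id",)  # hypothetical choice of key metadata fields


def dedup_key(event: Dict[str, Any]) -> Tuple[Any, ...]:
    """Identity used to decide whether two events describe the same alert."""
    meta = event.get("metadata", {})
    return (event["tenant_id"], event["event_type"],
            tuple(meta.get(field) for field in KEY_FIELDS))


class InMemoryAlertStore:
    def __init__(self) -> None:
        self._alerts: Dict[Tuple[Any, ...], Dict[str, Any]] = {}

    def upsert(self, event: Dict[str, Any], now: datetime) -> str:
        """Update a matching alert inside the window, else create a new one."""
        key = dedup_key(event)
        existing = self._alerts.get(key)
        if existing is not None and now - existing["created_at"] <= WINDOW:
            existing["metadata"] = dict(event.get("metadata", {}))
            existing["updated_at"] = now
            return "updated"
        self._alerts[key] = {**event, "created_at": now, "updated_at": now}
        return "created"


store = InMemoryAlertStore()
t0 = datetime(2025, 12, 5, 8, 0, tzinfo=timezone.utc)
event = {
    "tenant_id": "t1",
    "event_type": "critical_stock_shortage",
    "metadata": {"ingredient_id": "flour-1", "current_stock": 10.5},
}
first = store.upsert(event, t0)                        # new alert
second = store.upsert(event, t0 + timedelta(hours=3))  # inside the window
third = store.upsert(event, t0 + timedelta(hours=30))  # window has passed
```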

Type Classification

Events are classified into two types:

  • action_needed: User action required (default for alerts)
  • prevented_issue: AI already handled the situation (for AI recommendations)

This helps the frontend display appropriate UI and messaging.

AI Reasoning Integration

When the AI orchestrator has acted on an event, the service:

  • Extracts complete reasoning data structure
  • Stores confidence scores (0-100)
  • Generates i18n-friendly reasoning summaries
  • Links to orchestrator context for full details

Notification Service Integration

Enriched events are automatically sent to the notification service for delivery via:

  • WhatsApp
  • Email
  • Push notifications
  • SMS

Priority mapping:

  • critical → urgent priority
  • important → high priority
  • standard → medium priority
  • info → low priority
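
The mapping above is a plain lookup table. A minimal sketch, with a hypothetical function name and an assumed choice to reject unknown levels rather than silently default:

```python
# priority_level (alert processor) -> priority (notification service)
NOTIFICATION_PRIORITY = {
    "critical": "urgent",
    "important": "high",
    "standard": "medium",
    "info": "low",
}


def to_notification_priority(priority_level: str) -> str:
    """Translate an alert priority level into a notification priority."""
    try:
        return NOTIFICATION_PRIORITY[priority_level]
    except KeyError:
        # Assumed behavior: fail loudly on levels outside the documented set.
        raise ValueError(f"unknown priority level: {priority_level!r}") from None


mapped = to_notification_priority("important")
```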

Monitoring

The service emits structured JSON logs with the following event names:

  • enrichment_started - Event received
  • duplicate_detected - Duplicate alert found and updated
  • enrichment_completed - Enrichment pipeline finished
  • event_stored - Saved to database
  • notification_sent - Notification queued
  • sse_event_published - Published to SSE stream
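
To show what one such log line might look like, here is a minimal sketch built on the standard library's logging module. The service may well use a dedicated structured-logging library instead; the `JsonFormatter` class, the logger name, and the field names are assumptions. Output goes to an in-memory buffer only to keep the example self-contained.

```python
import io
import json
import logging


class JsonFormatter(logging.Formatter):
    """Render each record as one JSON object with the event name and fields."""

    def format(self, record: logging.LogRecord) -> str:
        payload = {"event": record.getMessage(), "level": record.levelname.lower()}
        # Extra key/value pairs are passed via `extra={"fields": {...}}`.
        payload.update(getattr(record, "fields", {}))
        return json.dumps(payload)


stream = io.StringIO()
handler = logging.StreamHandler(stream)
handler.setFormatter(JsonFormatter())

logger = logging.getLogger("alert_processor.sketch")
logger.setLevel(logging.INFO)
logger.addHandler(handler)
logger.propagate = False  # keep the example's output out of the root logger

logger.info("event_stored", extra={"fields": {"tenant_id": "t1", "priority_score": 78}})
line = stream.getvalue().strip()
```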

Testing

# Run tests
pytest

# Test enrichment pipeline
pytest tests/test_enrichment_orchestrator.py

# Test priority scoring
pytest tests/test_priority_scorer.py

# Test message generation
pytest tests/test_message_generator.py

Migration from v1

See MIGRATION_GUIDE.md for the steps to migrate from the old alert_processor.

Key changes:

  • Services send minimal events (no hardcoded messages)
  • All enrichment moved to alert_processor
  • Unified Event table (no separate alert/notification tables)
  • i18n-first architecture
  • Sophisticated multi-factor priority scoring
  • Smart action generation