Alert Processor Service v2.0

An event processing and alert management service that turns minimal events into fully enriched alerts through a multi-stage enrichment pipeline.

Overview

The Alert Processor Service receives minimal events from other services (inventory, production, procurement, etc.) and enriches them with:

  • i18n message generation - Parameterized titles and messages for frontend
  • Multi-factor priority scoring - Business impact (40%), Urgency (30%), User agency (20%), Confidence (10%)
  • Business impact analysis - Financial impact, affected orders, customer impact
  • Urgency assessment - Time until consequence, deadlines, escalation
  • User agency analysis - Can user fix? External dependencies? Blockers?
  • AI orchestrator context - Query for AI actions already taken
  • Smart action generation - Contextual buttons with deep links
  • Entity linking - References to related entities (POs, batches, orders)

Architecture

Services → RabbitMQ → [Alert Processor] → PostgreSQL
                            ↓
                      Notification Service
                            ↓
                      Redis (SSE Pub/Sub)
                            ↓
                         Frontend

Enrichment Pipeline

  1. Message Generator: Creates i18n keys and parameters from metadata
  2. Orchestrator Client: Queries AI orchestrator for context
  3. Business Impact Analyzer: Calculates financial and operational impact
  4. Urgency Analyzer: Assesses time sensitivity and deadlines
  5. User Agency Analyzer: Determines user's ability to act
  6. Priority Scorer: Calculates weighted priority score (0-100)
  7. Smart Action Generator: Creates contextual action buttons
  8. Entity Link Extractor: Maps metadata to entity references
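
The ordering above matters because later stages (such as the Priority Scorer) consume the output of earlier ones. A minimal sketch of that sequential hand-off follows. `EnrichmentContext`, `run_pipeline`, and the two stand-in stages are hypothetical names for illustration, not the code in app/services/enrichment_orchestrator.py, and the sketch is synchronous where the real service is likely async.

```python
from dataclasses import dataclass, field
from typing import Any, Callable


@dataclass
class EnrichmentContext:
    """Hypothetical container: raw event metadata plus results of earlier stages."""
    metadata: dict[str, Any]
    results: dict[str, Any] = field(default_factory=dict)


Stage = Callable[[EnrichmentContext], Any]


def run_pipeline(metadata: dict[str, Any], stages: list[tuple[str, Stage]]) -> dict[str, Any]:
    """Run the stages in order; each stage may read what earlier stages produced."""
    ctx = EnrichmentContext(metadata=metadata)
    for name, stage in stages:
        ctx.results[name] = stage(ctx)
    return ctx.results


# Stand-in stages (placeholders, not the service's real analyzers).
def urgency_stage(ctx: EnrichmentContext) -> dict[str, Any]:
    return {"hours_until_consequence": ctx.metadata.get("hours_until_consequence")}


def priority_stage(ctx: EnrichmentContext) -> dict[str, Any]:
    # Runs after the analyzers, so their results are already available.
    return {"inputs_seen": sorted(ctx.results)}


results = run_pipeline(
    {"hours_until_consequence": 12},
    [("urgency", urgency_stage), ("priority", priority_stage)],
)
```

Keeping each stage behind the same callable signature makes it easy to test a stage in isolation or to reorder stages.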

Service Structure

alert_processor_v2/
├── app/
│   ├── main.py                    # FastAPI app + lifecycle
│   ├── core/
│   │   ├── config.py              # Settings
│   │   └── database.py            # Database session management
│   ├── models/
│   │   └── events.py              # SQLAlchemy Event model
│   ├── schemas/
│   │   └── events.py              # Pydantic schemas
│   ├── api/
│   │   ├── alerts.py              # Alert endpoints
│   │   └── sse.py                 # SSE streaming
│   ├── consumer/
│   │   └── event_consumer.py     # RabbitMQ consumer
│   ├── enrichment/
│   │   ├── message_generator.py  # i18n generation
│   │   ├── priority_scorer.py    # Priority calculation
│   │   ├── orchestrator_client.py # AI context
│   │   ├── smart_actions.py      # Action buttons
│   │   ├── business_impact.py    # Impact analysis
│   │   ├── urgency_analyzer.py   # Urgency assessment
│   │   └── user_agency.py        # Agency analysis
│   ├── repositories/
│   │   └── event_repository.py   # Database queries
│   ├── services/
│   │   ├── enrichment_orchestrator.py  # Pipeline coordinator
│   │   └── sse_service.py              # SSE pub/sub
│   └── utils/
│       └── message_templates.py   # Alert type mappings
├── migrations/
│   └── versions/
│       └── 20251205_clean_unified_schema.py
└── requirements.txt

Environment Variables

# Service
SERVICE_NAME=alert-processor
VERSION=2.0.0
DEBUG=false

# Database
DATABASE_URL=postgresql+asyncpg://user:pass@localhost/db

# RabbitMQ
RABBITMQ_URL=amqp://guest:guest@localhost/
RABBITMQ_EXCHANGE=events.exchange
RABBITMQ_QUEUE=alert_processor.queue

# Redis
REDIS_URL=redis://localhost:6379/0
REDIS_SSE_PREFIX=alerts

# Orchestrator Service
ORCHESTRATOR_URL=http://orchestrator:8000
ORCHESTRATOR_TIMEOUT=10

# Notification Service
NOTIFICATION_URL=http://notification:8000
NOTIFICATION_TIMEOUT=5

# Cache
CACHE_ENABLED=true
CACHE_TTL_SECONDS=300
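
As an illustration of how these variables might be read, here is a standard-library sketch. It is an assumption, not the contents of app/core/config.py (which may well use a settings library instead); `Settings`, `load_settings`, and `_as_bool` are hypothetical names, and the defaults simply mirror the sample values above.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


def _as_bool(value: str) -> bool:
    """Interpret common truthy strings; anything else is False."""
    return value.strip().lower() in {"1", "true", "yes", "on"}


@dataclass(frozen=True)
class Settings:
    service_name: str
    debug: bool
    database_url: str
    rabbitmq_url: str
    redis_url: str
    redis_sse_prefix: str
    orchestrator_timeout: int
    cache_enabled: bool
    cache_ttl_seconds: int


def load_settings(env: Optional[Mapping[str, str]] = None) -> Settings:
    """Build Settings from a mapping (defaults to os.environ)."""
    env = os.environ if env is None else env
    return Settings(
        service_name=env.get("SERVICE_NAME", "alert-processor"),
        debug=_as_bool(env.get("DEBUG", "false")),
        database_url=env.get("DATABASE_URL", "postgresql+asyncpg://user:pass@localhost/db"),
        rabbitmq_url=env.get("RABBITMQ_URL", "amqp://guest:guest@localhost/"),
        redis_url=env.get("REDIS_URL", "redis://localhost:6379/0"),
        redis_sse_prefix=env.get("REDIS_SSE_PREFIX", "alerts"),
        orchestrator_timeout=int(env.get("ORCHESTRATOR_TIMEOUT", "10")),
        cache_enabled=_as_bool(env.get("CACHE_ENABLED", "true")),
        cache_ttl_seconds=int(env.get("CACHE_TTL_SECONDS", "300")),
    )
```

Passing the mapping explicitly, for example `load_settings({"DEBUG": "true"})`, keeps tests independent of the process environment.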

Running the Service

Local Development

# Install dependencies
pip install -r requirements.txt

# Run database migrations
alembic upgrade head

# Start service
python -m app.main
# or
uvicorn app.main:app --reload

Docker

docker build -t alert-processor:2.0 .
docker run -p 8000:8000 --env-file .env alert-processor:2.0

API Endpoints

Alert Management

  • GET /api/v1/tenants/{tenant_id}/alerts - List alerts with filters
  • GET /api/v1/tenants/{tenant_id}/alerts/summary - Get dashboard summary
  • GET /api/v1/tenants/{tenant_id}/alerts/{alert_id} - Get single alert
  • POST /api/v1/tenants/{tenant_id}/alerts/{alert_id}/acknowledge - Acknowledge alert
  • POST /api/v1/tenants/{tenant_id}/alerts/{alert_id}/resolve - Resolve alert
  • POST /api/v1/tenants/{tenant_id}/alerts/{alert_id}/dismiss - Dismiss alert

Real-Time Streaming

  • GET /api/v1/sse/alerts/{tenant_id} - SSE stream for real-time alerts

Health Check

  • GET /health - Service health status
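
For clients that call these routes, a small path builder avoids hand-assembled URLs. The sketch below only constructs the paths listed above; `alerts_path` is a hypothetical helper, and query filters are left out because their names are not documented here.

```python
from typing import Optional
from urllib.parse import quote

# State-changing actions exposed as POST routes in the list above.
ACTIONS = {"acknowledge", "resolve", "dismiss"}


def alerts_path(tenant_id: str, alert_id: Optional[str] = None, action: Optional[str] = None) -> str:
    """Return the API path for the alert list, a single alert, or an alert action."""
    base = f"/api/v1/tenants/{quote(tenant_id, safe='')}/alerts"
    if alert_id is None:
        if action is not None:
            raise ValueError("an action requires an alert_id")
        return base
    path = f"{base}/{quote(alert_id, safe='')}"
    if action is None:
        return path
    if action not in ACTIONS:
        raise ValueError(f"unknown action: {action}")
    return f"{path}/{action}"
```

For example, `alerts_path("t1", "a42", "resolve")` yields the path for the resolve endpoint, which the client would then call with POST.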

Event Flow

1. Service Emits Minimal Event

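from shared.messaging.event_publisher import EventPublisher

# Assumes `publisher` is an already-initialized EventPublisher instance
# and that this call runs inside an async function.
await publisher.publish_alert(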
    tenant_id=tenant_id,
    event_type="critical_stock_shortage",
    event_domain="inventory",
    severity="urgent",
    metadata={
        "ingredient_id": "...",
        "ingredient_name": "Flour",
        "current_stock": 10.5,
        "required_stock": 50.0,
        "shortage_amount": 39.5
    }
)

2. Alert Processor Enriches

  • Generates i18n: alerts.critical_stock_shortage.title with params
  • Queries orchestrator for AI context
  • Analyzes business impact: €197.50 financial impact
  • Assesses urgency: 12 hours until consequence
  • Determines user agency: Can create PO, requires supplier
  • Calculates priority: Score 78 → "important"
  • Generates smart actions: [Create PO, Call Supplier, Dismiss]
  • Extracts entity links: {ingredient: "..."}

3. Stores Enriched Event

{
  "id": "...",
  "event_type": "critical_stock_shortage",
  "priority_score": 78,
  "priority_level": "important",
  "i18n": {
    "title_key": "alerts.critical_stock_shortage.title",
    "title_params": {"ingredient_name": "Flour"},
    "message_key": "alerts.critical_stock_shortage.message_generic",
    "message_params": {
      "current_stock_kg": 10.5,
      "required_stock_kg": 50.0
    }
  },
  "business_impact": {...},
  "urgency": {...},
  "user_agency": {...},
  "smart_actions": [...]
}
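
Because the stored event carries i18n keys and parameters rather than finished text, whoever displays it must resolve the key against a translation catalog. A rough sketch of that lookup, assuming a flat catalog with `str.format`-style placeholders; the `CATALOG_EN` strings and the `render` helper are invented for illustration and are not the project's actual translations.

```python
from typing import Any

# Hypothetical English catalog; the real translations live elsewhere.
CATALOG_EN = {
    "alerts.critical_stock_shortage.title": "Critical stock shortage: {ingredient_name}",
    "alerts.critical_stock_shortage.message_generic": (
        "Only {current_stock_kg} kg in stock; {required_stock_kg} kg required."
    ),
}


def render(key: str, params: dict[str, Any], catalog: dict[str, str] = CATALOG_EN) -> str:
    """Resolve an i18n key with its parameters; fall back to the key if untranslated."""
    template = catalog.get(key)
    if template is None:
        return key
    return template.format(**params)


title = render("alerts.critical_stock_shortage.title", {"ingredient_name": "Flour"})
message = render(
    "alerts.critical_stock_shortage.message_generic",
    {"current_stock_kg": 10.5, "required_stock_kg": 50.0},
)
```

Falling back to the raw key keeps an untranslated alert visible instead of raising an error.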

4. Sends Notification

The processor calls the notification service with the event details for delivery via WhatsApp, email, push, and other channels.

5. Publishes to SSE

Publishes to Redis channel alerts:{tenant_id} for real-time frontend updates.
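
To make the channel naming and wire format concrete, the sketch below builds the channel name and a Server-Sent Events frame. `channel_for` and `format_sse` are hypothetical helpers, the `alert` event name is an assumption, and the actual Redis publish call is omitted so the example stays dependency-free.

```python
import json
from typing import Any, Optional


def channel_for(tenant_id: str, prefix: str = "alerts") -> str:
    """Build the per-tenant channel name, e.g. alerts:{tenant_id}."""
    return f"{prefix}:{tenant_id}"


def format_sse(data: dict[str, Any], event: Optional[str] = None) -> str:
    """Serialize a payload as one SSE frame: optional event line, data line, blank line."""
    lines = []
    if event is not None:
        lines.append(f"event: {event}")
    # json.dumps emits a single line by default, so one data field is enough.
    lines.append(f"data: {json.dumps(data)}")
    return "\n".join(lines) + "\n\n"


channel = channel_for("tenant-123")
frame = format_sse({"id": "1", "priority_level": "important"}, event="alert")
```

The default prefix matches the sample REDIS_SSE_PREFIX value; the SSE endpoint would subscribe to the same channel and forward each frame to the browser.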

Priority Scoring Algorithm

Formula: Total = (Impact × 0.4) + (Urgency × 0.3) + (Agency × 0.2) + (Confidence × 0.1)
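
The formula translates directly into code. This sketch assumes all four components, including confidence, are already on a 0-100 scale; `weighted_total` is a hypothetical name and the sample inputs are illustrative, not taken from a real alert.

```python
# Weights from the formula above; they sum to 1.0.
WEIGHTS = {"impact": 0.4, "urgency": 0.3, "agency": 0.2, "confidence": 0.1}


def weighted_total(impact: float, urgency: float, agency: float, confidence: float) -> float:
    """Combine the four component scores (each assumed 0-100) into one 0-100 total."""
    components = {
        "impact": impact,
        "urgency": urgency,
        "agency": agency,
        "confidence": confidence,
    }
    for name, value in components.items():
        if not 0 <= value <= 100:
            raise ValueError(f"{name} must be within 0-100, got {value}")
    return sum(WEIGHTS[name] * value for name, value in components.items())


# 32 + 18 + 6 + 9 = 65 (up to floating-point rounding)
total = weighted_total(impact=80, urgency=60, agency=30, confidence=90)
```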

Business Impact Score (0-100):

  • Financial impact > €1000: +30
  • Affected orders > 10: +15
  • High customer impact: +15
  • Production delay > 4h: +10
  • Revenue loss > €500: +10

Urgency Score (0-100):

  • Time until consequence < 2h: +40
  • Deadline present: +5
  • Can't wait until tomorrow: +10
  • Peak hour relevant: +5

User Agency Score (0-100):

  • User can fix: +30
  • Requires external party: -10
  • Has blockers: -5 per blocker
  • Has workaround: +5

Escalation Boost (up to +30):

  • Pending > 72h: +20
  • Deadline < 6h: +30
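
As a worked illustration of the rule lists, the sketch below scores user agency and the escalation boost using only the rules stated above. It rests on several assumptions: scores are clamped to 0-100, the two escalation rules are summed and then capped at +30, and the boost is added to the weighted total with a ceiling of 100. The function names are hypothetical, and app/enrichment/priority_scorer.py may differ.

```python
from typing import Optional


def user_agency_score(can_fix: bool, requires_external_party: bool,
                      blocker_count: int, has_workaround: bool) -> int:
    """Apply the listed agency rules, then clamp to 0-100 (clamping is an assumption)."""
    score = 0
    if can_fix:
        score += 30
    if requires_external_party:
        score -= 10
    score -= 5 * blocker_count
    if has_workaround:
        score += 5
    return max(0, min(100, score))


def escalation_boost(hours_pending: float, hours_to_deadline: Optional[float]) -> int:
    """Sum the listed escalation rules, capped at +30 (the combination rule is an assumption)."""
    boost = 0
    if hours_pending > 72:
        boost += 20
    if hours_to_deadline is not None and hours_to_deadline < 6:
        boost += 30
    return min(boost, 30)


def apply_boost(total: float, boost: int) -> float:
    """Add the boost to a weighted total without exceeding 100 (assumption)."""
    return min(100.0, total + boost)
```

For the flour example earlier, where the user can create a PO but depends on a supplier, `user_agency_score(True, True, 0, False)` gives 30 - 10 = 20 under these rules.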

Alert Types

See app/utils/message_templates.py for complete list.

Key alert types:

  • critical_stock_shortage
  • low_stock_warning
  • production_delay
  • equipment_failure
  • po_approval_needed
  • temperature_breach
  • delivery_overdue
  • expired_products

Database Schema

A single events table stores each event, with enrichment data in JSONB columns:

  • Core: id, tenant_id, created_at, event_type
  • i18n: i18n_title_key, i18n_title_params, i18n_message_key, i18n_message_params
  • Priority: priority_score (0-100), priority_level (critical/important/standard/info)
  • Enrichment: orchestrator_context, business_impact, urgency, user_agency (JSONB)
  • Actions: smart_actions (JSONB array)
  • Status: status (active/acknowledged/resolved/dismissed)

Monitoring

The service emits structured JSON logs with the following event names:

  • enrichment_started - Event received
  • enrichment_completed - Enrichment pipeline finished
  • event_stored - Saved to database
  • notification_sent - Notification queued
  • sse_event_published - Published to SSE stream
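
One way to produce log lines of this shape with only the standard library is a custom formatter. This is a sketch under assumptions: the service may use a dedicated structured-logging library instead, and `JsonFormatter`, `build_logger`, the `fields` key, and the logger name are all hypothetical.

```python
import json
import logging
import sys
from typing import TextIO


class JsonFormatter(logging.Formatter):
    """Render each record as one JSON object with the event name and extra fields."""

    def format(self, record: logging.LogRecord) -> str:
        payload = {"event": record.getMessage(), "level": record.levelname.lower()}
        # Extra key/value pairs are passed via extra={"fields": {...}}.
        payload.update(getattr(record, "fields", {}))
        return json.dumps(payload, sort_keys=True)


def build_logger(stream: TextIO) -> logging.Logger:
    """Return a logger that writes JSON lines to the given stream."""
    logger = logging.getLogger("alert_processor.sketch")
    logger.handlers.clear()
    handler = logging.StreamHandler(stream)
    handler.setFormatter(JsonFormatter())
    logger.addHandler(handler)
    logger.setLevel(logging.INFO)
    logger.propagate = False
    return logger


log = build_logger(sys.stdout)
log.info("enrichment_started", extra={"fields": {"tenant_id": "t1"}})
```

Using the event name as the log message keeps lines easy to filter, for instance by searching for `"event": "enrichment_completed"`.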

Testing

# Run tests
pytest

# Test enrichment pipeline
pytest tests/test_enrichment_orchestrator.py

# Test priority scoring
pytest tests/test_priority_scorer.py

# Test message generation
pytest tests/test_message_generator.py

Migration from v1

See MIGRATION_GUIDE.md for the steps to migrate from the old alert_processor (v1).

Key changes:

  • Services send minimal events (no hardcoded messages)
  • All enrichment moved to alert_processor
  • Unified Event table (no separate alert/notification tables)
  • i18n-first architecture
  • Multi-factor priority scoring (business impact, urgency, user agency, confidence)
  • Smart action generation