8.9 KiB
Alert Processor Service v2.0
Clean, well-structured event processing and alert management system with sophisticated enrichment pipeline.
Overview
The Alert Processor Service receives minimal events from other services (inventory, production, procurement, etc.) and enriches them with:
- i18n message generation - Parameterized titles and messages for frontend
- Multi-factor priority scoring - Business impact (40%), Urgency (30%), User agency (20%), Confidence (10%)
- Business impact analysis - Financial impact, affected orders, customer impact
- Urgency assessment - Time until consequence, deadlines, escalation
- User agency analysis - Can user fix? External dependencies? Blockers?
- AI orchestrator context - Query for AI actions already taken
- Smart action generation - Contextual buttons with deep links
- Entity linking - References to related entities (POs, batches, orders)
Architecture
Services → RabbitMQ → [Alert Processor] → PostgreSQL
↓
Notification Service
↓
Redis (SSE Pub/Sub)
↓
Frontend
Enrichment Pipeline
- Message Generator: Creates i18n keys and parameters from metadata
- Orchestrator Client: Queries AI orchestrator for context
- Business Impact Analyzer: Calculates financial and operational impact
- Urgency Analyzer: Assesses time sensitivity and deadlines
- User Agency Analyzer: Determines user's ability to act
- Priority Scorer: Calculates weighted priority score (0-100)
- Smart Action Generator: Creates contextual action buttons
- Entity Link Extractor: Maps metadata to entity references
Service Structure
alert_processor_v2/
├── app/
│ ├── main.py # FastAPI app + lifecycle
│ ├── core/
│ │ ├── config.py # Settings
│ │ └── database.py # Database session management
│ ├── models/
│ │ └── events.py # SQLAlchemy Event model
│ ├── schemas/
│ │ └── events.py # Pydantic schemas
│ ├── api/
│ │ ├── alerts.py # Alert endpoints
│ │ └── sse.py # SSE streaming
│ ├── consumer/
│ │ └── event_consumer.py # RabbitMQ consumer
│ ├── enrichment/
│ │ ├── message_generator.py # i18n generation
│ │ ├── priority_scorer.py # Priority calculation
│ │ ├── orchestrator_client.py # AI context
│ │ ├── smart_actions.py # Action buttons
│ │ ├── business_impact.py # Impact analysis
│ │ ├── urgency_analyzer.py # Urgency assessment
│ │ └── user_agency.py # Agency analysis
│ ├── repositories/
│ │ └── event_repository.py # Database queries
│ ├── services/
│ │ ├── enrichment_orchestrator.py # Pipeline coordinator
│ │ └── sse_service.py # SSE pub/sub
│ └── utils/
│ └── message_templates.py # Alert type mappings
├── migrations/
│ └── versions/
│ └── 20251205_clean_unified_schema.py
└── requirements.txt
Environment Variables
# Service
SERVICE_NAME=alert-processor
VERSION=2.0.0
DEBUG=false
# Database
DATABASE_URL=postgresql+asyncpg://user:pass@localhost/db
# RabbitMQ
RABBITMQ_URL=amqp://guest:guest@localhost/
RABBITMQ_EXCHANGE=events.exchange
RABBITMQ_QUEUE=alert_processor.queue
# Redis
REDIS_URL=redis://localhost:6379/0
REDIS_SSE_PREFIX=alerts
# Orchestrator Service
ORCHESTRATOR_URL=http://orchestrator:8000
ORCHESTRATOR_TIMEOUT=10
# Notification Service
NOTIFICATION_URL=http://notification:8000
NOTIFICATION_TIMEOUT=5
# Cache
CACHE_ENABLED=true
CACHE_TTL_SECONDS=300
Running the Service
Local Development
# Install dependencies
pip install -r requirements.txt
# Run database migrations
alembic upgrade head
# Start service
python -m app.main
# or
uvicorn app.main:app --reload
Docker
docker build -t alert-processor:2.0 .
docker run -p 8000:8000 --env-file .env alert-processor:2.0
API Endpoints
Alert Management
GET /api/v1/tenants/{tenant_id}/alerts- List alerts with filtersGET /api/v1/tenants/{tenant_id}/alerts/summary- Get dashboard summaryGET /api/v1/tenants/{tenant_id}/alerts/{alert_id}- Get single alertPOST /api/v1/tenants/{tenant_id}/alerts/{alert_id}/acknowledge- Acknowledge alertPOST /api/v1/tenants/{tenant_id}/alerts/{alert_id}/resolve- Resolve alertPOST /api/v1/tenants/{tenant_id}/alerts/{alert_id}/dismiss- Dismiss alert
Real-Time Streaming
GET /api/v1/sse/alerts/{tenant_id}- SSE stream for real-time alerts
Health Check
GET /health- Service health status
Event Flow
1. Service Emits Minimal Event
from shared.messaging.event_publisher import EventPublisher
await publisher.publish_alert(
tenant_id=tenant_id,
event_type="critical_stock_shortage",
event_domain="inventory",
severity="urgent",
metadata={
"ingredient_id": "...",
"ingredient_name": "Flour",
"current_stock": 10.5,
"required_stock": 50.0,
"shortage_amount": 39.5
}
)
2. Alert Processor Enriches
- Generates i18n:
alerts.critical_stock_shortage.titlewith params - Queries orchestrator for AI context
- Analyzes business impact: €197.50 financial impact
- Assesses urgency: 12 hours until consequence
- Determines user agency: Can create PO, requires supplier
- Calculates priority: Score 78 → "important"
- Generates smart actions: [Create PO, Call Supplier, Dismiss]
- Extracts entity links:
{ingredient: "..."}
3. Stores Enriched Event
{
"id": "...",
"event_type": "critical_stock_shortage",
"priority_score": 78,
"priority_level": "important",
"i18n": {
"title_key": "alerts.critical_stock_shortage.title",
"title_params": {"ingredient_name": "Flour"},
"message_key": "alerts.critical_stock_shortage.message_generic",
"message_params": {
"current_stock_kg": 10.5,
"required_stock_kg": 50.0
}
},
"business_impact": {...},
"urgency": {...},
"user_agency": {...},
"smart_actions": [...]
}
4. Sends Notification
Calls notification service with event details for delivery via WhatsApp, Email, Push, etc.
5. Publishes to SSE
Publishes to Redis channel alerts:{tenant_id} for real-time frontend updates.
Priority Scoring Algorithm
Formula: Total = (Impact × 0.4) + (Urgency × 0.3) + (Agency × 0.2) + (Confidence × 0.1)
Business Impact Score (0-100):
- Financial impact > €1000: +30
- Affected orders > 10: +15
- High customer impact: +15
- Production delay > 4h: +10
- Revenue loss > €500: +10
Urgency Score (0-100):
- Time until consequence < 2h: +40
- Deadline present: +5
- Can't wait until tomorrow: +10
- Peak hour relevant: +5
User Agency Score (0-100):
- User can fix: +30
- Requires external party: -10
- Has blockers: -5 per blocker
- Has workaround: +5
Escalation Boost (up to +30):
- Pending > 72h: +20
- Deadline < 6h: +30
Alert Types
See app/utils/message_templates.py for complete list.
Key alert types:
critical_stock_shortagelow_stock_warningproduction_delayequipment_failurepo_approval_neededtemperature_breachdelivery_overdueexpired_products
Database Schema
events table with JSONB enrichment:
- Core:
id,tenant_id,created_at,event_type - i18n:
i18n_title_key,i18n_title_params,i18n_message_key,i18n_message_params - Priority:
priority_score(0-100),priority_level(critical/important/standard/info) - Enrichment:
orchestrator_context,business_impact,urgency,user_agency(JSONB) - Actions:
smart_actions(JSONB array) - Status:
status(active/acknowledged/resolved/dismissed)
Monitoring
Structured JSON logs with:
enrichment_started- Event receivedenrichment_completed- Enrichment pipeline finishedevent_stored- Saved to databasenotification_sent- Notification queuedsse_event_published- Published to SSE stream
Testing
# Run tests
pytest
# Test enrichment pipeline
pytest tests/test_enrichment_orchestrator.py
# Test priority scoring
pytest tests/test_priority_scorer.py
# Test message generation
pytest tests/test_message_generator.py
Migration from v1
See MIGRATION_GUIDE.md for migration steps from old alert_processor.
Key changes:
- Services send minimal events (no hardcoded messages)
- All enrichment moved to alert_processor
- Unified Event table (no separate alert/notification tables)
- i18n-first architecture
- Sophisticated multi-factor priority scoring
- Smart action generation