# Alert Processor Service v2.0

A clean, well-structured event processing and alert management system with a sophisticated enrichment pipeline.

## Overview

The Alert Processor Service receives **minimal events** from other services (inventory, production, procurement, etc.) and enriches them with:

- **i18n message generation** - Parameterized titles and messages for the frontend
- **Multi-factor priority scoring** - Business impact (40%), Urgency (30%), User agency (20%), Confidence (10%)
- **Business impact analysis** - Financial impact, affected orders, customer impact
- **Urgency assessment** - Time until consequence, deadlines, escalation
- **User agency analysis** - Can the user fix it? External dependencies? Blockers?
- **AI orchestrator context** - Query for AI actions already taken
- **Smart action generation** - Contextual buttons with deep links
- **Entity linking** - References to related entities (POs, batches, orders)

## Architecture

```
Services → RabbitMQ → [Alert Processor] → PostgreSQL
                             ↓
                    Notification Service
                             ↓
                    Redis (SSE Pub/Sub)
                             ↓
                         Frontend
```

### Enrichment Pipeline

1. **Duplicate Detection**: Checks for duplicate alerts within 24-hour window
2. **Message Generator**: Creates i18n keys and parameters from metadata
3. **Orchestrator Client**: Queries AI orchestrator for context
4. **AI Reasoning Extractor**: Extracts AI reasoning details and confidence scores
5. **Business Impact Analyzer**: Calculates financial and operational impact
6. **Urgency Analyzer**: Assesses time sensitivity and deadlines
7. **User Agency Analyzer**: Determines user's ability to act
8. **Priority Scorer**: Calculates weighted priority score (0-100)
9. **Type Classifier**: Determines if action needed or issue prevented
10. **Smart Action Generator**: Creates contextual action buttons
11. **Entity Link Extractor**: Maps metadata to entity references

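The stages in the enrichment pipeline run in order, each adding fields to the event. A minimal sketch of such a coordinator follows, assuming each stage is a plain function that takes and returns a dict. The stage functions and the values they set are hypothetical stand-ins, not the service's actual `enrichment_orchestrator.py`.

```python
from typing import Callable

# Hypothetical stage signature: takes the event dict, returns it with new fields.
Stage = Callable[[dict], dict]


def generate_message(event: dict) -> dict:
    # Stand-in for the Message Generator: derive an i18n key from the event type.
    return {**event, "i18n_title_key": f"alerts.{event['event_type']}.title"}


def classify_type(event: dict) -> dict:
    # Stand-in for the Type Classifier; the real rules are not shown in this README.
    return {**event, "type_class": "action_needed"}


def run_pipeline(event: dict, stages: list[Stage]) -> dict:
    """Apply each enrichment stage in order, threading the event through."""
    for stage in stages:
        event = stage(event)
    return event


enriched = run_pipeline(
    {"event_type": "critical_stock_shortage", "severity": "urgent"},
    [generate_message, classify_type],
)
print(enriched["i18n_title_key"])  # alerts.critical_stock_shortage.title
```

Keeping each stage a pure function of the event makes it straightforward to test stages in isolation and to reorder or skip them.
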
## Service Structure

```
alert_processor_v2/
├── app/
│   ├── main.py                         # FastAPI app + lifecycle
│   ├── core/
│   │   ├── config.py                   # Settings
│   │   └── database.py                 # Database session management
│   ├── models/
│   │   └── events.py                   # SQLAlchemy Event model
│   ├── schemas/
│   │   └── events.py                   # Pydantic schemas
│   ├── api/
│   │   ├── alerts.py                   # Alert endpoints
│   │   └── sse.py                      # SSE streaming
│   ├── consumer/
│   │   └── event_consumer.py           # RabbitMQ consumer
│   ├── enrichment/
│   │   ├── message_generator.py        # i18n generation
│   │   ├── priority_scorer.py          # Priority calculation
│   │   ├── orchestrator_client.py      # AI context
│   │   ├── smart_actions.py            # Action buttons
│   │   ├── business_impact.py          # Impact analysis
│   │   ├── urgency_analyzer.py         # Urgency assessment
│   │   └── user_agency.py              # Agency analysis
│   ├── repositories/
│   │   └── event_repository.py         # Database queries
│   ├── services/
│   │   ├── enrichment_orchestrator.py  # Pipeline coordinator
│   │   └── sse_service.py              # SSE pub/sub
│   └── utils/
│       └── message_templates.py        # Alert type mappings
├── migrations/
│   └── versions/
│       └── 20251205_clean_unified_schema.py
└── requirements.txt
```

## Environment Variables

```bash
# Service
SERVICE_NAME=alert-processor
VERSION=2.0.0
DEBUG=false

# Database
DATABASE_URL=postgresql+asyncpg://user:pass@localhost/db

# RabbitMQ
RABBITMQ_URL=amqp://guest:guest@localhost/
RABBITMQ_EXCHANGE=events.exchange
RABBITMQ_QUEUE=alert_processor.queue

# Redis
REDIS_URL=redis://localhost:6379/0
REDIS_SSE_PREFIX=alerts

# Orchestrator Service
ORCHESTRATOR_URL=http://orchestrator:8000
ORCHESTRATOR_TIMEOUT=10

# Notification Service
NOTIFICATION_URL=http://notification:8000
NOTIFICATION_TIMEOUT=5

# Cache
CACHE_ENABLED=true
CACHE_TTL_SECONDS=300
```

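As an illustration of how these variables could be read at startup, here is a minimal sketch using only the standard library. The `Settings` dataclass and `load_settings` helper are hypothetical; the actual `app/core/config.py` may be structured differently (for example, with a settings library).

```python
import os
from dataclasses import dataclass
from typing import Mapping


@dataclass(frozen=True)
class Settings:
    # Only a subset of the variables listed above, for brevity.
    service_name: str
    debug: bool
    redis_sse_prefix: str
    orchestrator_timeout: int
    cache_ttl_seconds: int


def load_settings(env: Mapping[str, str] = os.environ) -> Settings:
    """Build Settings from a mapping, falling back to the example values above."""
    return Settings(
        service_name=env.get("SERVICE_NAME", "alert-processor"),
        debug=env.get("DEBUG", "false").lower() == "true",
        redis_sse_prefix=env.get("REDIS_SSE_PREFIX", "alerts"),
        orchestrator_timeout=int(env.get("ORCHESTRATOR_TIMEOUT", "10")),
        cache_ttl_seconds=int(env.get("CACHE_TTL_SECONDS", "300")),
    )


settings = load_settings({"DEBUG": "true", "ORCHESTRATOR_TIMEOUT": "3"})
print(settings.debug, settings.orchestrator_timeout, settings.cache_ttl_seconds)
```

Passing the environment in as a mapping keeps the loader easy to test without mutating `os.environ`.
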
## Running the Service

### Local Development

```bash
# Install dependencies
pip install -r requirements.txt

# Run database migrations
alembic upgrade head

# Start service
python -m app.main
# or
uvicorn app.main:app --reload
```

### Docker

```bash
docker build -t alert-processor:2.0 .
docker run -p 8000:8000 --env-file .env alert-processor:2.0
```

## API Endpoints

### Alert Management

- `GET /api/v1/tenants/{tenant_id}/alerts` - List alerts with filters
- `GET /api/v1/tenants/{tenant_id}/alerts/summary` - Get dashboard summary
- `GET /api/v1/tenants/{tenant_id}/alerts/{alert_id}` - Get single alert
- `POST /api/v1/tenants/{tenant_id}/alerts/{alert_id}/acknowledge` - Acknowledge alert
- `POST /api/v1/tenants/{tenant_id}/alerts/{alert_id}/resolve` - Resolve alert
- `POST /api/v1/tenants/{tenant_id}/alerts/{alert_id}/dismiss` - Dismiss alert

### Real-Time Streaming

- `GET /api/v1/sse/alerts/{tenant_id}` - SSE stream for real-time alerts

### Health Check

- `GET /health` - Service health status

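The three `POST` actions share one URL shape, so a client can build them with a single helper. The sketch below only constructs the request and does not send it. The helper name, the tenant and alert IDs, and the `http://localhost:8000` base URL (taken from the Docker port mapping) are illustrative assumptions, and any authentication headers the service may require are not shown.

```python
from urllib.request import Request

ACTIONS = {"acknowledge", "resolve", "dismiss"}


def alert_action_request(base_url: str, tenant_id: str, alert_id: str, action: str) -> Request:
    """Build (but do not send) a POST request for one of the alert actions."""
    if action not in ACTIONS:
        raise ValueError(f"unsupported action: {action}")
    url = f"{base_url.rstrip('/')}/api/v1/tenants/{tenant_id}/alerts/{alert_id}/{action}"
    return Request(url, method="POST")


req = alert_action_request("http://localhost:8000", "tenant-1", "alert-42", "acknowledge")
print(req.get_method(), req.full_url)
# Send it with urllib.request.urlopen(req) once the service is running.
```
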
## Event Flow

### 1. Service Emits Minimal Event

```python
from shared.messaging.event_publisher import EventPublisher

# Run inside an async function; `publisher` is an initialized EventPublisher
# and `tenant_id` identifies the tenant the alert belongs to.
await publisher.publish_alert(
    tenant_id=tenant_id,
    event_type="critical_stock_shortage",
    event_domain="inventory",
    severity="urgent",
    metadata={
        "ingredient_id": "...",
        "ingredient_name": "Flour",
        "current_stock": 10.5,
        "required_stock": 50.0,
        "shortage_amount": 39.5
    }
)
```

### 2. Alert Processor Enriches

- Checks for duplicates: Searches 24-hour window for similar alerts
- Generates i18n: `alerts.critical_stock_shortage.title` with params
- Queries orchestrator for AI context
- Extracts AI reasoning and confidence scores (if available)
- Analyzes business impact: €197.50 financial impact
- Assesses urgency: 12 hours until consequence
- Determines user agency: Can create PO, requires supplier
- Calculates priority: Score 78 → "important"
- Classifies type: `action_needed` or `prevented_issue`
- Generates smart actions: [Create PO, Call Supplier, Dismiss]
- Extracts entity links: `{ingredient: "..."}`

### 3. Stores Enriched Event

```json
{
  "id": "...",
  "event_type": "critical_stock_shortage",
  "event_domain": "inventory",
  "severity": "urgent",
  "type_class": "action_needed",
  "priority_score": 78,
  "priority_level": "important",
  "confidence_score": 95,
  "i18n": {
    "title_key": "alerts.critical_stock_shortage.title",
    "title_params": {"ingredient_name": "Flour"},
    "message_key": "alerts.critical_stock_shortage.message_generic",
    "message_params": {
      "current_stock_kg": 10.5,
      "required_stock_kg": 50.0
    }
  },
  "business_impact": {...},
  "urgency": {...},
  "user_agency": {...},
  "ai_reasoning_details": {...},
  "orchestrator_context": {...},
  "smart_actions": [...],
  "entity_links": {"ingredient": "..."}
}
```

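To make the `i18n` block concrete, the sketch below derives it from the metadata of the minimal event in step 1. The key pattern follows the stored event above; the function name and the assumption that stock quantities are in kilograms are illustrative only.

```python
def build_i18n(event_type: str, metadata: dict) -> dict:
    """Build an i18n block shaped like the one in the stored event above.

    Hypothetical helper: the real generator and templates live in
    app/enrichment/message_generator.py and app/utils/message_templates.py.
    """
    return {
        "title_key": f"alerts.{event_type}.title",
        "title_params": {"ingredient_name": metadata["ingredient_name"]},
        "message_key": f"alerts.{event_type}.message_generic",
        "message_params": {
            # Assumes stock quantities are expressed in kilograms.
            "current_stock_kg": metadata["current_stock"],
            "required_stock_kg": metadata["required_stock"],
        },
    }


i18n = build_i18n(
    "critical_stock_shortage",
    {"ingredient_name": "Flour", "current_stock": 10.5, "required_stock": 50.0},
)
print(i18n["title_key"])  # alerts.critical_stock_shortage.title
```

Because only keys and parameters are stored, the frontend can render the final text in whichever locale the user has selected.
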
### 4. Sends Notification

Calls the notification service with the event details for delivery via WhatsApp, Email, Push, etc.

### 5. Publishes to SSE

Publishes to the Redis channel `alerts:{tenant_id}` for real-time frontend updates.

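The publish step can be sketched as follows. The `publish` callable is injected so the example runs without a Redis server; with redis-py it could be a client's `publish(channel, message)` method. The helper names are hypothetical, and the `data:` framing shown is the standard Server-Sent Events wire format rather than anything specific to this service.

```python
import json
from typing import Callable


def channel_for(tenant_id: str, prefix: str = "alerts") -> str:
    """Channel name in the `alerts:{tenant_id}` form; prefix mirrors REDIS_SSE_PREFIX."""
    return f"{prefix}:{tenant_id}"


def publish_event(publish: Callable[[str, str], object], tenant_id: str, event: dict) -> str:
    """Serialize the event and hand it to the injected publish callable."""
    payload = json.dumps(event)
    publish(channel_for(tenant_id), payload)
    return payload


def to_sse_frame(payload: str) -> str:
    """Wrap a single-line payload as a Server-Sent Events `data:` frame."""
    return f"data: {payload}\n\n"


sent = []
payload = publish_event(lambda ch, msg: sent.append((ch, msg)), "tenant-1", {"id": "a1"})
print(sent[0][0])  # alerts:tenant-1
```
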
## Priority Scoring Algorithm

**Formula**: `Total = (Impact × 0.4) + (Urgency × 0.3) + (Agency × 0.2) + (Confidence × 0.1)`

**Business Impact Score (0-100)**:
- Financial impact > €1000: +30
- Affected orders > 10: +15
- High customer impact: +15
- Production delay > 4h: +10
- Revenue loss > €500: +10

**Urgency Score (0-100)**:
- Time until consequence < 2h: +40
- Deadline present: +5
- Can't wait until tomorrow: +10
- Peak hour relevant: +5

**User Agency Score (0-100)**:
- User can fix: +30
- Requires external party: -10
- Has blockers: -5 per blocker
- Has workaround: +5

**Escalation Boost** (up to +30):
- Pending > 72h: +20
- Deadline < 6h: +30

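The formula above can be sketched in a few lines. Two points are assumptions here, since the text does not spell them out: that the escalation boost is added to the weighted total, and that the result is clamped to 0-100. The function names are hypothetical and this is not the actual `priority_scorer.py`.

```python
from typing import Optional

# Weights from the formula above, as integer percentages to keep the arithmetic exact.
WEIGHTS = {"impact": 40, "urgency": 30, "agency": 20, "confidence": 10}


def escalation_boost(pending_hours: float, hours_to_deadline: Optional[float]) -> int:
    """Escalation boost per the rules above, capped at +30."""
    boost = 0
    if pending_hours > 72:
        boost += 20
    if hours_to_deadline is not None and hours_to_deadline < 6:
        boost += 30
    return min(boost, 30)


def priority_score(impact, urgency, agency, confidence, boost=0) -> float:
    """Weighted total of the four component scores (each 0-100).

    Assumption: the boost is added to the weighted total and the result is
    clamped to the 0-100 range.
    """
    components = {"impact": impact, "urgency": urgency, "agency": agency, "confidence": confidence}
    weighted = sum(components[name] * weight for name, weight in WEIGHTS.items()) / 100
    return max(0.0, min(100.0, weighted + boost))


print(priority_score(80, 90, 60, 95))  # 80.5
print(priority_score(80, 90, 60, 95, escalation_boost(80, None)))  # 100.0
```

For the first call the arithmetic is 80 × 0.4 + 90 × 0.3 + 60 × 0.2 + 95 × 0.1 = 32 + 27 + 12 + 9.5 = 80.5.
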
## Alert Types

See [app/utils/message_templates.py](app/utils/message_templates.py) for the complete list.

### Standard Alerts
- `critical_stock_shortage` - Urgent stock shortages
- `low_stock_warning` - Stock running low
- `production_delay` - Production behind schedule
- `equipment_failure` - Equipment issues
- `po_approval_needed` - Purchase order approval required
- `temperature_breach` - Temperature control violations
- `delivery_overdue` - Late deliveries
- `expired_products` - Product expiration warnings

### AI Recommendations
- `ai_yield_prediction` - AI-predicted production yields
- `ai_safety_stock_optimization` - AI stock level recommendations
- `ai_supplier_recommendation` - AI supplier suggestions
- `ai_price_forecast` - AI price predictions
- `ai_demand_forecast` - AI demand forecasts
- `ai_business_rule` - AI-suggested business rules

## Database Schema

**events table** with JSONB enrichment:
- Core: `id`, `tenant_id`, `created_at`, `event_type`, `event_domain`, `severity`
- i18n: `i18n_title_key`, `i18n_title_params`, `i18n_message_key`, `i18n_message_params`
- Priority: `priority_score` (0-100), `priority_level` (critical/important/standard/info)
- Enrichment: `orchestrator_context`, `business_impact`, `urgency`, `user_agency` (JSONB)
- AI Fields: `ai_reasoning_details`, `confidence_score`, `ai_reasoning_summary_key`, `ai_reasoning_summary_params`
- Classification: `type_class` (action_needed/prevented_issue)
- Actions: `smart_actions` (JSONB array)
- Entities: `entity_links` (JSONB)
- Status: `status` (active/acknowledged/resolved/dismissed)
- Metadata: `raw_metadata` (JSONB)

## Key Features

### Duplicate Alert Detection

The service automatically detects and prevents duplicate alerts:
- **24-hour window**: Checks for similar alerts in the past 24 hours
- **Smart matching**: Compares `tenant_id`, `event_type`, and key metadata fields
- **Update strategy**: Updates the existing alert instead of creating a duplicate
- **Metadata preservation**: Keeps enriched data while preventing alert fatigue

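The matching rule can be sketched over an in-memory list. The real service presumably runs an equivalent query against the `events` table, and which metadata fields count as "key" is not specified here, so `key_fields` is a hypothetical parameter, as are the sample IDs.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

WINDOW = timedelta(hours=24)


def find_duplicate(existing: list[dict], incoming: dict, key_fields: list[str],
                   now: datetime) -> Optional[dict]:
    """Return the first recent alert matching tenant, type, and key metadata fields."""
    for alert in existing:
        if now - alert["created_at"] > WINDOW:
            continue  # outside the 24-hour window
        same_scope = (alert["tenant_id"], alert["event_type"]) == (
            incoming["tenant_id"], incoming["event_type"])
        if not same_scope:
            continue
        if all(alert["metadata"].get(f) == incoming["metadata"].get(f) for f in key_fields):
            return alert
    return None


now = datetime(2025, 12, 5, 12, 0, tzinfo=timezone.utc)
existing = [{
    "tenant_id": "t1", "event_type": "critical_stock_shortage",
    "created_at": now - timedelta(hours=3),
    "metadata": {"ingredient_id": "flour-1", "current_stock": 12.0},
}]
incoming = {
    "tenant_id": "t1", "event_type": "critical_stock_shortage",
    "metadata": {"ingredient_id": "flour-1", "current_stock": 10.5},
}
match = find_duplicate(existing, incoming, ["ingredient_id"], now)
print(match is not None)  # True
```

Matching on an identifying field such as the ingredient ID, rather than on the full metadata, lets a changed stock level update the existing alert instead of creating a new one.
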
### Type Classification

Events are classified into two types:
- **action_needed**: User action required (default for alerts)
- **prevented_issue**: AI already handled the situation (for AI recommendations)

This helps the frontend display appropriate UI and messaging.

### AI Reasoning Integration

When the AI orchestrator has acted on an event, the service:
- Extracts the complete reasoning data structure
- Stores confidence scores (0-100)
- Generates i18n-friendly reasoning summaries
- Links to orchestrator context for full details

### Notification Service Integration

Enriched events are automatically sent to the notification service for delivery via:
- WhatsApp
- Email
- Push notifications
- SMS

Priority mapping:
- `critical` → urgent priority
- `important` → high priority
- `standard` → medium priority
- `info` → low priority

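The priority mapping above translates directly into a lookup table. In this sketch the function name and the fallback to `low` for unknown levels are assumptions, since the handling of unexpected values is not described.

```python
# Mapping taken from the list above: alert priority level -> notification priority.
NOTIFICATION_PRIORITY = {
    "critical": "urgent",
    "important": "high",
    "standard": "medium",
    "info": "low",
}


def notification_priority(priority_level: str) -> str:
    """Translate an alert's priority_level into a notification priority."""
    # Assumption: unknown levels fall back to the lowest priority.
    return NOTIFICATION_PRIORITY.get(priority_level, "low")


print(notification_priority("important"))  # high
```
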
## Monitoring

The service emits structured JSON logs with the following events:
- `enrichment_started` - Event received
- `duplicate_detected` - Duplicate alert found and updated
- `enrichment_completed` - Enrichment pipeline finished
- `event_stored` - Saved to database
- `notification_sent` - Notification queued
- `sse_event_published` - Published to SSE stream

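For readers unfamiliar with structured logging, the sketch below emits one of these events as a JSON line using only the standard `logging` module. The service may well use a dedicated structured-logging library instead; the formatter class, logger name, and context field names are hypothetical.

```python
import json
import logging


class JsonFormatter(logging.Formatter):
    """Render each record as one JSON object per line."""

    def format(self, record: logging.LogRecord) -> str:
        entry = {"event": record.getMessage(), "level": record.levelname.lower()}
        # Extra context (hypothetical field names) arrives via extra={"context": {...}}.
        entry.update(getattr(record, "context", {}))
        return json.dumps(entry)


handler = logging.StreamHandler()
handler.setFormatter(JsonFormatter())
logger = logging.getLogger("alert_processor_example")
logger.addHandler(handler)
logger.setLevel(logging.INFO)

logger.info("enrichment_started", extra={"context": {"tenant_id": "tenant-1"}})
```

Using the event name as the log message keeps the entries easy to filter and count in a log aggregator.
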
## Testing

```bash
# Run tests
pytest

# Test enrichment pipeline
pytest tests/test_enrichment_orchestrator.py

# Test priority scoring
pytest tests/test_priority_scorer.py

# Test message generation
pytest tests/test_message_generator.py
```

## Migration from v1

See [MIGRATION_GUIDE.md](MIGRATION_GUIDE.md) for the steps to migrate from the old `alert_processor`.

Key changes:
- Services send minimal events (no hardcoded messages)
- All enrichment moved to `alert_processor`
- Unified Event table (no separate alert/notification tables)
- i18n-first architecture
- Sophisticated multi-factor priority scoring
- Smart action generation