# Alert Processor Service v2.0
A clean, well-structured event processing and alert management system built around a multi-stage enrichment pipeline.
## Overview
The Alert Processor Service receives **minimal events** from other services (inventory, production, procurement, etc.) and enriches them with:
- **i18n message generation** - Parameterized titles and messages for frontend
- **Multi-factor priority scoring** - Business impact (40%), Urgency (30%), User agency (20%), Confidence (10%)
- **Business impact analysis** - Financial impact, affected orders, customer impact
- **Urgency assessment** - Time until consequence, deadlines, escalation
- **User agency analysis** - Can user fix? External dependencies? Blockers?
- **AI orchestrator context** - Query for AI actions already taken
- **Smart action generation** - Contextual buttons with deep links
- **Entity linking** - References to related entities (POs, batches, orders)
## Architecture
```
Services → RabbitMQ → [Alert Processor] → PostgreSQL
                              ↓
                     Notification Service
                              ↓
                     Redis (SSE Pub/Sub)
                              ↓
                          Frontend
```
### Enrichment Pipeline
1. **Duplicate Detection**: Checks for duplicate alerts within a 24-hour window
2. **Message Generator**: Creates i18n keys and parameters from metadata
3. **Orchestrator Client**: Queries AI orchestrator for context
4. **AI Reasoning Extractor**: Extracts AI reasoning details and confidence scores
5. **Business Impact Analyzer**: Calculates financial and operational impact
6. **Urgency Analyzer**: Assesses time sensitivity and deadlines
7. **User Agency Analyzer**: Determines user's ability to act
8. **Priority Scorer**: Calculates weighted priority score (0-100)
9. **Type Classifier**: Determines whether user action is needed or the issue was already prevented
10. **Smart Action Generator**: Creates contextual action buttons
11. **Entity Link Extractor**: Maps metadata to entity references
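
One way to picture the stage order above is as a chain of async steps that each add fields to a shared context. The sketch below is illustrative only: `Stage`, `run_pipeline`, and the two toy stages are hypothetical names, not the code in `enrichment_orchestrator.py`.

```python
import asyncio
from typing import Any, Awaitable, Callable

# A stage receives the event context and returns the fields it contributes.
Stage = Callable[[dict[str, Any]], Awaitable[dict[str, Any]]]


async def generate_message(ctx: dict[str, Any]) -> dict[str, Any]:
    # Hypothetical stand-in for the message generator stage.
    return {"title_key": f"alerts.{ctx['event_type']}.title"}


async def classify_type(ctx: dict[str, Any]) -> dict[str, Any]:
    # Hypothetical stand-in for the type classifier stage.
    return {"type_class": "action_needed"}


async def run_pipeline(event: dict[str, Any], stages: list[Stage]) -> dict[str, Any]:
    """Run stages in order; later stages can read what earlier ones added."""
    ctx = dict(event)  # copy so the caller's event is not mutated
    for stage in stages:
        ctx.update(await stage(ctx))
    return ctx


enriched = asyncio.run(
    run_pipeline(
        {"event_type": "critical_stock_shortage"},
        [generate_message, classify_type],
    )
)
print(enriched)
```

Keeping every stage behind the same signature means a stage can be added, removed, or reordered without touching the others.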
## Service Structure
```
alert_processor_v2/
├── app/
│   ├── main.py                          # FastAPI app + lifecycle
│   ├── core/
│   │   ├── config.py                    # Settings
│   │   └── database.py                  # Database session management
│   ├── models/
│   │   └── events.py                    # SQLAlchemy Event model
│   ├── schemas/
│   │   └── events.py                    # Pydantic schemas
│   ├── api/
│   │   ├── alerts.py                    # Alert endpoints
│   │   └── sse.py                       # SSE streaming
│   ├── consumer/
│   │   └── event_consumer.py            # RabbitMQ consumer
│   ├── enrichment/
│   │   ├── message_generator.py         # i18n generation
│   │   ├── priority_scorer.py           # Priority calculation
│   │   ├── orchestrator_client.py       # AI context
│   │   ├── smart_actions.py             # Action buttons
│   │   ├── business_impact.py           # Impact analysis
│   │   ├── urgency_analyzer.py          # Urgency assessment
│   │   └── user_agency.py               # Agency analysis
│   ├── repositories/
│   │   └── event_repository.py          # Database queries
│   ├── services/
│   │   ├── enrichment_orchestrator.py   # Pipeline coordinator
│   │   └── sse_service.py               # SSE pub/sub
│   └── utils/
│       └── message_templates.py         # Alert type mappings
├── migrations/
│   └── versions/
│       └── 20251205_clean_unified_schema.py
└── requirements.txt
```
## Environment Variables
```bash
# Service
SERVICE_NAME=alert-processor
VERSION=2.0.0
DEBUG=false
# Database
DATABASE_URL=postgresql+asyncpg://user:pass@localhost/db
# RabbitMQ
RABBITMQ_URL=amqp://guest:guest@localhost/
RABBITMQ_EXCHANGE=events.exchange
RABBITMQ_QUEUE=alert_processor.queue
# Redis
REDIS_URL=redis://localhost:6379/0
REDIS_SSE_PREFIX=alerts
# Orchestrator Service
ORCHESTRATOR_URL=http://orchestrator:8000
ORCHESTRATOR_TIMEOUT=10
# Notification Service
NOTIFICATION_URL=http://notification:8000
NOTIFICATION_TIMEOUT=5
# Cache
CACHE_ENABLED=true
CACHE_TTL_SECONDS=300
```
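
As a rough illustration of how these variables might be read, the sketch below loads a subset of them with the standard library only. `Settings`, `load_settings`, and `_as_bool` are hypothetical names, and using the example values above as fallbacks is an assumption; the real `app/core/config.py` may work differently.

```python
import os
from dataclasses import dataclass
from typing import Mapping


@dataclass(frozen=True)
class Settings:
    service_name: str
    debug: bool
    database_url: str
    rabbitmq_url: str
    redis_url: str
    orchestrator_timeout: int
    cache_enabled: bool
    cache_ttl_seconds: int


def _as_bool(value: str) -> bool:
    # Treat the usual truthy spellings as True; anything else is False.
    return value.strip().lower() in {"1", "true", "yes", "on"}


def load_settings(env: Mapping[str, str] = os.environ) -> Settings:
    """Build Settings from an environment mapping (fallbacks are assumptions)."""
    return Settings(
        service_name=env.get("SERVICE_NAME", "alert-processor"),
        debug=_as_bool(env.get("DEBUG", "false")),
        database_url=env.get("DATABASE_URL", "postgresql+asyncpg://user:pass@localhost/db"),
        rabbitmq_url=env.get("RABBITMQ_URL", "amqp://guest:guest@localhost/"),
        redis_url=env.get("REDIS_URL", "redis://localhost:6379/0"),
        orchestrator_timeout=int(env.get("ORCHESTRATOR_TIMEOUT", "10")),
        cache_enabled=_as_bool(env.get("CACHE_ENABLED", "true")),
        cache_ttl_seconds=int(env.get("CACHE_TTL_SECONDS", "300")),
    )


# Passing an explicit mapping keeps the example independent of the real environment.
settings = load_settings({"DEBUG": "true", "CACHE_TTL_SECONDS": "60"})
print(settings.debug, settings.cache_ttl_seconds)
```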
## Running the Service
### Local Development
```bash
# Install dependencies
pip install -r requirements.txt
# Run database migrations
alembic upgrade head
# Start service
python -m app.main
# or
uvicorn app.main:app --reload
```
### Docker
```bash
docker build -t alert-processor:2.0 .
docker run -p 8000:8000 --env-file .env alert-processor:2.0
```
## API Endpoints
### Alert Management
- `GET /api/v1/tenants/{tenant_id}/alerts` - List alerts with filters
- `GET /api/v1/tenants/{tenant_id}/alerts/summary` - Get dashboard summary
- `GET /api/v1/tenants/{tenant_id}/alerts/{alert_id}` - Get single alert
- `POST /api/v1/tenants/{tenant_id}/alerts/{alert_id}/acknowledge` - Acknowledge alert
- `POST /api/v1/tenants/{tenant_id}/alerts/{alert_id}/resolve` - Resolve alert
- `POST /api/v1/tenants/{tenant_id}/alerts/{alert_id}/dismiss` - Dismiss alert
### Real-Time Streaming
- `GET /api/v1/sse/alerts/{tenant_id}` - SSE stream for real-time alerts
### Health Check
- `GET /health` - Service health status
## Event Flow
### 1. Service Emits Minimal Event
```python
from shared.messaging.event_publisher import EventPublisher

# `publisher` is an already-configured EventPublisher instance and `tenant_id`
# identifies the owning tenant; the call must run inside an async function.
await publisher.publish_alert(
    tenant_id=tenant_id,
    event_type="critical_stock_shortage",
    event_domain="inventory",
    severity="urgent",
    metadata={
        "ingredient_id": "...",
        "ingredient_name": "Flour",
        "current_stock": 10.5,
        "required_stock": 50.0,
        "shortage_amount": 39.5,
    },
)
```
### 2. Alert Processor Enriches
- Checks for duplicates: searches the previous 24 hours for similar alerts
- Generates i18n: `alerts.critical_stock_shortage.title` with params
- Queries orchestrator for AI context
- Extracts AI reasoning and confidence scores (if available)
- Analyzes business impact: €197.50 financial impact
- Assesses urgency: 12 hours until consequence
- Determines user agency: Can create PO, requires supplier
- Calculates priority: Score 78 → "important"
- Classifies type: `action_needed` or `prevented_issue`
- Generates smart actions: [Create PO, Call Supplier, Dismiss]
- Extracts entity links: `{ingredient: "..."}`
### 3. Stores Enriched Event
```json
{
  "id": "...",
  "event_type": "critical_stock_shortage",
  "event_domain": "inventory",
  "severity": "urgent",
  "type_class": "action_needed",
  "priority_score": 78,
  "priority_level": "important",
  "confidence_score": 95,
  "i18n": {
    "title_key": "alerts.critical_stock_shortage.title",
    "title_params": {"ingredient_name": "Flour"},
    "message_key": "alerts.critical_stock_shortage.message_generic",
    "message_params": {
      "current_stock_kg": 10.5,
      "required_stock_kg": 50.0
    }
  },
  "business_impact": {...},
  "urgency": {...},
  "user_agency": {...},
  "ai_reasoning_details": {...},
  "orchestrator_context": {...},
  "smart_actions": [...],
  "entity_links": {"ingredient": "..."}
}
```
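
To show how a consumer might turn the `i18n` block into display text, here is a minimal sketch. The `CATALOG` strings and the `render` helper are hypothetical; the real translations live in the frontend and are not part of this service.

```python
# Hypothetical translation catalog; the template wording is invented for this example.
CATALOG = {
    "en": {
        "alerts.critical_stock_shortage.title": "Critical stock shortage: {ingredient_name}",
        "alerts.critical_stock_shortage.message_generic": (
            "Only {current_stock_kg} kg in stock, {required_stock_kg} kg required."
        ),
    }
}


def render(key: str, params: dict, locale: str = "en") -> str:
    """Look up the template for `key` and fill in its parameters."""
    template = CATALOG.get(locale, {}).get(key)
    if template is None:
        return key  # fall back to the key so a missing translation stays visible
    return template.format(**params)


# The i18n block from the stored event above.
i18n = {
    "title_key": "alerts.critical_stock_shortage.title",
    "title_params": {"ingredient_name": "Flour"},
    "message_key": "alerts.critical_stock_shortage.message_generic",
    "message_params": {"current_stock_kg": 10.5, "required_stock_kg": 50.0},
}

title = render(i18n["title_key"], i18n["title_params"])
message = render(i18n["message_key"], i18n["message_params"])
print(title)
print(message)
```

Because the event stores keys and parameters rather than finished text, the same record can be rendered in any locale the catalog covers.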
### 4. Sends Notification
Calls the notification service with the event details for delivery via WhatsApp, email, push notification, or SMS.
### 5. Publishes to SSE
Publishes to Redis channel `alerts:{tenant_id}` for real-time frontend updates.
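
The sketch below shows how the channel name and a Server-Sent Events frame could be built for one event. `channel_name` and `to_sse_frame` are hypothetical helpers, and the `alert` event name is an assumption; only the `alerts:{tenant_id}` channel pattern comes from this document.

```python
import json


def channel_name(tenant_id: str, prefix: str = "alerts") -> str:
    """Build the Redis channel name, e.g. alerts:<tenant_id>."""
    return f"{prefix}:{tenant_id}"


def to_sse_frame(event: dict, event_name: str = "alert") -> str:
    """Serialize an event as one SSE frame: field lines ended by a blank line."""
    payload = json.dumps(event, separators=(",", ":"))
    return f"event: {event_name}\ndata: {payload}\n\n"


channel = channel_name("tenant-123")
frame = to_sse_frame({"event_type": "critical_stock_shortage", "priority_score": 78})
print(channel)
print(frame, end="")
```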
## Priority Scoring Algorithm
**Formula**: `Total = (Impact × 0.4) + (Urgency × 0.3) + (Agency × 0.2) + (Confidence × 0.1)`
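
As a worked example of the formula, the sketch below combines four component scores. The function name and the input values are made up for illustration; the weights are the ones stated above, written as integer percentages so the arithmetic stays exact.

```python
# Weights in percent, taken from the formula above.
WEIGHTS = {"impact": 40, "urgency": 30, "agency": 20, "confidence": 10}


def priority_score(impact: float, urgency: float, agency: float, confidence: float) -> float:
    """Weighted sum of the four component scores, each expected in 0-100."""
    components = {
        "impact": impact,
        "urgency": urgency,
        "agency": agency,
        "confidence": confidence,
    }
    for name, value in components.items():
        if not 0 <= value <= 100:
            raise ValueError(f"{name} must be between 0 and 100, got {value}")
    return sum(WEIGHTS[name] * value for name, value in components.items()) / 100


# (70*40 + 80*30 + 60*20 + 90*10) / 100 = 7300 / 100
score = priority_score(impact=70, urgency=80, agency=60, confidence=90)
print(score)  # 73.0
```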
**Business Impact Score (0-100)**:
- Financial impact > €1000: +30
- Affected orders > 10: +15
- High customer impact: +15
- Production delay > 4h: +10
- Revenue loss > €500: +10
**Urgency Score (0-100)**:
- Time until consequence < 2h: +40
- Deadline present: +5
- Can't wait until tomorrow: +10
- Peak hour relevant: +5
**User Agency Score (0-100)**:
- User can fix: +30
- Requires external party: -10
- Has blockers: -5 per blocker
- Has workaround: +5
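
The user agency rules can be sketched as a small function. The adjustments are the ones listed above, but the starting value is not stated in this document, so the `base` default of 50 and the clamping to 0-100 are assumptions; the function name is hypothetical.

```python
def user_agency_score(
    can_fix: bool,
    requires_external_party: bool,
    blocker_count: int,
    has_workaround: bool,
    base: int = 50,  # assumption: the document does not state a starting value
) -> int:
    """Apply the listed adjustments to a base score and clamp to 0-100."""
    score = base
    if can_fix:
        score += 30
    if requires_external_party:
        score -= 10
    score -= 5 * blocker_count  # -5 per blocker
    if has_workaround:
        score += 5
    return max(0, min(100, score))


# 50 + 30 - 10 - (5 * 2) = 60
agency = user_agency_score(
    can_fix=True, requires_external_party=True, blocker_count=2, has_workaround=False
)
print(agency)  # 60
```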
**Escalation Boost** (up to +30):
- Pending > 72h: +20
- Deadline < 6h: +30
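
A minimal sketch of the escalation boost follows. It assumes the two rules add up and the result is capped at 30, which is one reading of "up to +30"; the function name and parameters are hypothetical, and how the boost is combined with the weighted total is not covered here.

```python
from typing import Optional

MAX_BOOST = 30  # "up to +30" from the rules above


def escalation_boost(hours_pending: float, hours_to_deadline: Optional[float]) -> int:
    """Sum the escalation rules that apply, capped at MAX_BOOST (assumed behavior)."""
    boost = 0
    if hours_pending > 72:
        boost += 20
    if hours_to_deadline is not None and hours_to_deadline < 6:
        boost += 30
    return min(boost, MAX_BOOST)


print(escalation_boost(hours_pending=80, hours_to_deadline=None))  # 20
print(escalation_boost(hours_pending=80, hours_to_deadline=3))     # 30 (capped)
```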
## Alert Types
See [app/utils/message_templates.py](app/utils/message_templates.py) for complete list.
### Standard Alerts
- `critical_stock_shortage` - Urgent stock shortages
- `low_stock_warning` - Stock running low
- `production_delay` - Production behind schedule
- `equipment_failure` - Equipment issues
- `po_approval_needed` - Purchase order approval required
- `temperature_breach` - Temperature control violations
- `delivery_overdue` - Late deliveries
- `expired_products` - Product expiration warnings
### AI Recommendations
- `ai_yield_prediction` - AI-predicted production yields
- `ai_safety_stock_optimization` - AI stock level recommendations
- `ai_supplier_recommendation` - AI supplier suggestions
- `ai_price_forecast` - AI price predictions
- `ai_demand_forecast` - AI demand forecasts
- `ai_business_rule` - AI-suggested business rules
## Database Schema
**events table** with JSONB enrichment:
- Core: `id`, `tenant_id`, `created_at`, `event_type`, `event_domain`, `severity`
- i18n: `i18n_title_key`, `i18n_title_params`, `i18n_message_key`, `i18n_message_params`
- Priority: `priority_score` (0-100), `priority_level` (critical/important/standard/info)
- Enrichment: `orchestrator_context`, `business_impact`, `urgency`, `user_agency` (JSONB)
- AI Fields: `ai_reasoning_details`, `confidence_score`, `ai_reasoning_summary_key`, `ai_reasoning_summary_params`
- Classification: `type_class` (action_needed/prevented_issue)
- Actions: `smart_actions` (JSONB array)
- Entities: `entity_links` (JSONB)
- Status: `status` (active/acknowledged/resolved/dismissed)
- Metadata: `raw_metadata` (JSONB)
## Key Features
### Duplicate Alert Detection
The service automatically detects and prevents duplicate alerts:
- **24-hour window**: Checks for similar alerts in the past 24 hours
- **Smart matching**: Compares `tenant_id`, `event_type`, and key metadata fields
- **Update strategy**: Updates existing alert instead of creating duplicates
- **Metadata preservation**: Keeps enriched data while preventing alert fatigue
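
The matching idea can be sketched with a fingerprint over the compared fields and a 24-hour check. Everything here is illustrative: `dedup_key` and `is_duplicate` are hypothetical helpers, `ingredient_id` is only an assumed example of a "key metadata field", and the in-memory dict stands in for the database lookup the service would perform.

```python
import hashlib
import json
from datetime import datetime, timedelta, timezone

WINDOW = timedelta(hours=24)


def dedup_key(tenant_id: str, event_type: str, metadata: dict, key_fields: list) -> str:
    """Fingerprint an event from its tenant, type, and selected metadata fields."""
    subset = {field: metadata.get(field) for field in sorted(key_fields)}
    raw = json.dumps([tenant_id, event_type, subset], sort_keys=True, default=str)
    return hashlib.sha256(raw.encode("utf-8")).hexdigest()


def is_duplicate(last_seen: dict, key: str, now: datetime) -> bool:
    """True if the same fingerprint was seen within the last 24 hours."""
    seen_at = last_seen.get(key)
    return seen_at is not None and now - seen_at <= WINDOW


now = datetime(2025, 12, 19, 9, 0, tzinfo=timezone.utc)
first = dedup_key("tenant-123", "critical_stock_shortage",
                  {"ingredient_id": "ing-1", "current_stock": 10.5}, ["ingredient_id"])
second = dedup_key("tenant-123", "critical_stock_shortage",
                   {"ingredient_id": "ing-1", "current_stock": 8.0}, ["ingredient_id"])

last_seen = {first: now - timedelta(hours=23)}
print(first == second, is_duplicate(last_seen, second, now))
```

Restricting the fingerprint to stable fields means a changed stock level updates the existing alert instead of creating a new one.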
### Type Classification
Events are classified into two types:
- **action_needed**: User action required (default for alerts)
- **prevented_issue**: AI already handled the situation (for AI recommendations)
This helps the frontend display appropriate UI and messaging.
### AI Reasoning Integration
When the AI orchestrator has acted on an event, the service:
- Extracts complete reasoning data structure
- Stores confidence scores (0-100)
- Generates i18n-friendly reasoning summaries
- Links to orchestrator context for full details
### Notification Service Integration
Enriched events are automatically sent to the notification service for delivery via:
- WhatsApp
- Email
- Push notifications
- SMS
Priority mapping:
- `critical` → urgent priority
- `important` → high priority
- `standard` → medium priority
- `info` → low priority
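
The priority mapping is a plain lookup from alert priority level to notification priority, which could be expressed as follows. The dictionary values come from the list above; the names `NOTIFICATION_PRIORITY` and `notification_priority` are hypothetical.

```python
# Alert priority level -> notification priority, as listed above.
NOTIFICATION_PRIORITY = {
    "critical": "urgent",
    "important": "high",
    "standard": "medium",
    "info": "low",
}


def notification_priority(priority_level: str) -> str:
    """Translate an alert priority level, rejecting unknown levels explicitly."""
    try:
        return NOTIFICATION_PRIORITY[priority_level]
    except KeyError:
        raise ValueError(f"unknown priority level: {priority_level!r}") from None


print(notification_priority("important"))  # high
```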
## Monitoring
Structured JSON logs with:
- `enrichment_started` - Event received
- `duplicate_detected` - Duplicate alert found and updated
- `enrichment_completed` - Enrichment pipeline finished
- `event_stored` - Saved to database
- `notification_sent` - Notification queued
- `sse_event_published` - Published to SSE stream
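
For illustration, the sketch below emits one of these log events as a JSON line using only the standard library. The service may well use a dedicated structured-logging library instead; `JsonFormatter`, the logger name, and the extra field names are assumptions, and the output is captured in a buffer so it can be inspected.

```python
import io
import json
import logging


class JsonFormatter(logging.Formatter):
    """Render each record as one JSON object with the event name and extra fields."""

    def format(self, record: logging.LogRecord) -> str:
        entry = {"event": record.getMessage(), "level": record.levelname.lower()}
        entry.update(getattr(record, "fields", {}))  # set via extra={"fields": {...}}
        return json.dumps(entry)


buffer = io.StringIO()
handler = logging.StreamHandler(buffer)
handler.setFormatter(JsonFormatter())

logger = logging.getLogger("alert_processor.sketch")
logger.setLevel(logging.INFO)
logger.addHandler(handler)
logger.propagate = False  # keep the example output out of the root logger

logger.info(
    "enrichment_started",
    extra={"fields": {"tenant_id": "tenant-123", "event_type": "critical_stock_shortage"}},
)
print(buffer.getvalue(), end="")
```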
## Testing
```bash
# Run tests
pytest
# Test enrichment pipeline
pytest tests/test_enrichment_orchestrator.py
# Test priority scoring
pytest tests/test_priority_scorer.py
# Test message generation
pytest tests/test_message_generator.py
```
## Migration from v1
See [MIGRATION_GUIDE.md](MIGRATION_GUIDE.md) for the steps to migrate from the old alert_processor.
Key changes:
- Services send minimal events (no hardcoded messages)
- All enrichment moved to alert_processor
- Unified Event table (no separate alert/notification tables)
- i18n-first architecture
- Sophisticated multi-factor priority scoring
- Smart action generation