# Alert Processor Service v2.0
An event-processing and alert-management service that enriches incoming events through a multi-stage pipeline.
## Overview
The Alert Processor Service receives **minimal events** from other services (inventory, production, procurement, etc.) and enriches them with:
- **i18n message generation** - Parameterized titles and messages for frontend
- **Multi-factor priority scoring** - Business impact (40%), Urgency (30%), User agency (20%), Confidence (10%)
- **Business impact analysis** - Financial impact, affected orders, customer impact
- **Urgency assessment** - Time until consequence, deadlines, escalation
- **User agency analysis** - Can user fix? External dependencies? Blockers?
- **AI orchestrator context** - Query for AI actions already taken
- **Smart action generation** - Contextual buttons with deep links
- **Entity linking** - References to related entities (POs, batches, orders)
## Architecture
```
Services → RabbitMQ → [Alert Processor] → PostgreSQL
                              ├→ Notification Service
                              └→ Redis (SSE Pub/Sub) → Frontend
```
### Enrichment Pipeline
1. **Duplicate Detection**: Checks for duplicate alerts within 24-hour window
2. **Message Generator**: Creates i18n keys and parameters from metadata
3. **Orchestrator Client**: Queries AI orchestrator for context
4. **AI Reasoning Extractor**: Extracts AI reasoning details and confidence scores
5. **Business Impact Analyzer**: Calculates financial and operational impact
6. **Urgency Analyzer**: Assesses time sensitivity and deadlines
7. **User Agency Analyzer**: Determines user's ability to act
8. **Priority Scorer**: Calculates weighted priority score (0-100)
9. **Type Classifier**: Determines if action needed or issue prevented
10. **Smart Action Generator**: Creates contextual action buttons
11. **Entity Link Extractor**: Maps metadata to entity references
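
The ordering above can be pictured as a chain of steps that each add fields to the event. The following is a minimal sketch under stated assumptions, not the service's actual coordinator: the step functions, the dict-based event, and the `<name>_id` linking rule are hypothetical stand-ins for the modules under `app/enrichment/`.

```python
from typing import Callable

# Hypothetical types: the real interfaces are not shown in this README.
Event = dict
Step = Callable[[Event], Event]


def generate_message(event: Event) -> Event:
    """Stand-in for the Message Generator: derive an i18n title key from the event type."""
    return {**event, "i18n": {"title_key": f"alerts.{event['event_type']}.title"}}


def extract_entity_links(event: Event) -> Event:
    """Stand-in for the Entity Link Extractor.

    Assumed rule (not confirmed by the source): metadata keys ending in
    '_id' become entity links named after the part before the suffix.
    """
    links = {
        key[: -len("_id")]: value
        for key, value in event.get("metadata", {}).items()
        if key.endswith("_id")
    }
    return {**event, "entity_links": links}


def run_pipeline(event: Event, steps: list[Step]) -> Event:
    """Apply each enrichment step in order, passing the growing event along."""
    for step in steps:
        event = step(event)
    return event


enriched = run_pipeline(
    {"event_type": "critical_stock_shortage", "metadata": {"ingredient_id": "abc"}},
    [generate_message, extract_entity_links],
)
```

Keeping every step as a function from event to event means steps can be reordered or tested in isolation, which is one plausible reason to structure a pipeline this way.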
## Service Structure
```
alert_processor_v2/
├── app/
│   ├── main.py                         # FastAPI app + lifecycle
│   ├── core/
│   │   ├── config.py                   # Settings
│   │   └── database.py                 # Database session management
│   ├── models/
│   │   └── events.py                   # SQLAlchemy Event model
│   ├── schemas/
│   │   └── events.py                   # Pydantic schemas
│   ├── api/
│   │   ├── alerts.py                   # Alert endpoints
│   │   └── sse.py                      # SSE streaming
│   ├── consumer/
│   │   └── event_consumer.py           # RabbitMQ consumer
│   ├── enrichment/
│   │   ├── message_generator.py        # i18n generation
│   │   ├── priority_scorer.py          # Priority calculation
│   │   ├── orchestrator_client.py      # AI context
│   │   ├── smart_actions.py            # Action buttons
│   │   ├── business_impact.py          # Impact analysis
│   │   ├── urgency_analyzer.py         # Urgency assessment
│   │   └── user_agency.py              # Agency analysis
│   ├── repositories/
│   │   └── event_repository.py         # Database queries
│   ├── services/
│   │   ├── enrichment_orchestrator.py  # Pipeline coordinator
│   │   └── sse_service.py              # SSE pub/sub
│   └── utils/
│       └── message_templates.py        # Alert type mappings
├── migrations/
│   └── versions/
│       └── 20251205_clean_unified_schema.py
└── requirements.txt
```
## Environment Variables
```bash
# Service
SERVICE_NAME=alert-processor
VERSION=2.0.0
DEBUG=false

# Database
DATABASE_URL=postgresql+asyncpg://user:pass@localhost/db

# RabbitMQ
RABBITMQ_URL=amqp://guest:guest@localhost/
RABBITMQ_EXCHANGE=events.exchange
RABBITMQ_QUEUE=alert_processor.queue

# Redis
REDIS_URL=redis://localhost:6379/0
REDIS_SSE_PREFIX=alerts

# Orchestrator Service
ORCHESTRATOR_URL=http://orchestrator:8000
ORCHESTRATOR_TIMEOUT=10

# Notification Service
NOTIFICATION_URL=http://notification:8000
NOTIFICATION_TIMEOUT=5

# Cache
CACHE_ENABLED=true
CACHE_TTL_SECONDS=300
```
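
As an illustration of how a few of these variables might be read and type-converted, here is a minimal sketch using only the standard library. It is not the contents of `app/core/config.py`, which this README does not show; the `Settings` dataclass, `load_settings`, and the truthy-string rule in `_as_bool` are assumptions made for the example.

```python
import os
from dataclasses import dataclass
from typing import Mapping


def _as_bool(value: str) -> bool:
    """Interpret common truthy spellings; anything else is False (assumed rule)."""
    return value.strip().lower() in {"1", "true", "yes", "on"}


@dataclass(frozen=True)
class Settings:
    service_name: str
    debug: bool
    database_url: str
    orchestrator_timeout: int
    cache_ttl_seconds: int


def load_settings(env: Mapping[str, str] = os.environ) -> Settings:
    """Build Settings from an environment mapping, defaulting to the values listed above."""
    return Settings(
        service_name=env.get("SERVICE_NAME", "alert-processor"),
        debug=_as_bool(env.get("DEBUG", "false")),
        database_url=env.get("DATABASE_URL", "postgresql+asyncpg://user:pass@localhost/db"),
        orchestrator_timeout=int(env.get("ORCHESTRATOR_TIMEOUT", "10")),
        cache_ttl_seconds=int(env.get("CACHE_TTL_SECONDS", "300")),
    )


# Passing a plain dict instead of os.environ keeps the example deterministic.
settings = load_settings({"DEBUG": "true", "CACHE_TTL_SECONDS": "60"})
```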
## Running the Service
### Local Development
```bash
# Install dependencies
pip install -r requirements.txt

# Run database migrations
alembic upgrade head

# Start service
python -m app.main
# or
uvicorn app.main:app --reload
```
### Docker
```bash
docker build -t alert-processor:2.0 .
docker run -p 8000:8000 --env-file .env alert-processor:2.0
```
## API Endpoints
### Alert Management
- `GET /api/v1/tenants/{tenant_id}/alerts` - List alerts with filters
- `GET /api/v1/tenants/{tenant_id}/alerts/summary` - Get dashboard summary
- `GET /api/v1/tenants/{tenant_id}/alerts/{alert_id}` - Get single alert
- `POST /api/v1/tenants/{tenant_id}/alerts/{alert_id}/acknowledge` - Acknowledge alert
- `POST /api/v1/tenants/{tenant_id}/alerts/{alert_id}/resolve` - Resolve alert
- `POST /api/v1/tenants/{tenant_id}/alerts/{alert_id}/dismiss` - Dismiss alert
### Real-Time Streaming
- `GET /api/v1/sse/alerts/{tenant_id}` - SSE stream for real-time alerts
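
A client reading this stream has to reassemble events from the standard SSE line framing. The sketch below shows that parsing step in isolation. It assumes each event's `data:` field carries a JSON object; the README does not specify the payload format, so the sample payload here is hypothetical.

```python
import json
from typing import Iterable, Iterator


def iter_sse_events(lines: Iterable[str]) -> Iterator[dict]:
    """Yield one decoded JSON payload per SSE event.

    Standard SSE framing: `data:` lines accumulate until a blank line ends
    the event; lines starting with ':' are comments (often keep-alives).
    Other fields such as `event:` or `id:` are ignored in this sketch.
    """
    data_lines: list[str] = []
    for raw in lines:
        line = raw.rstrip("\r\n")
        if line == "":
            if data_lines:
                yield json.loads("\n".join(data_lines))
                data_lines = []
        elif line.startswith(":"):
            continue
        elif line.startswith("data:"):
            value = line[len("data:"):]
            # The SSE format strips a single leading space after the colon.
            data_lines.append(value[1:] if value.startswith(" ") else value)


# Hypothetical stream content, as it might arrive line by line over HTTP.
stream = [
    ": keep-alive\n",
    'data: {"event_type": "low_stock_warning", "priority_level": "standard"}\n',
    "\n",
]
events = list(iter_sse_events(stream))
```

In practice the `lines` iterable would come from a streaming HTTP response rather than a list.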
### Health Check
- `GET /health` - Service health status
## Event Flow
### 1. Service Emits Minimal Event
```python
from shared.messaging.event_publisher import EventPublisher

# `publisher` is assumed to be an already-initialized EventPublisher instance;
# the call must run inside an async function.
await publisher.publish_alert(
    tenant_id=tenant_id,
    event_type="critical_stock_shortage",
    event_domain="inventory",
    severity="urgent",
    metadata={
        "ingredient_id": "...",
        "ingredient_name": "Flour",
        "current_stock": 10.5,
        "required_stock": 50.0,
        "shortage_amount": 39.5,
    },
)
```
### 2. Alert Processor Enriches
- Checks for duplicates: Searches 24-hour window for similar alerts
- Generates i18n: `alerts.critical_stock_shortage.title` with params
- Queries orchestrator for AI context
- Extracts AI reasoning and confidence scores (if available)
- Analyzes business impact: €197.50 financial impact
- Assesses urgency: 12 hours until consequence
- Determines user agency: Can create PO, requires supplier
- Calculates priority: Score 78 → "important"
- Classifies type: `action_needed` or `prevented_issue`
- Generates smart actions: [Create PO, Call Supplier, Dismiss]
- Extracts entity links: `{ingredient: "..."}`
### 3. Stores Enriched Event
```json
{
  "id": "...",
  "event_type": "critical_stock_shortage",
  "event_domain": "inventory",
  "severity": "urgent",
  "type_class": "action_needed",
  "priority_score": 78,
  "priority_level": "important",
  "confidence_score": 95,
  "i18n": {
    "title_key": "alerts.critical_stock_shortage.title",
    "title_params": {"ingredient_name": "Flour"},
    "message_key": "alerts.critical_stock_shortage.message_generic",
    "message_params": {
      "current_stock_kg": 10.5,
      "required_stock_kg": 50.0
    }
  },
  "business_impact": {...},
  "urgency": {...},
  "user_agency": {...},
  "ai_reasoning_details": {...},
  "orchestrator_context": {...},
  "smart_actions": [...],
  "entity_links": {"ingredient": "..."}
}
```
### 4. Sends Notification
The processor calls the notification service with the event details for delivery via WhatsApp, email, push notifications, and SMS.
### 5. Publishes to SSE
The processor publishes the enriched event to the Redis channel `alerts:{tenant_id}` for real-time frontend updates.
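
The channel naming can be sketched as follows. This is an illustration rather than the code in `app/services/sse_service.py`: `sse_channel` and `publish_event` are hypothetical helpers, and the publish function is injected so the example runs without a Redis server. With a Redis client, its `publish(channel, message)` method could be passed in instead.

```python
import json
from typing import Callable


def sse_channel(tenant_id: str, prefix: str = "alerts") -> str:
    """Build the per-tenant channel name; the prefix mirrors REDIS_SSE_PREFIX."""
    return f"{prefix}:{tenant_id}"


def publish_event(publish: Callable[[str, str], object], tenant_id: str, event: dict) -> str:
    """Serialize the event as JSON and hand it to a publish(channel, message) callable."""
    channel = sse_channel(tenant_id)
    publish(channel, json.dumps(event))
    return channel


# In-memory stand-in for a Redis publish call.
sent: list[tuple[str, str]] = []
channel = publish_event(lambda ch, msg: sent.append((ch, msg)), "tenant-42", {"id": "e1"})
```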
## Priority Scoring Algorithm
**Formula**: `Total = (Impact × 0.4) + (Urgency × 0.3) + (Agency × 0.2) + (Confidence × 0.1)`

**Business Impact Score (0-100)**:
- Financial impact > €1000: +30
- Affected orders > 10: +15
- High customer impact: +15
- Production delay > 4h: +10
- Revenue loss > €500: +10

**Urgency Score (0-100)**:
- Time until consequence < 2h: +40
- Deadline present: +5
- Can't wait until tomorrow: +10
- Peak hour relevant: +5

**User Agency Score (0-100)**:
- User can fix: +30
- Requires external party: -10
- Has blockers: -5 per blocker
- Has workaround: +5

**Escalation Boost** (up to +30):
- Pending > 72h: +20
- Deadline < 6h: +30
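
The weighted formula and the escalation boost can be sketched in a few lines. The weights and boost values come from the lists above. How the boost is combined with the weighted total is not stated in this README, so adding it and clamping the result to 0-100 is an assumption, as are the function names.

```python
from typing import Optional

WEIGHTS = {"impact": 0.4, "urgency": 0.3, "agency": 0.2, "confidence": 0.1}


def escalation_boost(pending_hours: float, hours_to_deadline: Optional[float]) -> int:
    """Sum the escalation bonuses listed above, capped at +30."""
    boost = 0
    if pending_hours > 72:
        boost += 20
    if hours_to_deadline is not None and hours_to_deadline < 6:
        boost += 30
    return min(boost, 30)


def priority_score(impact: float, urgency: float, agency: float,
                   confidence: float, boost: int = 0) -> int:
    """Weighted total of the four component scores (each 0-100).

    Assumption: the boost is added to the weighted total and the result is
    clamped to the 0-100 range.
    """
    total = (
        impact * WEIGHTS["impact"]
        + urgency * WEIGHTS["urgency"]
        + agency * WEIGHTS["agency"]
        + confidence * WEIGHTS["confidence"]
    )
    return max(0, min(100, round(total + boost)))


# An alert pending for 80 hours with no deadline receives the +20 boost.
score = priority_score(80, 50, 30, 90, boost=escalation_boost(80, None))
```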
## Alert Types
See [app/utils/message_templates.py](app/utils/message_templates.py) for complete list.
### Standard Alerts
- `critical_stock_shortage` - Urgent stock shortages
- `low_stock_warning` - Stock running low
- `production_delay` - Production behind schedule
- `equipment_failure` - Equipment issues
- `po_approval_needed` - Purchase order approval required
- `temperature_breach` - Temperature control violations
- `delivery_overdue` - Late deliveries
- `expired_products` - Product expiration warnings
### AI Recommendations
- `ai_yield_prediction` - AI-predicted production yields
- `ai_safety_stock_optimization` - AI stock level recommendations
- `ai_supplier_recommendation` - AI supplier suggestions
- `ai_price_forecast` - AI price predictions
- `ai_demand_forecast` - AI demand forecasts
- `ai_business_rule` - AI-suggested business rules
## Database Schema
**events table** with JSONB enrichment:
- Core: `id`, `tenant_id`, `created_at`, `event_type`, `event_domain`, `severity`
- i18n: `i18n_title_key`, `i18n_title_params`, `i18n_message_key`, `i18n_message_params`
- Priority: `priority_score` (0-100), `priority_level` (critical/important/standard/info)
- Enrichment: `orchestrator_context`, `business_impact`, `urgency`, `user_agency` (JSONB)
- AI Fields: `ai_reasoning_details`, `confidence_score`, `ai_reasoning_summary_key`, `ai_reasoning_summary_params`
- Classification: `type_class` (action_needed/prevented_issue)
- Actions: `smart_actions` (JSONB array)
- Entities: `entity_links` (JSONB)
- Status: `status` (active/acknowledged/resolved/dismissed)
- Metadata: `raw_metadata` (JSONB)
## Key Features
### Duplicate Alert Detection
The service automatically detects and prevents duplicate alerts:
- **24-hour window**: Checks for similar alerts in the past 24 hours
- **Smart matching**: Compares `tenant_id`, `event_type`, and key metadata fields
- **Update strategy**: Updates existing alert instead of creating duplicates
- **Metadata preservation**: Keeps enriched data while preventing alert fatigue
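
The matching logic can be sketched as a key comparison inside a time window. This is an in-memory illustration, not the repository query the service uses. Which metadata fields count as "key" fields is not specified here, so `key_fields` and the `ingredient_id` choice below are hypothetical.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

DEDUP_WINDOW = timedelta(hours=24)


def dedup_key(event: dict, key_fields: tuple) -> tuple:
    """Identity used for matching: tenant, event type, and selected metadata values."""
    metadata = event.get("metadata", {})
    return (
        event["tenant_id"],
        event["event_type"],
        tuple(metadata.get(field) for field in key_fields),
    )


def find_duplicate(new_event: dict, stored_events: list, key_fields: tuple,
                   now: datetime) -> Optional[dict]:
    """Return the first stored event with the same key created within the window."""
    cutoff = now - DEDUP_WINDOW
    wanted = dedup_key(new_event, key_fields)
    for stored in stored_events:
        if stored["created_at"] >= cutoff and dedup_key(stored, key_fields) == wanted:
            return stored
    return None


now = datetime(2025, 12, 19, 12, 0, tzinfo=timezone.utc)
base = {
    "tenant_id": "t1",
    "event_type": "critical_stock_shortage",
    "metadata": {"ingredient_id": "flour"},
}
stored = [
    {**base, "id": "old", "created_at": now - timedelta(hours=30)},
    {**base, "id": "recent", "created_at": now - timedelta(hours=2)},
]
# Only the 2-hour-old event falls inside the 24-hour window.
match = find_duplicate(base, stored, ("ingredient_id",), now)
```

When a match is found, the caller would update that stored alert rather than insert a new row, in line with the update strategy described above.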
### Type Classification
Events are classified into two types:
- **action_needed**: User action required (default for alerts)
- **prevented_issue**: AI already handled the situation (for AI recommendations)

This helps the frontend display appropriate UI and messaging.
### AI Reasoning Integration
When the AI orchestrator has already acted on an event, the processor:
- Extracts complete reasoning data structure
- Stores confidence scores (0-100)
- Generates i18n-friendly reasoning summaries
- Links to orchestrator context for full details
### Notification Service Integration
Enriched events are automatically sent to the notification service for delivery via:
- WhatsApp
- Email
- Push notifications
- SMS

Alert priority levels map to notification priorities as follows:
- `critical` → urgent priority
- `important` → high priority
- `standard` → medium priority
- `info` → low priority
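
Expressed as a lookup table, the mapping might look like the sketch below. The dictionary values come from the list above. The helper name and the choice to raise on unknown levels are assumptions, since the README does not say how unexpected levels are handled.

```python
NOTIFICATION_PRIORITY = {
    "critical": "urgent",
    "important": "high",
    "standard": "medium",
    "info": "low",
}


def notification_priority(priority_level: str) -> str:
    """Translate an alert priority level into a notification priority.

    Raising on unknown levels is a choice made for this sketch.
    """
    try:
        return NOTIFICATION_PRIORITY[priority_level]
    except KeyError:
        raise ValueError(f"unknown priority level: {priority_level!r}") from None


mapped = notification_priority("important")
```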
## Monitoring
The service emits structured JSON logs for the following events:
- `enrichment_started` - Event received
- `duplicate_detected` - Duplicate alert found and updated
- `enrichment_completed` - Enrichment pipeline finished
- `event_stored` - Saved to database
- `notification_sent` - Notification queued
- `sse_event_published` - Published to SSE stream
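
To show what one such log line could look like, here is a sketch built on the standard `logging` module. The service's actual logging library and field names are not documented in this README, so `JsonFormatter`, the `fields` convention, and the `tenant_id`/`priority_score` fields are all hypothetical.

```python
import io
import json
import logging


class JsonFormatter(logging.Formatter):
    """Render each record as a single JSON object."""

    def format(self, record: logging.LogRecord) -> str:
        payload = {"event": record.getMessage(), "level": record.levelname.lower()}
        # Extra key/value pairs are passed via `extra={"fields": {...}}`.
        payload.update(getattr(record, "fields", {}))
        return json.dumps(payload)


# Log to an in-memory buffer so the example is self-contained.
buffer = io.StringIO()
handler = logging.StreamHandler(buffer)
handler.setFormatter(JsonFormatter())

logger = logging.getLogger("alert_processor_sketch")
logger.setLevel(logging.INFO)
logger.addHandler(handler)
logger.propagate = False

logger.info(
    "enrichment_completed",
    extra={"fields": {"tenant_id": "tenant-42", "priority_score": 78}},
)
logged = json.loads(buffer.getvalue())
```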
## Testing
```bash
# Run tests
pytest

# Test enrichment pipeline
pytest tests/test_enrichment_orchestrator.py

# Test priority scoring
pytest tests/test_priority_scorer.py

# Test message generation
pytest tests/test_message_generator.py
```
## Migration from v1
See [MIGRATION_GUIDE.md](MIGRATION_GUIDE.md) for the steps to migrate from the old `alert_processor`.

Key changes:
- Services send minimal events (no hardcoded messages)
- All enrichment moved to alert_processor
- Unified Event table (no separate alert/notification tables)
- i18n-first architecture
- Multi-factor priority scoring
- Smart action generation