From 8e82f5754fb73d6b80c8155ccab4db457496b8d8 Mon Sep 17 00:00:00 2001 From: Urtzi Alfaro Date: Wed, 26 Nov 2025 07:07:54 +0100 Subject: [PATCH] docs: Add alert escalation and chaining system documentation --- services/alert_processor/README.md | 2565 +++++++++++++++++++--------- 1 file changed, 1770 insertions(+), 795 deletions(-) diff --git a/services/alert_processor/README.md b/services/alert_processor/README.md index 52415210..4d4020f7 100644 --- a/services/alert_processor/README.md +++ b/services/alert_processor/README.md @@ -1,768 +1,904 @@ -# Alert Processor Service +# Unified Alert Service + +## πŸŽ‰ Latest Updates (Waves 3-6 Complete) + +### Wave 6: Production-Ready Deployment +- **Database Migration** - Clean break from legacy fields (`severity`, `actions`) β†’ [20251122_1000_remove_legacy_alert_fields.py](migrations/versions/20251122_1000_remove_legacy_alert_fields.py) +- **Backfill Script** - Enriches existing alerts with missing priority scores and smart actions β†’ [backfill_enriched_alerts.py](scripts/backfill_enriched_alerts.py) +- **Integration Tests** - Comprehensive test suite for enriched alert flow β†’ [test_enriched_alert_flow.py](tests/integration/test_enriched_alert_flow.py) +- **API Documentation** - Complete reference guide with examples β†’ [ENRICHED_ALERTS_API.md](docs/ENRICHED_ALERTS_API.md) + +### Wave 5: Enhanced UX Features (Frontend) +- **Trend Visualizations** - Inline sparklines for TREND_WARNING alerts +- **Action Consequence Previews** - See outcomes before taking action (financial impact, reversibility) +- **Response Time Gamification** - Track performance metrics by priority level with benchmarks + +### Wave 4: High-Priority Features +- **Email Digest Service** - Celebration-first daily/weekly summaries β†’ [email_digest.py](app/services/enrichment/email_digest.py) +- **Email Digest API** - POST `/api/v1/tenants/{id}/alerts/digest/send` +- **Alert Hub** (Frontend) - 3-tab organization (All/For Me/Archived) +- **Auto-Action Countdown** (Frontend) - Real-time timer with one-click cancel +- **Priority Score Explainer** (Frontend) - Educational transparency modal + +### Wave 3: Dashboard Widgets (Frontend) +- **AI Handling Rate Card** - Showcase AI wins (handling %, savings EUR, trend) +- **Prevented Issues Card** - Celebrate problems AI automatically resolved ## Overview -The **Alert Processor Service** acts as the central alert hub for the entire Bakery-IA platform, consuming events from all microservices via RabbitMQ and intelligently routing them as notifications. It applies business logic to determine alert severity, filters noise, aggregates related alerts, and ensures critical issues reach stakeholders immediately while preventing alert fatigue. This service is the intelligent layer between raw system events and actionable user notifications. +The **Unified Alert Service** (formerly alert_processor) is the intelligent, centralized alert system for the Bakery-IA platform. It automatically enriches all alerts with multi-factor priority scoring, orchestrator context (AI actions), business impact analysis, **context-aware message generation**, smart actions with deep links, and timing intelligence. This service combines real-time alert processing, intelligent enrichment, and multi-channel delivery into a single, powerful microservice. + +**Key Innovation**: Unlike traditional alert systems that simply route messages, the Unified Alert Service applies sophisticated AI-driven enrichment to transform raw alerts into actionable insights. Every alert is automatically analyzed for business impact, enriched with context from AI orchestrator decisions, scored using multi-factor algorithms, **generates intelligent messages using enrichment data**, and enhanced with smart actions that include deep links and one-click solutions. + +## Architecture Evolution + +### Unified Service (Current) +**Single Service**: `alert-processor` with integrated enrichment +- βœ… All alerts automatically enriched with priority, context, and actions +- βœ… Multi-factor priority scoring (business impact, urgency, user agency, confidence) +- βœ… Orchestrator context injection (AI already handled) +- βœ… Smart actions with deep links and phone dialing +- βœ… Timing intelligence for optimal delivery +- βœ… Single source of truth for all alert data +- βœ… 50% less infrastructure complexity + +**Benefits**: +- **Automatic Enrichment**: Every alert enriched by default, no manual configuration +- **Better Data Consistency**: Single database, single service, single source of truth +- **Simpler Operations**: One service to monitor, deploy, and scale +- **Faster Development**: Changes to enrichment logic in one place +- **Lower Latency**: No inter-service communication for enrichment ## Key Features -### Central Event Hub -- **RabbitMQ Consumer** - Listens to all service exchanges -- **Multi-Exchange Subscription** - Forecasting, inventory, production, procurement, etc. -- **Event Classification** - Categorize events by type and importance -- **Event Deduplication** - Prevent duplicate alerts -- **Event Aggregation** - Combine related events into single alert -- **Event Filtering** - Apply business rules to reduce noise +### 🎯 Multi-Factor Priority Scoring -### Intelligent Alert Routing -- **Severity Classification** - Critical, high, medium, low -- **Priority Assignment** - Urgent, normal, informational -- **Channel Selection** - Email vs. WhatsApp based on severity -- **Recipient Determination** - Route to appropriate team members -- **Escalation Rules** - Escalate unacknowledged critical alerts -- **Alert Suppression** - Prevent alert storms during incidents +Sophisticated priority algorithm that scores alerts 0-100 using weighted factors: -### Alert Types & Sources -- **Stockout Alerts** - From inventory service (critical) -- **Quality Issues** - From production service (high) -- **Forecast Anomalies** - From forecasting service (medium) -- **Equipment Maintenance** - From production service (medium) -- **Low Stock Warnings** - From inventory service (medium) -- **Payment Overdue** - From orders service (high) -- **Price Changes** - From suppliers service (low) -- **API Health Issues** - From external service (critical) +**Priority Weights** (configurable): +- **Business Impact** (40%): Financial impact, affected orders, customer impact +- **Urgency** (30%): Time sensitivity, deadlines, production dependencies +- **User Agency** (20%): Whether user can/must take action +- **Confidence** (10%): AI confidence in the analysis -### Business Logic Engine -- **Time-Based Rules** - Alert behavior based on time of day -- **Frequency Limits** - Max alerts per hour/day -- **Threshold Management** - Configurable alert thresholds -- **Context Enrichment** - Add helpful context to alerts -- **Impact Assessment** - Calculate business impact -- **Recommendation Engine** - Suggest corrective actions +**Priority Levels**: +- **Critical (90-100)**: Immediate action required β†’ WhatsApp + Email + Push +- **Important (70-89)**: Action needed soon β†’ WhatsApp + Email (business hours) +- **Standard (50-69)**: Should be addressed β†’ Email (business hours) +- **Info (0-49)**: For awareness β†’ Dashboard only -### Alert Lifecycle Management -- **Active Alert Tracking** - Monitor open alerts -- **Acknowledgment Handling** - Track alert acknowledgments -- **Resolution Tracking** - Monitor when issues are resolved -- **Alert History** - Complete audit trail -- **Alert Metrics** - Response times, resolution times -- **SLA Monitoring** - Track alert SLA compliance +**Example**: A supplier delay affecting tomorrow's production gets scored: +- Business Impact: High (€450 lost revenue) = 90/100 +- Urgency: Critical (6 hours until production) = 95/100 +- User Agency: Must act (call supplier) = 100/100 +- Confidence: High (clear data) = 92/100 +- **Final Score**: 92 β†’ **CRITICAL** β†’ WhatsApp immediately -### Alert Fatigue Prevention -- **Smart Throttling** - Limit similar alerts -- **Quiet Period Management** - Respect quiet hours -- **Digest Mode** - Batch low-priority alerts -- **Alert Grouping** - Combine related alerts -- **Snooze Functionality** - Temporarily suppress alerts -- **Alert Unsubscribe** - Opt out of specific alert types +### 🧠 Orchestrator Context Enrichment -## Business Value +Enriches alerts with AI orchestrator actions to provide "AI already handled" context: -### For Bakery Owners -- **No Missed Issues** - Critical problems always reach you -- **Reduced Noise** - Only important alerts, no spam -- **Fast Response** - Know issues within seconds -- **Business Context** - Alerts include impact and recommendations -- **Audit Trail** - Complete alert history for review -- **Configurable** - Adjust alert thresholds to your needs +**Context Types**: +- **Prevented Issues**: AI created purchase order before stockout +- **Weather-Based Actions**: AI adjusted production for sunny weekend +- **Waste Reduction**: AI optimized production based on patterns +- **Proactive Ordering**: AI detected trend and ordered supplies -### Quantifiable Impact -- **Issue Detection**: 90% faster (minutes vs. hours/days) -- **Response Time**: 70-90% faster with immediate alerts -- **Downtime Prevention**: 50-80% reduction through early warning -- **Alert Relevance**: 90%+ alerts are actionable (vs. 30-50% without filtering) -- **Staff Productivity**: 2-4 hours/week saved (not chasing issues) -- **Cost Avoidance**: €500-2,000/month (prevented stockouts, quality issues) +**Example Alert Enrichment**: +``` +Alert: "Low Stock: Harina Tipo 55 (45kg, Min: 200kg)" +Orchestrator Context: + βœ“ AI already handled: Purchase order #12345 created 2 hours ago + βœ“ 500kg arriving Friday (2.3 days before predicted stockout) + βœ“ Estimated savings: €200 (prevented stockout) + βœ“ Confidence: 92% +Result: Priority reduced from CRITICAL (85) to IMPORTANT (71) +Type: "Prevented Issue" (not "Action Needed") +``` -### For Operations Staff -- **Clear Priorities** - Know what needs attention first -- **Actionable Alerts** - Each alert has next steps -- **Mobile Alerts** - WhatsApp for critical issues -- **Alert Context** - Understand problem without investigation -- **Quick Resolution** - Faster problem solving with guidance +### 🎨 Smart Actions with Deep Links + +Enhanced actions with metadata for one-click execution: + +**Smart Action Features**: +- **Deep Links**: Direct navigation to relevant pages + - `action://inventory/item/{ingredient_id}` β†’ Inventory detail page + - `action://procurement/create-po?ingredient={id}` β†’ Pre-filled PO form + - `action://production/batch/{batch_id}` β†’ Production batch view +- **Phone Dialing**: `tel:+34-555-1234` for immediate calls +- **Email Links**: `mailto:supplier@example.com` with pre-filled subjects +- **URL Parameters**: Pre-populate forms with context +- **Action Metadata**: Additional data for client-side processing + +**Example Smart Actions**: +```json +{ + "smart_actions": [ + { + "label": "Call Supplier Now", + "type": "phone", + "url": "tel:+34-555-1234", + "metadata": { + "supplier_name": "Levadura Fresh", + "contact_name": "Juan GarcΓ­a" + } + }, + { + "label": "View Purchase Order", + "type": "navigation", + "url": "action://procurement/po/12345", + "metadata": { + "po_number": "PO-12345", + "status": "pending_delivery" + } + }, + { + "label": "View Ingredient Details", + "type": "navigation", + "url": "action://inventory/item/flour-tipo-55", + "metadata": { + "current_stock": 45, + "min_stock": 200 + } + } + ] +} +``` + +### ⏰ Timing Intelligence + +Optimizes alert delivery based on business hours and user attention patterns: + +**Timing Decisions**: +- **SEND_NOW**: Critical alerts, immediate action required +- **SCHEDULE_LATER**: Important alerts sent during peak attention hours +- **BATCH_FOR_DIGEST**: Low-priority alerts batched for end-of-day digest + +**Peak Hours Detection**: +- **Morning Peak**: 7:00-11:00 (high attention, good for planning) +- **Evening Peak**: 17:00-19:00 (transition time, good for summaries) +- **Quiet Hours**: 22:00-6:00 (only critical alerts) + +**Business Hours**: 6:00-22:00 (configurable per tenant) + +**Example**: +``` +Alert Type: Waste Trend (Standard priority, 58/100) +Current Time: 23:30 (quiet hours) +Decision: BATCH_FOR_DIGEST (send in 18:00 digest tomorrow) +Reasoning: Non-urgent, better addressed during business hours +``` + +### 🏷️ Alert Type Classification + +Categorizes alerts by user job-to-be-done (JTBD): + +**Type Classes**: +1. **ACTION_NEEDED**: User must take action (call supplier, create PO) +2. **PREVENTED_ISSUE**: AI already handled, FYI only (PO created automatically) +3. **TREND_WARNING**: Pattern detected, consider adjusting (waste increasing) +4. **ESCALATION**: Previous alert ignored, now more urgent +5. **INFORMATION**: For awareness only (forecast updated) + +**UI/UX Benefits**: +- Different icons per type class +- Color coding (red = action, green = prevented, yellow = trend) +- Smart filtering by type +- Different notification sounds + +### πŸ“Š Business Impact Analysis + +Calculates and displays financial impact: + +**Impact Factors**: +- **Revenue Impact**: Lost sales, affected orders +- **Cost Impact**: Waste, expedited shipping, overtime +- **Customer Impact**: Affected orders, potential cancellations +- **Operational Impact**: Production delays, equipment downtime + +**Example**: +```json +{ + "business_impact": { + "financial_impact_eur": 450, + "affected_orders": 3, + "affected_products": ["Croissant Mantequilla"], + "production_delay_hours": 6, + "estimated_revenue_loss_eur": 450, + "impact_level": "high" + } +} +``` + +### πŸ”„ Real-Time SSE Streaming + +Server-Sent Events for instant dashboard updates: + +**Features**: +- Real-time alert delivery to connected dashboards +- Enriched alerts streamed immediately after processing +- Per-tenant channels (`alerts:{tenant_id}`) +- Automatic reconnection handling +- Cached active alerts for new connections + +**Client Integration**: +```javascript +const eventSource = new EventSource(`/api/v1/alerts/stream?tenant_id=${tenantId}`); + +eventSource.onmessage = (event) => { + const enrichedAlert = JSON.parse(event.data); + + console.log('Priority:', enrichedAlert.priority_score); + console.log('Type:', enrichedAlert.type_class); + console.log('AI Handled:', enrichedAlert.orchestrator_context?.ai_already_handled); + console.log('Smart Actions:', enrichedAlert.smart_actions); + + // Update dashboard UI + displayAlert(enrichedAlert); +}; +``` + +### 🌐 Context-Aware Message Generation with i18n Support + +**NEW**: Messages are now generated AFTER enrichment, leveraging all context data for intelligent, multilingual notifications. + +**Before** (Static Templates): +``` +"Solo 5kg disponibles, necesarios 30kg para producciΓ³n de maΓ±ana" +``` +- ❌ Hardcoded "maΓ±ana" (may not be tomorrow) +- ❌ No AI action context +- ❌ Missing supplier details +- ❌ Spanish only + +**After** (Context-Aware with i18n): +```json +{ + "title": "🚨 Stock CrΓ­tico: Flour", + "message": "Solo 5.0kg de Flour (necesitas 30.0kg el lunes 24). Ya creΓ© PO-12345 con Mill Co para entrega maΓ±ana 10:00. Por favor aprueba €150.", + "metadata": { + "i18n": { + "title_key": "alerts.critical_stock_shortage.title", + "title_params": {"ingredient_name": "Flour"}, + "message_key": "alerts.critical_stock_shortage.message_with_po_pending", + "message_params": { + "ingredient_name": "Flour", + "current_stock": 5.0, + "required_stock": 30.0, + "po_id": "PO-12345", + "delivery_date": "2025-11-24", + "po_amount": 150.0 + } + } + } +} +``` +- βœ… Actual date (Monday 24th) +- βœ… AI action mentioned (PO-12345) +- βœ… Supplier and delivery details +- βœ… i18n keys for any language + +**Message Variants Based on Context**: +The same stock shortage alert generates different messages based on enrichment: + +1. **AI Already Created PO (Pending Approval)**: + ``` + "Solo 5.0kg de Flour (necesitas 30.0kg el lunes 24). + Ya creΓ© PO-12345 con Mill Co para entrega maΓ±ana 10:00. + Por favor aprueba €150." + ``` + +2. **Supplier Contact Available**: + ``` + "Solo 5.0kg de Flour (necesitas 30.0kg en 18 horas). + Contacta a Mill Co (+34 123 456 789)." + ``` + +3. **Generic (No Special Context)**: + ``` + "Solo 5.0kg de Flour disponibles (necesitas 30.0kg)." + ``` + +**i18n Integration**: +Frontend uses i18n keys for translation: +```typescript +// English +t("alerts.critical_stock_shortage.message_with_po_pending", params) +// β†’ "Only 5.0kg of Flour (need 30.0kg on Monday 24th). Already created PO-12345..." + +// Euskera (Basque) +t("alerts.critical_stock_shortage.message_with_po_pending", params) +// β†’ "Flour-en 5.0kg bakarrik (30.0kg behar dituzu astelehen 24an). PO-12345 sortu dut..." + +// Spanish (Fallback) +alert.message +// β†’ "Solo 5.0kg de Flour..." +``` + +**Template System**: +See [shared/alerts/context_templates.py](../../shared/alerts/context_templates.py) for the complete generator system. + +Each alert type has a function that: +1. Accepts `EnrichedAlert` with full context +2. Determines message variant based on: + - Orchestrator context (AI acted?) + - Urgency (time-sensitive?) + - User agency (supplier contact available?) +3. Returns i18n keys, parameters, and fallback messages ## Technology Stack +### Core Technologies - **Framework**: FastAPI (Python 3.11+) - Async web framework -- **Database**: PostgreSQL 17 - Alert history -- **Caching**: Redis 7.4 - Active alerts cache -- **Messaging**: RabbitMQ 4.1 - Event consumption -- **Consumer**: aio-pika - Async RabbitMQ client +- **Database**: PostgreSQL 17 - Alert storage with JSONB for flexible enrichment data +- **Caching**: Redis 7.4 - Active alerts cache, SSE pub/sub +- **Messaging**: RabbitMQ 4.1 - Event consumption from all services - **ORM**: SQLAlchemy 2.0 (async) - Database abstraction +- **Migrations**: Alembic - Database schema management - **Logging**: Structlog - Structured JSON logging -- **Metrics**: Prometheus Client - Alert metrics +- **Metrics**: Prometheus Client - Alert metrics and performance -## API Endpoints (Key Routes) - -### Alert Management -- `GET /api/v1/alerts` - List alerts with filters -- `GET /api/v1/alerts/{alert_id}` - Get alert details -- `POST /api/v1/alerts/{alert_id}/acknowledge` - Acknowledge alert -- `POST /api/v1/alerts/{alert_id}/resolve` - Mark alert resolved -- `POST /api/v1/alerts/{alert_id}/snooze` - Snooze alert temporarily -- `GET /api/v1/alerts/active` - Get active (unresolved) alerts - -### Alert Configuration -- `GET /api/v1/alerts/config` - Get alert configuration -- `PUT /api/v1/alerts/config` - Update alert configuration -- `GET /api/v1/alerts/rules` - List alert rules -- `POST /api/v1/alerts/rules` - Create alert rule -- `PUT /api/v1/alerts/rules/{rule_id}` - Update rule -- `DELETE /api/v1/alerts/rules/{rule_id}` - Delete rule - -### Alert Analytics -- `GET /api/v1/alerts/analytics/dashboard` - Alert dashboard -- `GET /api/v1/alerts/analytics/by-type` - Alerts by type -- `GET /api/v1/alerts/analytics/by-severity` - Alerts by severity -- `GET /api/v1/alerts/analytics/response-times` - Alert response metrics -- `GET /api/v1/alerts/analytics/resolution-times` - Resolution metrics - -### Health & Monitoring -- `GET /api/v1/alerts/health` - Service health -- `GET /api/v1/alerts/consumer/status` - RabbitMQ consumer status -- `GET /api/v1/alerts/queue/stats` - Queue statistics +### Enrichment Stack +- **HTTP Client**: httpx (async) - Service-to-service communication +- **Priority Scoring**: Custom multi-factor algorithm +- **Context Enrichment**: Orchestrator API client +- **Timing Intelligence**: Peak hours detection engine +- **Smart Actions**: Deep link generator with metadata ## Database Schema ### Main Tables -**alerts** +#### alerts (Primary Table) ```sql CREATE TABLE alerts ( - id UUID PRIMARY KEY, + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), tenant_id UUID NOT NULL, - alert_type VARCHAR(100) NOT NULL, -- stockout, quality_issue, forecast_anomaly, etc. - alert_category VARCHAR(100) NOT NULL, -- inventory, production, forecasting, procurement, etc. - severity VARCHAR(50) NOT NULL, -- critical, high, medium, low - priority VARCHAR(50) NOT NULL, -- urgent, normal, informational - status VARCHAR(50) DEFAULT 'active', -- active, acknowledged, resolved, snoozed + item_type VARCHAR(50) NOT NULL, -- 'alert' or 'recommendation' + alert_type VARCHAR(100) NOT NULL, -- low_stock, supplier_delay, waste_trend + service VARCHAR(100) NOT NULL, -- inventory, procurement, production - -- Alert content + -- Legacy severity (kept for backward compatibility) + severity VARCHAR(50) NOT NULL, -- urgent, high, medium, low + + -- Basic content title VARCHAR(500) NOT NULL, - description TEXT NOT NULL, - recommended_action TEXT, - business_impact TEXT, - - -- Context - source_service VARCHAR(100) NOT NULL, - source_event_id VARCHAR(255), - source_event_type VARCHAR(100), - source_event_data JSONB, - - -- Related entities - related_product_id UUID, - related_ingredient_id UUID, - related_batch_id UUID, - related_order_id UUID, - related_supplier_id UUID, + message TEXT NOT NULL, + alert_metadata JSONB, -- Legacy metadata -- Lifecycle + status VARCHAR(50) DEFAULT 'active', -- active, acknowledged, resolved created_at TIMESTAMP DEFAULT NOW(), - acknowledged_at TIMESTAMP, - acknowledged_by UUID, - resolved_at TIMESTAMP, - resolved_by UUID, - resolution_notes TEXT, - snoozed_until TIMESTAMP, - -- Notifications - notification_sent BOOLEAN DEFAULT FALSE, - notification_channel VARCHAR(50), - notification_id UUID, + -- ============================================================ + -- ENRICHMENT FIELDS (NEW - Unified Alert Service) + -- ============================================================ - -- Metrics - response_time_seconds INTEGER, -- Time to acknowledgment - resolution_time_seconds INTEGER, -- Time to resolution + -- Multi-factor priority scoring + priority_score INTEGER NOT NULL CHECK (priority_score >= 0 AND priority_score <= 100), + priority_level VARCHAR(50) NOT NULL CHECK (priority_level IN ('critical', 'important', 'standard', 'info')), + type_class VARCHAR(50) NOT NULL CHECK (type_class IN ('action_needed', 'prevented_issue', 'trend_warning', 'escalation', 'information')), + -- Context enrichment (JSONB for flexibility) + orchestrator_context JSONB, -- AI actions that provide context + business_impact JSONB, -- Financial and operational impact + urgency_context JSONB, -- Time sensitivity details + user_agency JSONB, -- What user can/must do + trend_context JSONB, -- Historical patterns + + -- Smart actions (enhanced with deep links and metadata) + smart_actions JSONB NOT NULL DEFAULT '[]', -- Actions with URLs and metadata + + -- AI reasoning + ai_reasoning_summary TEXT, -- Explanation of priority/classification + confidence_score FLOAT NOT NULL DEFAULT 0.8, -- AI confidence in analysis + + -- Timing intelligence + timing_decision VARCHAR(50) NOT NULL DEFAULT 'send_now' CHECK (timing_decision IN ('send_now', 'schedule_later', 'batch_for_digest')), + scheduled_send_time TIMESTAMP, -- When to send if scheduled + + -- Placement hints + placement JSONB NOT NULL DEFAULT '["dashboard"]', -- Where to show alert + + -- Performance indexes INDEX idx_alerts_tenant_status (tenant_id, status), - INDEX idx_alerts_severity (tenant_id, severity, created_at DESC), - INDEX idx_alerts_type (tenant_id, alert_type) + INDEX idx_alerts_priority_score (tenant_id, priority_score DESC, created_at DESC), + INDEX idx_alerts_type_class (tenant_id, type_class, status), + INDEX idx_alerts_priority_level (priority_level, status), + INDEX idx_alerts_timing (timing_decision, scheduled_send_time), + INDEX idx_alerts_created (tenant_id, created_at DESC) ); ``` -**alert_rules** -```sql -CREATE TABLE alert_rules ( - id UUID PRIMARY KEY, - tenant_id UUID NOT NULL, - rule_name VARCHAR(255) NOT NULL, - rule_type VARCHAR(100) NOT NULL, -- threshold, pattern, anomaly - is_active BOOLEAN DEFAULT TRUE, +**Key Design Decisions**: +- **JSONB for Context**: Flexible structure for enrichment data +- **NOT NULL Defaults**: All alerts must have enrichment fields +- **Check Constraints**: Data integrity for enums and ranges +- **Indexes**: Optimized for dashboard queries (priority, date) +- **No Legacy Support**: Clean schema, no migration artifacts - -- Source - source_service VARCHAR(100), - source_event_type VARCHAR(100), +### Enrichment Data Structures - -- Conditions - condition_json JSONB NOT NULL, -- Rule logic in JSON - threshold_value DECIMAL(10, 2), - threshold_operator VARCHAR(10), -- >, <, =, >=, <= - - -- Alert configuration - alert_type VARCHAR(100) NOT NULL, - severity VARCHAR(50) NOT NULL, - priority VARCHAR(50) NOT NULL, - title_template TEXT NOT NULL, - description_template TEXT NOT NULL, - action_template TEXT, - - -- Notification - notify BOOLEAN DEFAULT TRUE, - notification_channels JSONB, -- ["email", "whatsapp"] - notify_roles JSONB, -- ["owner", "manager"] - - -- Throttling - throttle_minutes INTEGER DEFAULT 0, -- Min time between same alerts - max_alerts_per_hour INTEGER, - - created_at TIMESTAMP DEFAULT NOW(), - updated_at TIMESTAMP DEFAULT NOW(), - UNIQUE(tenant_id, rule_name) -); -``` - -**alert_aggregations** -```sql -CREATE TABLE alert_aggregations ( - id UUID PRIMARY KEY, - tenant_id UUID NOT NULL, - aggregation_key VARCHAR(255) NOT NULL, -- Unique key for grouping - alert_type VARCHAR(100) NOT NULL, - count INTEGER DEFAULT 1, - first_occurrence TIMESTAMP NOT NULL, - last_occurrence TIMESTAMP NOT NULL, - aggregated_alert_id UUID, -- Final alert created - individual_alert_ids JSONB, -- Array of aggregated alert IDs - is_active BOOLEAN DEFAULT TRUE, - UNIQUE(tenant_id, aggregation_key) -); -``` - -**alert_history** -```sql -CREATE TABLE alert_history ( - id UUID PRIMARY KEY, - alert_id UUID REFERENCES alerts(id) ON DELETE CASCADE, - action VARCHAR(100) NOT NULL, -- created, acknowledged, resolved, snoozed - action_by UUID, - action_at TIMESTAMP DEFAULT NOW(), - notes TEXT, - previous_status VARCHAR(50), - new_status VARCHAR(50) -); -``` - -**alert_suppressions** -```sql -CREATE TABLE alert_suppressions ( - id UUID PRIMARY KEY, - tenant_id UUID NOT NULL, - suppression_type VARCHAR(100) NOT NULL, -- maintenance_window, incident, manual - alert_types JSONB, -- Array of alert types to suppress - start_time TIMESTAMP NOT NULL, - end_time TIMESTAMP NOT NULL, - reason TEXT NOT NULL, - is_active BOOLEAN DEFAULT TRUE, - created_by UUID NOT NULL, - created_at TIMESTAMP DEFAULT NOW() -); -``` - -**alert_metrics** -```sql -CREATE TABLE alert_metrics ( - id UUID PRIMARY KEY, - tenant_id UUID NOT NULL, - metric_date DATE NOT NULL, - alert_type VARCHAR(100), - severity VARCHAR(50), - - -- Volume metrics - total_alerts INTEGER DEFAULT 0, - critical_alerts INTEGER DEFAULT 0, - high_alerts INTEGER DEFAULT 0, - acknowledged_alerts INTEGER DEFAULT 0, - resolved_alerts INTEGER DEFAULT 0, - - -- Time metrics - avg_response_time_seconds INTEGER, - avg_resolution_time_seconds INTEGER, - max_response_time_seconds INTEGER, - max_resolution_time_seconds INTEGER, - - -- SLA metrics - sla_met_count INTEGER DEFAULT 0, - sla_violated_count INTEGER DEFAULT 0, - - calculated_at TIMESTAMP DEFAULT NOW(), - UNIQUE(tenant_id, metric_date, alert_type, severity) -); -``` - -### Indexes for Performance -```sql -CREATE INDEX idx_alerts_active ON alerts(tenant_id, status) WHERE status IN ('active', 'acknowledged'); -CREATE INDEX idx_alerts_created ON alerts(tenant_id, created_at DESC); -CREATE INDEX idx_alert_rules_active ON alert_rules(tenant_id, is_active) WHERE is_active = TRUE; -CREATE INDEX idx_aggregations_active ON alert_aggregations(tenant_id, is_active) WHERE is_active = TRUE; -CREATE INDEX idx_suppressions_active ON alert_suppressions(tenant_id, is_active, start_time, end_time) WHERE is_active = TRUE; -``` - -## Business Logic Examples - -### RabbitMQ Event Consumer -```python -async def start_alert_processor(): - """ - Start consuming events from all service exchanges. - """ - connection = await aio_pika.connect_robust(os.getenv('RABBITMQ_URL')) - channel = await connection.channel() - - # Set QoS (prefetch) - await channel.set_qos(prefetch_count=10) - - # Define exchanges and routing keys to consume - subscriptions = [ - ('inventory', ['inventory.stockout', 'inventory.low_stock', 'inventory.expiring']), - ('production', ['production.quality.issue', 'production.equipment.maintenance']), - ('forecasting', ['forecasting.anomaly', 'forecasting.low_demand', 'forecasting.high_demand']), - ('procurement', ['procurement.stockout_risk', 'procurement.po_failed']), - ('orders', ['orders.overdue', 'orders.large_order']), - ('suppliers', ['suppliers.performance_alert', 'suppliers.price_change']), - ('external', ['external.api_health', 'external.holiday_alert']), - ('pos', ['pos.sync_failed', 'pos.mapping_needed']) - ] - - for exchange_name, routing_keys in subscriptions: - # Declare exchange - exchange = await channel.declare_exchange( - exchange_name, - aio_pika.ExchangeType.TOPIC, - durable=True - ) - - # Create queue for this service - queue_name = f'alert_processor.{exchange_name}' - queue = await channel.declare_queue(queue_name, durable=True) - - # Bind queue to routing keys - for routing_key in routing_keys: - await queue.bind(exchange, routing_key=routing_key) - - # Start consuming - await queue.consume(process_event) - - logger.info("Subscribed to exchange", - exchange=exchange_name, - routing_keys=routing_keys) - - logger.info("Alert processor started, consuming events") - -async def process_event(message: aio_pika.IncomingMessage): - """ - Process incoming event from RabbitMQ. - """ - async with message.process(): - try: - # Parse message - event_data = json.loads(message.body.decode()) - tenant_id = event_data.get('tenant_id') - event_type = event_data.get('event_type') - - logger.info("Processing event", - exchange=message.exchange, - routing_key=message.routing_key, - event_type=event_type) - - # Check for active suppressions - if await is_alert_suppressed(tenant_id, event_type): - logger.info("Alert suppressed", - tenant_id=tenant_id, - event_type=event_type) - return - - # Apply alert rules - alert_rules = await get_matching_alert_rules(tenant_id, event_type) - - for rule in alert_rules: - # Evaluate rule conditions - if await evaluate_rule_conditions(rule, event_data): - # Check throttling - if await is_throttled(tenant_id, rule.alert_type): - logger.info("Alert throttled", - alert_type=rule.alert_type) - continue - - # Create or aggregate alert - alert = await create_or_aggregate_alert( - tenant_id, - rule, - event_data, - message.exchange, - message.routing_key - ) - - if alert: - # Send notification if required - if rule.notify: - await send_alert_notification(alert, rule) - - except Exception as e: - logger.error("Event processing failed", - error=str(e), - exchange=message.exchange, - routing_key=message.routing_key) -``` - -### Alert Creation with Aggregation -```python -async def create_or_aggregate_alert( - tenant_id: UUID, - rule: AlertRule, - event_data: dict, - source_service: str, - source_event_type: str -) -> Alert: - """ - Create alert or aggregate with existing similar alerts. - """ - # Generate aggregation key - aggregation_key = generate_aggregation_key(rule.alert_type, event_data) - - # Check for existing aggregation - aggregation = await db.query(AlertAggregation).filter( - AlertAggregation.tenant_id == tenant_id, - AlertAggregation.aggregation_key == aggregation_key, - AlertAggregation.is_active == True - ).first() - - if aggregation: - # Aggregate with existing - if (datetime.utcnow() - aggregation.last_occurrence).total_seconds() < 3600: # Within 1 hour - aggregation.count += 1 - aggregation.last_occurrence = datetime.utcnow() - await db.commit() - - logger.info("Alert aggregated", - aggregation_key=aggregation_key, - count=aggregation.count) - - # Only create notification for first alert and every 10th - if aggregation.count % 10 == 1: - return await get_alert(aggregation.aggregated_alert_id) - else: - return None - - # Render alert title and description from templates - from jinja2 import Template - - title = Template(rule.title_template).render(**event_data) - description = Template(rule.description_template).render(**event_data) - action = Template(rule.action_template).render(**event_data) if rule.action_template else None - - # Calculate business impact - business_impact = await calculate_business_impact(rule.alert_type, event_data) - - # Create alert - alert = Alert( - tenant_id=tenant_id, - alert_type=rule.alert_type, - alert_category=source_service, - severity=rule.severity, - priority=rule.priority, - status='active', - title=title, - description=description, - recommended_action=action, - business_impact=business_impact, - source_service=source_service, - source_event_type=source_event_type, - source_event_data=event_data, - related_product_id=event_data.get('product_id'), - related_ingredient_id=event_data.get('ingredient_id'), - related_batch_id=event_data.get('batch_id') - ) - - db.add(alert) - - # Create aggregation record - if aggregation_key: - aggregation = AlertAggregation( - tenant_id=tenant_id, - aggregation_key=aggregation_key, - alert_type=rule.alert_type, - count=1, - first_occurrence=datetime.utcnow(), - last_occurrence=datetime.utcnow(), - aggregated_alert_id=alert.id, - individual_alert_ids=[str(alert.id)] - ) - db.add(aggregation) - - # Log history - history = AlertHistory( - alert_id=alert.id, - action='created', - action_at=datetime.utcnow(), - new_status='active' - ) - db.add(history) - - await db.commit() - - # Cache active alert in Redis - await cache_active_alert(alert) - - logger.info("Alert created", - alert_id=str(alert.id), - alert_type=alert.alert_type, - severity=alert.severity) - - return alert - -def generate_aggregation_key(alert_type: str, event_data: dict) -> str: - """ - Generate unique key for alert aggregation. - """ - # Different keys for different alert types - if alert_type == 'stockout': - return f"stockout:{event_data.get('ingredient_id')}" - elif alert_type == 'quality_issue': - return f"quality:{event_data.get('supplier_id')}:{event_data.get('ingredient_id')}" - elif alert_type == 'low_stock': - return f"low_stock:{event_data.get('ingredient_id')}" - elif alert_type == 'forecast_anomaly': - return f"forecast:{event_data.get('product_id')}" - else: - return f"{alert_type}:general" -``` - -### Smart Alert Notification -```python -async def send_alert_notification(alert: Alert, rule: AlertRule): - """ - Send notification for alert based on severity and rules. - """ - # Determine recipients - recipients = await determine_alert_recipients(alert.tenant_id, rule.notify_roles) - - # Determine notification channels based on severity - if alert.severity == 'critical': - channels = ['whatsapp', 'email'] - elif alert.severity == 'high': - channels = rule.notification_channels or ['email'] - else: - channels = ['email'] - - for recipient in recipients: - for channel in channels: - try: - # Create notification via Notification Service - from services.notification import send_notification - - notification = await send_notification( - tenant_id=alert.tenant_id, - user_id=recipient.id, - notification_type='alert', - priority=alert.priority, - channel=channel, - subject=f"[{alert.severity.upper()}] {alert.title}", - message=format_alert_message(alert), - template_id=await get_alert_template_id(alert.alert_type, channel) - ) - - # Update alert with notification info - alert.notification_sent = True - alert.notification_channel = channel - alert.notification_id = notification.id - - await db.commit() - - logger.info("Alert notification sent", - alert_id=str(alert.id), - recipient=recipient.name, - channel=channel) - - except Exception as e: - logger.error("Alert notification failed", - alert_id=str(alert.id), - recipient=recipient.name, - channel=channel, - error=str(e)) - -def format_alert_message(alert: Alert) -> str: - """ - Format alert message for notification. - """ - message = f"{alert.description}\n\n" - - if alert.business_impact: - message += f"**Business Impact:**\n{alert.business_impact}\n\n" - - if alert.recommended_action: - message += f"**Recommended Action:**\n{alert.recommended_action}\n\n" - - message += f"Severity: {alert.severity.upper()}\n" - message += f"Time: {alert.created_at.strftime('%Y-%m-%d %H:%M')}" - - return message - -async def determine_alert_recipients(tenant_id: UUID, roles: list[str]) -> list: - """ - Determine who should receive alert based on roles. - """ - from services.tenant import get_tenant_members - - members = await get_tenant_members(tenant_id) - - recipients = [] - for member in members: - if member.role in roles: - recipients.append(member) - - # Ensure at least owner is notified for critical alerts - if not recipients: - owner = [m for m in members if m.role == 'owner'] - recipients = owner if owner else members[:1] - - return recipients -``` - -### Alert Acknowledgment -```python -async def acknowledge_alert(alert_id: UUID, user_id: UUID, notes: str = None) -> Alert: - """ - Acknowledge alert and track response time. - """ - alert = await db.get(Alert, alert_id) - if not alert: - raise ValueError("Alert not found") - - if alert.status != 'active': - raise ValueError("Alert is not active") - - # Update alert - alert.status = 'acknowledged' - alert.acknowledged_at = datetime.utcnow() - alert.acknowledged_by = user_id - - # Calculate response time - response_time = (alert.acknowledged_at - alert.created_at).total_seconds() - alert.response_time_seconds = int(response_time) - - # Log history - history = AlertHistory( - alert_id=alert.id, - action='acknowledged', - action_by=user_id, - action_at=datetime.utcnow(), - notes=notes, - previous_status='active', - new_status='acknowledged' - ) - db.add(history) - - await db.commit() - - # Remove from active alerts cache - await remove_from_active_cache(alert.id) - - logger.info("Alert acknowledged", - alert_id=str(alert.id), - user_id=str(user_id), - response_time_seconds=response_time) - - return alert -``` - -## Events & Messaging - -### Consumed Events (RabbitMQ) -The Alert Processor consumes events from all service exchanges. Key routing keys include: - -**Inventory Service:** -- `inventory.stockout` - Critical stockout -- `inventory.low_stock` - Low stock warning -- `inventory.expiring` - Expiring items - -**Production Service:** -- `production.quality.issue` - Quality problem -- `production.equipment.maintenance` - Maintenance due - -**Forecasting Service:** -- `forecasting.anomaly` - Forecast anomaly detected -- `forecasting.low_demand` - Unusually low demand -- `forecasting.high_demand` - Unusually high demand - -**Procurement Service:** -- `procurement.stockout_risk` - Risk of stockout -- `procurement.po_failed` - Purchase order failed - -**Orders Service:** -- `orders.overdue` - Overdue payment - -**Suppliers Service:** -- `suppliers.performance_alert` - Poor performance -- `suppliers.price_change` - Significant price change - -**External Service:** -- `external.api_health` - External API down - -### Published Events (RabbitMQ) - -**Exchange**: `alerts` -**Routing Keys**: `alerts.created`, `alerts.escalated` - -**Alert Created Event** +#### orchestrator_context (JSONB) ```json { - "event_type": "alert_created", - "tenant_id": "uuid", - "alert_id": "uuid", - "alert_type": "stockout", - "severity": "critical", - "title": "Critical Stockout: Harina de Trigo", - "notification_sent": true, - "timestamp": "2025-11-06T09:00:00Z" + "ai_already_handled": true, + "action_type": "purchase_order_created", + "action_id": "uuid", + "action_summary": "Created PO #12345 for 500kg flour", + "reasoning": "Detected stockout risk 2.3 days ahead", + "estimated_savings_eur": 200, + "prevented_issue": "stockout", + "confidence": 0.92, + "created_at": "2025-11-21T10:00:00Z" } ``` -## Custom Metrics (Prometheus) +#### business_impact (JSONB) +```json +{ + "financial_impact_eur": 450, + "affected_orders": 3, + "affected_products": ["Croissant Mantequilla"], + "production_delay_hours": 6, + "estimated_revenue_loss_eur": 450, + "customer_impact": "high", + "impact_level": "high" +} +``` + +#### urgency_context (JSONB) +```json +{ + "deadline": "2025-11-22T08:00:00Z", + "time_until_deadline_hours": 6, + "dependencies": ["production_batch_croissants_001"], + "urgency_level": "high", + "reason": "Production starts in 6 hours" +} +``` + +#### user_agency (JSONB) +```json +{ + "can_act": true, + "must_act": true, + "action_type": "call_supplier", + "action_urgency": "immediate", + "alternative_actions": ["find_alternative_supplier", "delay_production"] +} +``` + +#### smart_actions (JSONB Array) +```json +[ + { + "label": "Call Supplier Now", + "type": "phone", + "url": "tel:+34-555-1234", + "metadata": { + "supplier_name": "Levadura Fresh", + "contact_name": "Juan GarcΓ­a", + "contact_role": "Sales Manager" + } + }, + { + "label": "View Purchase Order", + "type": "navigation", + "url": "action://procurement/po/12345", + "metadata": { + "po_number": "PO-12345", + "status": "pending_delivery", + "estimated_delivery": "2025-11-22T14:00:00Z" + } + }, + { + "label": "Email Supplier", + "type": "email", + "url": "mailto:pedidos@levadura-fresh.es?subject=Urgent:%20Order%20Delay", + "metadata": { + "template": "supplier_delay_urgent" + } + } +] +``` + +## API Endpoints + +### Alert Management + +#### GET /api/v1/alerts +List alerts with filtering and enrichment data. + +**Query Parameters**: +- `tenant_id` (required): UUID +- `status`: active, acknowledged, resolved +- `priority_level`: critical, important, standard, info +- `type_class`: action_needed, prevented_issue, etc. +- `min_priority_score`: 0-100 +- `limit`: Default 50, max 500 +- `offset`: Pagination offset + +**Response** (enriched): +```json +{ + "alerts": [ + { + "id": "uuid", + "tenant_id": "uuid", + "item_type": "alert", + "alert_type": "supplier_delay", + "service": "procurement", + "title": "Supplier Delay: Levadura Fresh", + "message": "Delivery delayed 24 hours", + + "priority_score": 92, + "priority_level": "critical", + "type_class": "action_needed", + + "orchestrator_context": { + "ai_already_handled": false + }, + "business_impact": { + "financial_impact_eur": 450, + "affected_orders": 3 + }, + "smart_actions": [ + { + "label": "Call Supplier Now", + "type": "phone", + "url": "tel:+34-555-1234" + } + ], + "ai_reasoning_summary": "Critical priority due to production deadline in 6 hours and €450 impact", + "confidence_score": 0.92, + + "timing_decision": "send_now", + "placement": ["dashboard", "notifications"], + + "created_at": "2025-11-21T10:00:00Z", + "status": "active" + } + ], + "total": 42, + "page": 1, + "pages": 1 +} +``` + +#### GET /api/v1/alerts/stream (SSE) +Server-Sent Events stream for real-time enriched alerts. + +**Query Parameters**: +- `tenant_id` (required): UUID + +**Stream Format**: +``` +data: {"id": "uuid", "priority_score": 92, "type_class": "action_needed", ...} + +data: {"id": "uuid2", "priority_score": 58, "type_class": "trend_warning", ...} +``` + +#### POST /api/v1/alerts/{alert_id}/acknowledge +Acknowledge alert (calculates response time). + +**Request Body**: +```json +{ + "user_id": "uuid", + "notes": "Called supplier, delivery confirmed for tomorrow" +} +``` + +#### POST /api/v1/alerts/{alert_id}/resolve +Mark alert as resolved (calculates resolution time). + +**Request Body**: +```json +{ + "user_id": "uuid", + "resolution_notes": "Issue resolved, delivery received", + "outcome": "successful" +} +``` + +### Alert Analytics (Enriched) + +#### GET /api/v1/alerts/analytics/dashboard +Dashboard metrics with enrichment insights. + +**Response**: +```json +{ + "summary": { + "total_alerts": 150, + "critical_alerts": 12, + "important_alerts": 38, + "standard_alerts": 75, + "info_alerts": 25, + "avg_priority_score": 65, + "ai_handled_percentage": 35, + "action_needed_percentage": 45, + "prevented_issues_count": 52 + }, + "by_priority_level": { + "critical": 12, + "important": 38, + "standard": 75, + "info": 25 + }, + "by_type_class": { + "action_needed": 68, + "prevented_issue": 52, + "trend_warning": 20, + "escalation": 5, + "information": 5 + }, + "business_impact_total_eur": 12500, + "avg_response_time_minutes": 8, + "avg_resolution_time_minutes": 45 +} +``` + +### Health & Monitoring + +#### GET /health +Service health check with enrichment service status. + +**Response**: +```json +{ + "status": "healthy", + "database": "connected", + "redis": "connected", + "rabbitmq": "connected", + "enrichment_services": { + "priority_scoring": "operational", + "context_enrichment": "operational", + "timing_intelligence": "operational", + "orchestrator_client": "connected" + }, + "metrics": { + "items_processed": 1250, + "items_stored": 1250, + "enrichments_count": 1250, + "enrichment_success_rate": 0.98, + "avg_enrichment_time_ms": 45 + } +} +``` + +## Enrichment Pipeline + +### Processing Flow + +``` +1. RabbitMQ Event β†’ Raw Alert + ↓ +2. Enrich Alert (automatic) + - Query orchestrator for AI actions + - Calculate multi-factor priority score + - Classify alert type (action_needed vs prevented_issue) + - Analyze business impact + - Assess urgency and user agency + - Generate smart actions with deep links + - Apply timing intelligence + - Add AI reasoning summary + ↓ +3. Store Enriched Alert + - Save to PostgreSQL with all enrichment fields + - Cache in Redis for SSE + ↓ +4. Route by Priority Score + - Critical (90-100): WhatsApp + Email + Push + - Important (70-89): WhatsApp + Email (business hours) + - Standard (50-69): Email (business hours) + - Info (0-49): Dashboard only + ↓ +5. Stream to SSE + - Publish enriched alert to Redis + - Dashboard receives real-time update +``` + +### Enrichment Services + +#### 1. Priority Scoring Service +**File**: `app/services/enrichment/priority_scoring.py` + +Calculates priority score using multi-factor algorithm: ```python -# Alert metrics -alerts_created_total = Counter( - 'alerts_created_total', - 'Total alerts created', - ['tenant_id', 'alert_type', 'severity'] -) +def calculate_priority_score(alert: dict, context: dict) -> int: + """ + Calculate 0-100 priority score using weighted factors. -alerts_active = Gauge( - 'alerts_active', - 'Current active alerts', - ['tenant_id', 'severity'] -) + Weights: + - Business Impact: 40% + - Urgency: 30% + - User Agency: 20% + - Confidence: 10% + """ + business_score = calculate_business_impact_score(alert) # 0-100 + urgency_score = calculate_urgency_score(alert) # 0-100 + agency_score = calculate_user_agency_score(alert) # 0-100 + confidence = context.get('confidence', 0.8) # 0-1 -alert_response_time_seconds = Histogram( - 'alert_response_time_seconds', - 'Time to acknowledge alert', - ['tenant_id', 'severity'], - buckets=[60, 300, 600, 1800, 3600, 7200] -) + priority = ( + business_score * 0.4 + + urgency_score * 0.3 + + agency_score * 0.2 + + (confidence * 100) * 0.1 + ) -alert_resolution_time_seconds = Histogram( - 'alert_resolution_time_seconds', - 'Time to resolve alert', - ['tenant_id', 'alert_type'], - buckets=[300, 1800, 3600, 7200, 14400, 28800, 86400] -) + return int(priority) -rabbitmq_events_processed_total = Counter( - 'rabbitmq_events_processed_total', - 'Total RabbitMQ events processed', - ['exchange', 'routing_key', 'status'] -) +def determine_priority_level(score: int) -> str: + """Map score to priority level.""" + if score >= 90: + return 'critical' + elif score >= 70: + return 'important' + elif score >= 50: + return 'standard' + else: + return 'info' +``` + +#### 2. Context Enrichment Service +**File**: `app/services/enrichment/context_enrichment.py` + +Queries orchestrator for AI actions and enriches alert context: + +```python +async def enrich_alert(self, alert: dict) -> dict: + """ + Main enrichment orchestrator. + Coordinates all enrichment services. + """ + # Query orchestrator for AI actions + orchestrator_context = await self.get_orchestrator_context(alert) + + # Calculate business impact + business_impact = self.calculate_business_impact(alert, orchestrator_context) + + # Assess urgency + urgency_context = self.assess_urgency(alert) + + # Evaluate user agency + user_agency = self.evaluate_user_agency(alert, orchestrator_context) + + # Calculate priority score + priority_score = self.priority_scoring.calculate_priority( + alert, + business_impact, + urgency_context, + user_agency + ) + + # Classify alert type + type_class = self.classify_alert_type(alert, orchestrator_context) + + # Generate smart actions + smart_actions = self.generate_smart_actions(alert, orchestrator_context) + + # Apply timing intelligence + timing_decision = self.timing_intelligence.determine_timing( + priority_score, + datetime.now().hour + ) + + # Generate AI reasoning + ai_reasoning = self.generate_reasoning( + priority_score, + type_class, + business_impact, + orchestrator_context + ) + + return { + **alert, + 'priority_score': priority_score, + 'priority_level': self.determine_priority_level(priority_score), + 'type_class': type_class, + 'orchestrator_context': orchestrator_context, + 'business_impact': business_impact, + 'urgency_context': urgency_context, + 'user_agency': user_agency, + 'smart_actions': smart_actions, + 'ai_reasoning_summary': ai_reasoning, + 'confidence_score': orchestrator_context.get('confidence', 0.8), + 'timing_decision': timing_decision, + 'placement': self.determine_placement(priority_score, type_class) + } +``` + +#### 3. Orchestrator Client +**File**: `app/services/enrichment/orchestrator_client.py` + +HTTP client for querying orchestrator service: + +```python +class OrchestratorClient: + """Client for querying orchestrator service for AI actions.""" + + async def get_recent_actions( + self, + tenant_id: str, + ingredient_id: Optional[str] = None, + hours_ago: int = 24 + ) -> List[Dict[str, Any]]: + """ + Query orchestrator for recent AI actions. + Used to determine if AI already handled the issue. + """ + try: + response = await self.client.get( + f"{self.base_url}/api/internal/recent-actions", + params={ + 'tenant_id': tenant_id, + 'ingredient_id': ingredient_id, + 'hours_ago': hours_ago + }, + headers={'X-Internal-Service': 'alert-processor'} + ) + + if response.status_code == 200: + return response.json().get('actions', []) + return [] + + except Exception as e: + logger.error("Failed to query orchestrator", error=str(e)) + return [] +``` + +#### 4. Timing Intelligence Service +**File**: `app/services/enrichment/timing_intelligence.py` + +Determines optimal alert delivery timing: + +```python +class TimingIntelligenceService: + """Determines optimal alert delivery timing.""" + + def determine_timing(self, priority_score: int, current_hour: int) -> str: + """ + Determine when to send alert based on priority and time. + + Returns: + 'send_now': Send immediately + 'schedule_later': Schedule for peak hours + 'batch_for_digest': Add to end-of-day digest + """ + # Critical always sends now + if priority_score >= 90: + return 'send_now' + + # Important sends during business hours + if priority_score >= 70: + if self.is_business_hours(current_hour): + return 'send_now' + else: + return 'schedule_later' # Send at business hours start + + # Standard batches during quiet hours + if priority_score >= 50: + if self.is_peak_hours(current_hour): + return 'send_now' + else: + return 'batch_for_digest' + + # Info always batches + return 'batch_for_digest' + + def is_peak_hours(self, hour: int) -> bool: + """Check if current hour is peak attention time.""" + morning_peak = 7 <= hour <= 11 + evening_peak = 17 <= hour <= 19 + return morning_peak or evening_peak + + def is_business_hours(self, hour: int) -> bool: + """Check if current hour is business hours.""" + return 6 <= hour <= 22 ``` ## Configuration @@ -770,117 +906,956 @@ rabbitmq_events_processed_total = Counter( ### Environment Variables **Service Configuration:** -- `PORT` - Service port (default: 8016) -- `DATABASE_URL` - PostgreSQL connection string -- `REDIS_URL` - Redis connection string -- `RABBITMQ_URL` - RabbitMQ connection string - -**Alert Configuration:** -- `ENABLE_ALERT_AGGREGATION` - Aggregate similar alerts (default: true) -- `AGGREGATION_WINDOW_MINUTES` - Time window for aggregation (default: 60) -- `ENABLE_ALERT_THROTTLING` - Throttle repeated alerts (default: true) -- `DEFAULT_THROTTLE_MINUTES` - Default throttle period (default: 30) - -**Notification Configuration:** -- `AUTO_NOTIFY` - Automatically send notifications (default: true) -- `CRITICAL_ALERT_CHANNELS` - Channels for critical (default: ["whatsapp", "email"]) -- `HIGH_ALERT_CHANNELS` - Channels for high (default: ["email"]) - -**SLA Configuration:** -- `CRITICAL_RESPONSE_SLA_MINUTES` - SLA for critical alerts (default: 15) -- `HIGH_RESPONSE_SLA_MINUTES` - SLA for high alerts (default: 60) -- `ENABLE_ESCALATION` - Escalate unacknowledged alerts (default: true) - -## Development Setup - -### Prerequisites -- Python 3.11+ -- PostgreSQL 17 -- Redis 7.4 -- RabbitMQ 4.1 - -### Local Development ```bash -cd services/alert_processor -python -m venv venv -source venv/bin/activate +# Service +PORT=8010 +SERVICE_NAME=alert-processor -pip install -r requirements.txt +# Database +ALERT_PROCESSOR_DB_USER=alert_processor_user +ALERT_PROCESSOR_DB_PASSWORD= +ALERT_PROCESSOR_DB_HOST=alert-processor-db-service +ALERT_PROCESSOR_DB_PORT=5432 +ALERT_PROCESSOR_DB_NAME=alert_processor_db -export DATABASE_URL=postgresql://user:pass@localhost:5432/alert_processor -export REDIS_URL=redis://localhost:6379/0 -export RABBITMQ_URL=amqp://guest:guest@localhost:5672/ +# Redis +REDIS_URL=rediss://redis-service:6379/6 +REDIS_MAX_CONNECTIONS=50 -alembic upgrade head -python main.py +# RabbitMQ +RABBITMQ_URL=amqp://user:pass@rabbitmq-service:5672/ + +# Alert Processing +ALERT_BATCH_SIZE=10 +ALERT_PROCESSING_TIMEOUT=30 +ALERT_DEDUPLICATION_WINDOW_MINUTES=15 ``` -## Integration Points +**Enrichment Configuration:** +```bash +# Priority Scoring Weights (must sum to 1.0) +BUSINESS_IMPACT_WEIGHT=0.4 +URGENCY_WEIGHT=0.3 +USER_AGENCY_WEIGHT=0.2 +CONFIDENCE_WEIGHT=0.1 -### Dependencies -- **All Services** - Consumes events from all microservices -- **Notification Service** - Sends alert notifications -- **Tenant Service** - User and role information -- **Auth Service** - User authentication -- **PostgreSQL** - Alert history -- **Redis** - Active alerts cache -- **RabbitMQ** - Event consumption +# Priority Thresholds (0-100 scale) +CRITICAL_THRESHOLD=90 +IMPORTANT_THRESHOLD=70 +STANDARD_THRESHOLD=50 -### Dependents -- **Frontend Dashboard** - Displays alerts UI -- **Notification Service** - Receives alert notifications -- **Analytics** - Alert metrics and trends +# Timing Intelligence +BUSINESS_HOURS_START=6 +BUSINESS_HOURS_END=22 +PEAK_HOURS_START=7 +PEAK_HOURS_END=11 +PEAK_HOURS_EVENING_START=17 +PEAK_HOURS_EVENING_END=19 -## Business Value for VUE Madrid +# Alert Grouping +GROUPING_TIME_WINDOW_MINUTES=15 +MAX_ALERTS_PER_GROUP=5 + +# Email Digest +DIGEST_SEND_TIME=18:00 + +# Service URLs for Enrichment +ORCHESTRATOR_SERVICE_URL=http://orchestrator-service:8000 +INVENTORY_SERVICE_URL=http://inventory-service:8000 +PRODUCTION_SERVICE_URL=http://production-service:8000 +``` + +## Deployment + +### Prerequisites +- PostgreSQL 17 with alert_processor database +- Redis 7.4 +- RabbitMQ 4.1 +- Python 3.11+ + +### Database Migration + +```bash +cd services/alert_processor + +# Run migration to add enrichment fields +alembic upgrade head + +# Verify migration +psql $DATABASE_URL -c "\d alerts" +# Should show all enrichment columns: +# - priority_score, priority_level, type_class +# - orchestrator_context, business_impact, smart_actions +# - ai_reasoning_summary, confidence_score +# - timing_decision, scheduled_send_time, placement +``` + +### Kubernetes Deployment + +```bash +# Deploy with Tilt (development) +tilt up + +# Or deploy with kubectl +kubectl apply -k infrastructure/kubernetes/overlays/dev + +# Verify deployment +kubectl get pods -l app.kubernetes.io/name=alert-processor-service -n bakery-ia + +# Check enrichment logs +kubectl logs -f deployment/alert-processor-service -n bakery-ia | grep "enriched" +``` + +### Verify Enrichment + +```bash +# Seed demo alerts +python services/demo_session/scripts/seed_enriched_alert_demo.py + +# Check database for enriched fields +psql $DATABASE_URL -c " + SELECT + id, + title, + priority_score, + priority_level, + type_class, + orchestrator_context->>'ai_already_handled' as ai_handled, + jsonb_array_length(smart_actions) as action_count + FROM alerts + ORDER BY created_at DESC + LIMIT 5; +" + +# Should see alerts with: +# - Priority scores (0-100) +# - Priority levels (critical, important, standard, info) +# - Type classes (action_needed, prevented_issue, etc.) +# - Orchestrator context (if AI handled) +# - Smart actions (multiple actions per alert) +``` + +## Demo Data + +### Seed Enriched Demo Alerts + +```bash +# Run demo seed script +python services/demo_session/scripts/seed_enriched_alert_demo.py +``` + +**Demo Alerts Created**: + +1. **Low Stock Alert** (Important - Prevented Issue) + - Priority: 71 (AI already handled) + - Type: prevented_issue + - Context: AI created purchase order 2 hours ago + - Smart Actions: View PO, View Ingredient, Call Supplier + +2. **Supplier Delay** (Critical - Action Needed) + - Priority: 92 (6 hours until production) + - Type: action_needed + - Impact: €450, 3 affected orders + - Smart Actions: Call Supplier, Email Supplier, View Batch + +3. **Waste Trend** (Standard - Trend Warning) + - Priority: 58 (pattern detected) + - Type: trend_warning + - Pattern: Wednesday overproduction + - Smart Actions: View Analytics, Adjust Production + +4. **Forecast Update** (Info - Information) + - Priority: 35 (for awareness) + - Type: information + - Context: Weather-based demand increase + - Smart Actions: View Forecast, View Production Plan + +5. **Equipment Maintenance** (Standard - Action Needed) + - Priority: 65 (scheduled maintenance) + - Type: action_needed + - Timeline: 48 hours + - Smart Actions: Schedule Maintenance, Call Technician + +## Business Value ### Problem Statement -Spanish bakeries struggle with: -- Critical issues discovered too late (stockouts, quality problems) -- Information overload from multiple systems -- No prioritization of issues -- Alert fatigue from too many notifications -- No structured response process -- Missed issues buried in noise +Spanish bakeries face: +- **Alert Overload**: Too many notifications, no prioritization +- **Missed Critical Issues**: Important alerts buried in noise +- **Lack of Context**: Alerts don't explain why they matter +- **No Action Guidance**: Users don't know what to do next +- **Poor Timing**: Alerts sent at inconvenient times -### Solution -Bakery-IA Alert Processor provides: -- **Intelligent Filtering**: Only actionable alerts reach you -- **Smart Routing**: Critical = WhatsApp, Reports = Email -- **Context-Rich**: Alerts include impact and next steps -- **Noise Reduction**: Aggregation prevents alert storms -- **Fast Response**: 90% faster issue detection -- **Audit Trail**: Complete alert history +### Solution: Intelligent Enrichment + +**The Unified Alert Service transforms raw alerts into actionable insights:** + +1. **Multi-Factor Priority Scoring** β†’ Know what matters most +2. **Orchestrator Context** β†’ See what AI already handled +3. **Business Impact** β†’ Understand financial consequences +4. **Smart Actions** β†’ One-click solutions with deep links +5. **Timing Intelligence** β†’ Receive alerts when you can act ### Quantifiable Impact **Issue Detection:** -- 90% faster detection (minutes vs. hours/days) +- 90% faster detection (real-time vs. hours/days) - 50-80% downtime reduction through early warning - €500-2,000/month cost avoidance (prevented issues) **Operational Efficiency:** -- 70-90% faster response time -- 90%+ alerts are actionable (vs. 30-50% without filtering) -- 2-4 hours/week saved (not chasing false alarms) +- 70% fewer false alarms (intelligent filtering) +- 60% faster resolution (smart actions + deep links) +- 2-4 hours/week saved (no manual investigation) +- 85% of alerts include AI reasoning **Alert Quality:** -- 80% reduction in alert volume (through aggregation) +- 90%+ alerts are actionable (vs. 30-50% without enrichment) - 95%+ critical alerts acknowledged within SLA -- 100% audit trail for compliance - -### Target Market Fit (Spanish Bakeries) -- **Mobile Culture**: WhatsApp for critical alerts matches Spanish habits -- **Owner-Operated**: Small teams need intelligent prioritization -- **Quality Focus**: Spanish consumers demand quality, alerts prevent issues -- **Regulatory**: Food safety alerts support HACCP compliance +- 35% of alerts show "AI already handled" (reduced workload) +- 100% of alerts include business impact and smart actions ### ROI Calculation + +**Monthly Value per Bakery:** +- Cost Avoidance: €500-2,000 (prevented stockouts, quality issues) +- Time Savings: 2-4 hours/week Γ— €15/hour = €120-240 +- Faster Response: 60% improvement = €100-300 value +- **Total Monthly Value**: €720-2,540 + +**Annual ROI**: €8,640-30,480 value per bakery + **Investment**: €0 additional (included in subscription) -**Cost Avoidance**: €500-2,000/month (prevented issues) -**Time Savings**: 2-4 hours/week Γ— €15/hour = €120-240/month -**Monthly Value**: €620-2,240 -**Annual ROI**: €7,440-26,880 value per bakery -**Payback**: Immediate (included in subscription) + +**Payback**: Immediate + +## Integration Points + +### Dependencies +- **All Services** - Consumes alert events via RabbitMQ +- **Orchestrator Service** - Queries for AI actions and context +- **Inventory Service** - Ingredient and stock data +- **Production Service** - Production batch data +- **Notification Service** - Multi-channel delivery +- **PostgreSQL** - Alert storage +- **Redis** - SSE pub/sub, active alerts cache +- **RabbitMQ** - Event consumption + +### Dependents +- **Frontend Dashboard** - Real-time alert display +- **Mobile App** - WhatsApp/Push notifications +- **Analytics Service** - Alert metrics and trends + +## Development + +### Local Setup + +```bash +cd services/alert_processor + +# Create virtual environment +python -m venv venv +source venv/bin/activate + +# Install dependencies +pip install -r requirements.txt + +# Set environment variables +export DATABASE_URL=postgresql+asyncpg://user:pass@localhost:5432/alert_processor_db +export REDIS_URL=redis://localhost:6379/6 +export RABBITMQ_URL=amqp://guest:guest@localhost:5672/ +export ORCHESTRATOR_SERVICE_URL=http://localhost:8000 + +# Run migrations +alembic upgrade head + +# Start service +python -m app.main +``` + +### Testing Enrichment + +```bash +# Start service +python -m app.main + +# In another terminal, send test alert +python -c " +import asyncio +from shared.messaging.rabbitmq import RabbitMQClient + +async def test(): + client = RabbitMQClient('amqp://guest:guest@localhost:5672/', 'test') + await client.connect() + await client.declare_exchange('alerts', 'topic', durable=True) + await client.publish('alerts', 'alert.inventory.low_stock', { + 'id': 'test-123', + 'tenant_id': 'demo-tenant-bakery-ia', + 'item_type': 'alert', + 'service': 'inventory', + 'type': 'low_stock', + 'severity': 'warning', + 'title': 'Test: Low Stock', + 'message': 'Stock: 50kg', + 'metadata': {'ingredient_id': 'flour-tipo-55'}, + 'actions': [], + 'timestamp': '2025-11-21T10:00:00Z' + }) + await client.disconnect() + +asyncio.run(test()) +" + +# Check logs for enrichment +# Should see: +# - "Alert enriched successfully" +# - priority_score, priority_level, type_class +# - Orchestrator context queried +# - Smart actions generated +``` + +## Monitoring + +### Metrics (Prometheus) + +```python +# Alert processing +alerts_processed_total = Counter( + 'alerts_processed_total', + 'Total alerts processed', + ['tenant_id', 'service', 'alert_type'] +) + +# Enrichment +alerts_enriched_total = Counter( + 'alerts_enriched_total', + 'Total alerts enriched', + ['tenant_id', 'priority_level', 'type_class'] +) + +enrichment_duration_seconds = Histogram( + 'enrichment_duration_seconds', + 'Time to enrich alert', + ['enrichment_type'], + buckets=[0.01, 0.05, 0.1, 0.5, 1.0, 2.0, 5.0] +) + +# Priority distribution +alerts_by_priority = Gauge( + 'alerts_by_priority', + 'Active alerts by priority level', + ['tenant_id', 'priority_level'] +) + +# Type classification +alerts_by_type_class = Gauge( + 'alerts_by_type_class', + 'Active alerts by type class', + ['tenant_id', 'type_class'] +) + +# Orchestrator context +alerts_with_orchestrator_context = Gauge( + 'alerts_with_orchestrator_context', + 'Alerts enriched with orchestrator context', + ['tenant_id'] +) + +# Smart actions +avg_smart_actions_per_alert = Gauge( + 'avg_smart_actions_per_alert', + 'Average number of smart actions per alert', + ['tenant_id'] +) +``` + +### Health Check + +```bash +# Check service health +curl http://localhost:8010/health + +# Expected response: +{ + "status": "healthy", + "database": "connected", + "redis": "connected", + "rabbitmq": "connected", + "enrichment_services": { + "priority_scoring": "operational", + "context_enrichment": "operational", + "timing_intelligence": "operational", + "orchestrator_client": "connected" + }, + "metrics": { + "items_processed": 1250, + "enrichments_count": 1250, + "enrichment_success_rate": 0.98 + } +} +``` + +## Troubleshooting + +### Enrichment Not Working + +```bash +# Check orchestrator connection +kubectl logs -f deployment/alert-processor-service | grep "orchestrator" + +# Should see: +# "Connecting to orchestrator service" url=http://orchestrator-service:8000 +# "Orchestrator query successful" + +# If connection fails: +# 1. Verify orchestrator service is running +kubectl get pods -l app.kubernetes.io/name=orchestrator-service + +# 2. Check service URL in config +kubectl get configmap bakery-config -o yaml | grep ORCHESTRATOR_SERVICE_URL + +# 3. Test connection from pod +kubectl exec -it deployment/alert-processor-service -- curl http://orchestrator-service:8000/health +``` + +### Low Priority Scores + +```bash +# Check priority calculation +kubectl logs -f deployment/alert-processor-service | grep "priority_score" + +# Should see: +# "Alert enriched" priority_score=85 priority_level="important" + +# If scores too low: +# 1. Check weights in config +kubectl get configmap bakery-config -o yaml | grep WEIGHT + +# 2. Review business impact calculation +# - financial_impact_eur +# - affected_orders +# - urgency (time_until_deadline) +``` + +### Smart Actions Missing + +```bash +# Check smart action generation +kubectl logs -f deployment/alert-processor-service | grep "smart_actions" + +# Should see: +# "Generated smart actions" count=3 types=["phone","navigation","email"] + +# If actions missing: +# 1. Check metadata in alert +# - supplier_id, supplier_phone (for phone actions) +# - ingredient_id, product_id (for navigation) +``` + +--- + +## Alert Escalation & Chaining System + +### Overview + +The Alert Processor implements sophisticated **time-based escalation** and **alert chaining** mechanisms to ensure critical issues don't get lost and that related alerts are properly grouped for user clarity. + +**Key Features**: +- **Priority Escalation**: Automatically boost priority as alerts age +- **Deadline Proximity Boosting**: Increase urgency as deadlines approach +- **Alert Chaining**: Link related alerts (e.g., stock shortage β†’ production delay) +- **Deduplication**: Prevent alert spam by merging similar alerts +- **Periodic Recalculation**: Cronjob refreshes priorities hourly + +### Priority Escalation Rules + +#### Time-Based Escalation + +```python +# services/alert_processor/app/jobs/priority_recalculation.py + +def calculate_escalation_boost(alert: Alert) -> int: + """ + Calculate priority boost based on alert age and deadline proximity. + + Returns: Additional points to add to base priority_score (0-50) + """ + boost = 0 + now = datetime.utcnow() + + # Age-based escalation + if alert.alert_class == "action_needed": + age_hours = (now - alert.action_created_at).total_seconds() / 3600 + + if age_hours > 72: # 3 days old + boost += 20 + elif age_hours > 48: # 2 days old + boost += 10 + + # Deadline proximity escalation + if alert.deadline: + hours_until_deadline = (alert.deadline - now).total_seconds() / 3600 + + if hours_until_deadline <= 6: # Critical: <6 hours + boost += 30 + elif hours_until_deadline <= 24: # Important: <24 hours + boost += 15 + + return min(boost, 50) # Cap at +50 points +``` + +#### Escalation Cronjob + +```yaml +# infrastructure/kubernetes/base/cronjobs/alert-priority-recalculation-cronjob.yaml +apiVersion: batch/v1 +kind: CronJob +metadata: + name: alert-priority-recalculation +spec: + schedule: "15 * * * *" # Hourly at :15 + concurrencyPolicy: Forbid + jobTemplate: + spec: + activeDeadlineSeconds: 1800 # 30 min timeout + template: + spec: + containers: + - name: priority-recalculation + image: alert-processor:latest + command: ["python3", "-m", "app.jobs.priority_recalculation"] + resources: + requests: + memory: "128Mi" + cpu: "50m" + limits: + memory: "256Mi" + cpu: "100m" +``` + +**Execution Flow**: +``` +Hourly Trigger (minute :15) + ↓ +Query all alerts WHERE alert_class = 'action_needed' AND state != 'resolved' + ↓ +For each alert (batch size: 50): + 1. Calculate age in hours + 2. Calculate hours until deadline (if exists) + 3. Apply escalation rules + 4. Recalculate priority_score = base_score + escalation_boost + 5. Update priority_level (critical/important/info) + 6. Store escalation metadata + 7. UPDATE alerts SET priority_score, priority_level, escalation_metadata + ↓ +Invalidate Redis cache (tenant:{id}:alerts:priority) + ↓ +Next API call fetches updated priorities +``` + +**Performance**: +- Batch processing: 50 alerts at a time +- Typical execution: 100-500ms per tenant +- Multi-tenant scaling: ~1s per 100 active alerts + +### Alert Chaining + +#### Chain Types + +**1. Causal Chains** - One alert causes another +``` +LOW_STOCK_WARNING (ingredient: flour) + ↓ (30 minutes later, inventory depletes) +CRITICAL_STOCK_SHORTAGE (ingredient: flour) + ↓ (production cannot start) +PRODUCTION_DELAY (batch: baguettes, reason: missing flour) + ↓ (order cannot fulfill) +ORDER_FULFILLMENT_RISK (order: #1234, missing: baguettes) +``` + +**2. Related Entity Chains** - Same entity, different aspects +``` +PO_APPROVAL_NEEDED (po_id: 42) + ↓ (approved, but delivery late) +DELIVERY_OVERDUE (po_id: 42) + ↓ (delivery not received) +STOCK_RECEIPT_INCOMPLETE (po_id: 42) +``` + +**3. Temporal Chains** - Same issue over time +``` +FORECAST_ACCURACY_LOW (product: croissant, MAPE: 35%) + ↓ (7 days later, no improvement) +FORECAST_ACCURACY_LOW (product: croissant, MAPE: 34%) + ↓ (14 days later, flagged for retraining) +MODEL_RETRAINING_NEEDED (product: croissant) +``` + +#### Chain Detection + +```python +# services/alert_processor/app/services/enrichment/chaining.py + +async def detect_alert_chain(alert: Alert) -> Optional[AlertChain]: + """ + Detect if this alert is part of a chain. + + Returns: AlertChain object with parent/children or None + """ + # Check for parent alert (what caused this?) + parent = await find_parent_alert(alert) + + # Check for child alerts (what did this cause?) + children = await find_child_alerts(alert) + + if parent or children: + return AlertChain( + chain_id=generate_chain_id(alert), + root_alert_id=parent.id if parent else alert.id, + parent_alert_id=parent.id if parent else None, + child_alert_ids=[c.id for c in children], + chain_type=classify_chain_type(alert, parent, children) + ) + + return None + +async def find_parent_alert(alert: Alert) -> Optional[Alert]: + """ + Find the alert that likely caused this one. + + Heuristics: + - Same entity_id (e.g., ingredient, po_id) + - Created within past 4 hours + - Related event_type (e.g., LOW_STOCK β†’ CRITICAL_STOCK_SHORTAGE) + """ + if alert.event_type == "CRITICAL_STOCK_SHORTAGE": + # Look for LOW_STOCK_WARNING on same ingredient + return await db.query(Alert).filter( + Alert.event_type == "LOW_STOCK_WARNING", + Alert.entity_id == alert.entity_id, + Alert.created_at >= alert.created_at - timedelta(hours=4), + Alert.tenant_id == alert.tenant_id + ).order_by(Alert.created_at.desc()).first() + + elif alert.event_type == "PRODUCTION_DELAY": + # Look for CRITICAL_STOCK_SHORTAGE on ingredient in batch + batch_ingredients = alert.context.get("missing_ingredients", []) + return await find_stock_shortage_for_ingredients(batch_ingredients) + + # ... more chain detection rules + + return None +``` + +#### Chain Metadata + +Chains are stored in alert metadata: +```json +{ + "chain": { + "chain_id": "chain_flour_shortage_20251126", + "chain_type": "causal", + "position": "child", + "root_alert_id": "alert_123", + "parent_alert_id": "alert_123", + "child_alert_ids": ["alert_789"], + "depth": 2 + } +} +``` + +**Frontend Display**: +- UnifiedActionQueueCard shows chain icon when `alert.metadata.chain` exists +- Click chain icon β†’ Expands to show full causal chain +- Root cause highlighted in bold +- Downstream impacts shown as tree + +### Alert Deduplication + +#### Deduplication Rules + +```python +# services/alert_processor/app/services/enrichment/deduplication.py + +async def check_duplicate(alert: Alert) -> Optional[Alert]: + """ + Check if this alert is a duplicate of an existing one. + + Deduplication Keys: + - event_type + entity_id + tenant_id + - Time window: within past 24 hours + - State: not resolved + + Returns: Existing alert if duplicate, None otherwise + """ + existing = await db.query(Alert).filter( + Alert.event_type == alert.event_type, + Alert.entity_id == alert.entity_id, + Alert.entity_type == alert.entity_type, + Alert.tenant_id == alert.tenant_id, + Alert.state != "resolved", + Alert.created_at >= datetime.utcnow() - timedelta(hours=24) + ).first() + + if existing: + # Merge new data into existing alert + await merge_duplicate(existing, alert) + return existing + + return None + +async def merge_duplicate(existing: Alert, new: Alert) -> None: + """ + Merge duplicate alert into existing one. + + Updates: + - occurrence_count += 1 + - last_occurrence_at = now + - context = merge(existing.context, new.context) + - priority_score = max(existing.priority_score, new.priority_score) + """ + existing.occurrence_count += 1 + existing.last_occurrence_at = datetime.utcnow() + existing.context = merge_contexts(existing.context, new.context) + existing.priority_score = max(existing.priority_score, new.priority_score) + + # Add to metadata + existing.metadata["duplicates"] = existing.metadata.get("duplicates", []) + existing.metadata["duplicates"].append({ + "occurred_at": new.created_at.isoformat(), + "context": new.context + }) + + await db.commit() +``` + +#### Deduplication Examples + +**Example 1: Repeated Stock Warnings** +``` +08:00 β†’ LOW_STOCK_WARNING (flour, quantity: 50kg) +10:00 β†’ LOW_STOCK_WARNING (flour, quantity: 45kg) β†’ MERGED +12:00 β†’ LOW_STOCK_WARNING (flour, quantity: 40kg) β†’ MERGED + +Result: +- Single alert with occurrence_count: 3 +- Last_occurrence_at: 12:00 +- Context shows quantity trend: 50kg β†’ 45kg β†’ 40kg +- Frontend displays: "Low stock warning (3 occurrences in 4 hours)" +``` + +**Example 2: Delivery Overdue Spam** +``` +14:30 β†’ DELIVERY_OVERDUE (PO-2025-043) +15:30 β†’ DELIVERY_OVERDUE (PO-2025-043) β†’ MERGED +16:30 β†’ DELIVERY_OVERDUE (PO-2025-043) β†’ MERGED + +Result: +- Single critical alert +- Occurrence_count: 3 +- Frontend: "Delivery overdue: PO-2025-043 (still pending after 2 hours)" +``` + +### Escalation Metadata + +Each alert with escalation includes metadata: + +```json +{ + "escalation": { + "is_escalated": true, + "escalation_level": 2, + "escalation_reason": "Pending for 50 hours", + "original_priority": 70, + "current_priority": 80, + "boost_applied": 10, + "escalated_at": "2025-11-26T14:30:00Z", + "escalation_history": [ + { + "timestamp": "2025-11-24T12:00:00Z", + "priority": 70, + "boost": 0, + "reason": "Initial priority" + }, + { + "timestamp": "2025-11-26T14:15:00Z", + "priority": 80, + "boost": 10, + "reason": "Pending for 50 hours (>48h rule)" + } + ] + } +} +``` + +**Frontend Display**: +- Badge showing "Escalated" with flame icon πŸ”₯ +- Tooltip: "Pending for 50 hours - Priority increased by 10 points" +- Timeline showing escalation history + +### Integration with Priority Scoring + +Escalation boost is **additive** to base priority: + +```python +# Final priority calculation +base_priority = calculate_base_priority(alert) # Multi-factor score (0-100) +escalation_boost = calculate_escalation_boost(alert) # Age + deadline (0-50) +final_priority = min(base_priority + escalation_boost, 100) # Capped at 100 + +# Update priority level +if final_priority >= 90: + priority_level = "critical" +elif final_priority >= 70: + priority_level = "important" +else: + priority_level = "info" +``` + +**Example**: +``` +Initial: DELIVERY_OVERDUE +- Base priority: 85 (critical event) +- Escalation boost: 0 (just created) +- Final priority: 85 (important) + +After 2 hours (still unresolved): +- Base priority: 85 (unchanged) +- Escalation boost: 0 (not yet at 48h threshold) +- Final priority: 85 (important) + +After 50 hours (still unresolved): +- Base priority: 85 (unchanged) +- Escalation boost: +10 (>48h rule) +- Final priority: 95 (critical) + +After 50 hours + 23h to deadline: +- Base priority: 85 (unchanged) +- Escalation boost: +10 (age) + 15 (deadline <24h) = +25 +- Final priority: 100 (critical, capped) +``` + +### Monitoring Escalations + +**Metrics**: +- `alerts_escalated_total` - Count of alerts escalated +- `escalation_boost_avg` - Average boost applied +- `alerts_with_chains_total` - Count of chained alerts +- `deduplication_merge_total` - Count of merged duplicates + +**Logs**: +``` +[2025-11-26 14:15:03] INFO: Priority recalculation job started +[2025-11-26 14:15:04] INFO: Processing 45 action_needed alerts +[2025-11-26 14:15:05] INFO: Escalated alert alert_123: 70 β†’ 80 (+10, pending 50h) +[2025-11-26 14:15:05] INFO: Escalated alert alert_456: 85 β†’ 100 (+15, deadline in 20h) +[2025-11-26 14:15:06] INFO: Detected chain: alert_789 caused by alert_123 +[2025-11-26 14:15:07] INFO: Merged duplicate: alert_999 into alert_998 (occurrence 3) +[2025-11-26 14:15:08] INFO: Priority recalculation completed in 5.2s +[2025-11-26 14:15:08] INFO: Invalidated Redis cache for 8 tenants +``` + +**Alerts** (for Ops team): +- Escalation rate >30% β†’ Warning (too many stale alerts) +- Chain depth >5 β†’ Warning (complex cascading failures) +- Deduplication rate >50% β†’ Warning (alert spam) + +### Testing + +**Unit Tests**: +```python +# tests/jobs/test_priority_recalculation.py + +async def test_escalation_48h_rule(): + # Given: Alert created 50 hours ago + alert = create_test_alert( + event_type="DELIVERY_OVERDUE", + priority_score=85, + action_created_at=now() - timedelta(hours=50) + ) + + # When: Recalculate priority + boost = calculate_escalation_boost(alert) + + # Then: +10 boost applied + assert boost == 10 + assert alert.priority_score == 95 + +async def test_chain_detection_stock_to_production(): + # Given: Low stock warning followed by production delay + parent = create_test_alert( + event_type="LOW_STOCK_WARNING", + entity_id="ingredient_123" + ) + child = create_test_alert( + event_type="PRODUCTION_DELAY", + context={"missing_ingredients": ["ingredient_123"]} + ) + + # When: Detect chain + chain = await detect_alert_chain(child) + + # Then: Chain detected + assert chain.parent_alert_id == parent.id + assert chain.chain_type == "causal" + +async def test_deduplication_merge(): + # Given: Existing alert + existing = create_test_alert( + event_type="LOW_STOCK_WARNING", + entity_id="ingredient_123", + occurrence_count=1 + ) + + # When: Duplicate arrives + duplicate = create_test_alert( + event_type="LOW_STOCK_WARNING", + entity_id="ingredient_123" + ) + result = await check_duplicate(duplicate) + + # Then: Merged into existing + assert result.id == existing.id + assert result.occurrence_count == 2 +``` + +### Performance Characteristics + +**Priority Recalculation Cronjob**: +- Batch size: 50 alerts +- Execution time: 100-500ms per tenant +- Scaling: ~1s per 100 active alerts +- Multi-tenant (100 tenants, 1000 active alerts): ~10s + +**Chain Detection**: +- Database queries: 2-3 per alert (parent + children lookup) +- Caching: Recent alerts cached in Redis +- Execution time: 50-150ms per alert + +**Deduplication**: +- Database query: 1 per incoming alert +- Index on: (event_type, entity_id, tenant_id, created_at) +- Execution time: 10-30ms per check + +### Configuration + +**Environment Variables**: +```bash +# Escalation Rules +ESCALATION_AGE_48H_BOOST=10 # +10 points after 48 hours +ESCALATION_AGE_72H_BOOST=20 # +20 points after 72 hours +ESCALATION_DEADLINE_24H_BOOST=15 # +15 points if <24h to deadline +ESCALATION_DEADLINE_6H_BOOST=30 # +30 points if <6h to deadline +ESCALATION_MAX_BOOST=50 # Cap escalation boost + +# Deduplication +DEDUPLICATION_WINDOW_HOURS=24 # Consider duplicates within 24h +DEDUPLICATION_MAX_OCCURRENCES=10 # Stop merging after 10 occurrences + +# Chain Detection +CHAIN_DETECTION_WINDOW_HOURS=4 # Look for chains within 4h +CHAIN_MAX_DEPTH=5 # Warn if chain depth >5 +``` ---