Files
2025-12-05 20:07:01 +01:00

181 lines
4.9 KiB
Python

"""
Pydantic schemas for enriched events.
"""
from datetime import datetime
from typing import Any, Dict, List, Literal, Optional
from uuid import UUID

from pydantic import BaseModel, ConfigDict, Field
class I18nContent(BaseModel):
    """Internationalised content for an event.

    Carries one key plus one parameter mapping for the title, and the same
    pair for the message. Both keys are required; both parameter mappings
    default to an empty dict.
    """

    # Title: key and its parameters.
    title_key: str
    title_params: Dict[str, Any] = {}

    # Message: key and its parameters.
    message_key: str
    message_params: Dict[str, Any] = {}
class SmartAction(BaseModel):
    """Smart action button attached to an event.

    ``action_type``, ``label_key`` and ``variant`` are required; every other
    field has a default.
    """

    action_type: str

    # Label: key and its parameters (parameters default to empty).
    label_key: str
    label_params: Dict[str, Any] = {}

    # Only these four literal values are accepted.
    variant: Literal["primary", "secondary", "danger", "ghost"]

    # Enabled by default; a reason key may be supplied alongside `disabled`.
    disabled: bool = False
    disabled_reason_key: Optional[str] = None

    consequence_key: Optional[str] = None
    url: Optional[str] = None

    # Free-form extra data, empty by default.
    metadata: Dict[str, Any] = {}
class BusinessImpact(BaseModel):
    """Business impact context for an event.

    Every field is optional; the defaults describe "no impact" (zeros, an
    empty customer list and a ``"low"`` customer impact).
    """

    # Monetary figures (the field names indicate EUR).
    financial_impact_eur: float = 0
    estimated_revenue_loss_eur: float = 0

    # Orders and customers touched by the event.
    affected_orders: int = 0
    affected_customers: List[str] = []
    customer_impact: Literal["low", "medium", "high"] = "low"

    # Operational figures (units as given by the field names).
    production_delay_hours: float = 0
    waste_risk_kg: float = 0
class Urgency(BaseModel):
    """Urgency context for an event.

    All fields carry defaults: 24 hours until consequence, able to wait until
    tomorrow, no deadline, not peak-hour relevant, zero hours pending.
    """

    hours_until_consequence: float = 24
    can_wait_until_tomorrow: bool = True

    # Stored as an optional string, not as a datetime.
    deadline_utc: Optional[str] = None

    peak_hour_relevant: bool = False
    hours_pending: float = 0
class UserAgency(BaseModel):
    """User agency context for an event.

    Defaults describe an event the user can fix alone: ``can_user_fix`` is
    true, no external party is required, and there are no blockers.
    """

    can_user_fix: bool = True

    # External party involvement; name and contact are optional strings.
    requires_external_party: bool = False
    external_party_name: Optional[str] = None
    external_party_contact: Optional[str] = None

    # Empty list by default.
    blockers: List[str] = []

    suggested_workaround: Optional[str] = None
class OrchestratorContext(BaseModel):
    """AI orchestrator context for an event.

    All fields are optional. ``already_addressed`` defaults to false and the
    action/reasoning strings default to ``None``.
    """

    already_addressed: bool = False

    # Optional description of an action; each part may be absent.
    action_id: Optional[str] = None
    action_type: Optional[str] = None
    action_summary: Optional[str] = None

    reasoning: Optional[str] = None

    # Defaults to 0.8; no range is enforced by this schema.
    confidence: float = 0.8
class EnrichedEvent(BaseModel):
    """Complete enriched event with all context.

    Required fields: ``id``, ``tenant_id``, ``event_class``, ``event_domain``,
    ``event_type``, ``service``, ``i18n``, ``priority_score``,
    ``priority_level`` and ``type_class``. Everything else has a default.

    The model can be populated from an object's attributes
    (``from_attributes=True``) as well as from a mapping.
    """

    # Pydantic v2 configuration. Replaces the nested ``class Config``, which
    # is deprecated in v2 and emits a deprecation warning.
    model_config = ConfigDict(from_attributes=True)

    # Core fields
    id: str
    tenant_id: str
    created_at: Optional[datetime] = None
    updated_at: Optional[datetime] = None

    # Classification
    event_class: Literal["alert", "notification", "recommendation"]
    event_domain: str
    event_type: str
    service: str

    # i18n content
    i18n: I18nContent

    # Priority: the score is validated to lie within 0..100 inclusive.
    priority_score: int = Field(ge=0, le=100)
    priority_level: Literal["critical", "important", "standard", "info"]
    type_class: str

    # Enrichment contexts (each may be absent)
    orchestrator_context: Optional[OrchestratorContext] = None
    business_impact: Optional[BusinessImpact] = None
    urgency: Optional[Urgency] = None
    user_agency: Optional[UserAgency] = None
    trend_context: Optional[Dict[str, Any]] = None

    # Smart actions
    smart_actions: List[SmartAction] = []

    # AI reasoning
    ai_reasoning_summary_key: Optional[str] = None
    ai_reasoning_summary_params: Optional[Dict[str, Any]] = None
    ai_reasoning_details: Optional[Dict[str, Any]] = None
    confidence_score: Optional[float] = None

    # Entity references
    entity_links: Dict[str, str] = {}

    # Status: starts as "active"; the timestamps are optional.
    status: Literal["active", "acknowledged", "resolved", "dismissed"] = "active"
    resolved_at: Optional[datetime] = None
    acknowledged_at: Optional[datetime] = None

    # Original metadata
    event_metadata: Dict[str, Any] = {}
class EventResponse(BaseModel):
    """Event response for API.

    Required fields: ``id``, ``tenant_id``, ``created_at``, ``event_class``,
    ``event_domain``, ``event_type``, ``i18n``, ``priority_score``,
    ``priority_level``, ``type_class``, ``smart_actions`` and ``status``.
    Enrichment contexts are exposed as plain optional dicts here.

    The model can be populated from an object's attributes
    (``from_attributes=True``) as well as from a mapping.
    """

    # Pydantic v2 configuration. Replaces the nested ``class Config``, which
    # is deprecated in v2 and emits a deprecation warning.
    model_config = ConfigDict(from_attributes=True)

    # Identity and timestamps
    id: UUID
    tenant_id: UUID
    created_at: datetime

    # Classification (plain strings; no literal restriction in this schema)
    event_class: str
    event_domain: str
    event_type: str

    i18n: I18nContent

    # Priority
    priority_score: int
    priority_level: str
    type_class: str

    smart_actions: List[SmartAction]
    status: str

    # Optional enrichment contexts (only if present)
    orchestrator_context: Optional[Dict[str, Any]] = None
    business_impact: Optional[Dict[str, Any]] = None
    urgency: Optional[Dict[str, Any]] = None
    user_agency: Optional[Dict[str, Any]] = None

    # AI reasoning
    ai_reasoning_summary_key: Optional[str] = None
    ai_reasoning_summary_params: Optional[Dict[str, Any]] = None
    ai_reasoning_details: Optional[Dict[str, Any]] = None
    confidence_score: Optional[float] = None

    entity_links: Dict[str, str] = {}
    event_metadata: Optional[Dict[str, Any]] = None
class EventSummary(BaseModel):
    """Summary statistics for the dashboard.

    Every field is required: three status totals, three string-keyed count
    mappings, and two alert counters.
    """

    # Totals per status.
    total_active: int
    total_acknowledged: int
    total_resolved: int

    # Counts keyed by string (priority, domain and type class respectively).
    by_priority: Dict[str, int]
    by_domain: Dict[str, int]
    by_type_class: Dict[str, int]

    # Alert counters.
    critical_alerts: int
    important_alerts: int
class EventFilter(BaseModel):
    """Filter criteria for event queries.

    Only ``tenant_id`` is required. The remaining filters default to ``None``.
    ``limit`` defaults to 50 and must lie within 0..100; ``offset`` defaults
    to 0 and must not be negative.
    """

    tenant_id: UUID

    # Optional filters; `priority_level` and `status` accept several values.
    event_class: Optional[str] = None
    priority_level: Optional[List[str]] = None
    status: Optional[List[str]] = None
    event_domain: Optional[str] = None

    # Pagination. Lower bounds reject negative values at validation time
    # instead of passing them on to whatever consumes the filter.
    limit: int = Field(default=50, ge=0, le=100)
    offset: int = Field(default=0, ge=0)