Files
bakery-ia/shared/messaging/events.py
Urtzi Alfaro 599e335fbb Fix events
2025-07-17 19:03:11 +02:00

122 lines
4.2 KiB
Python

"""
Event definitions for microservices communication
- Simple class-based approach to avoid dataclass issues
"""
import uuid
from datetime import datetime, timezone
from typing import Any, Dict, Optional
class BaseEvent:
    """Base event class.

    Every instance receives a freshly generated UUID4 string as ``event_id``
    and a creation ``timestamp`` stored as a naive ``datetime`` in UTC.

    Attributes:
        service_name: Value passed by the caller, stored as given.
        data: Payload dictionary, stored by reference (not copied).
        event_type: Event type string; empty string by default.
        event_id: String form of a random UUID generated at construction.
        timestamp: Naive ``datetime`` holding the UTC time of construction.
        correlation_id: Optional identifier supplied by the caller.
    """

    def __init__(self, service_name: str, data: Dict[str, Any], event_type: str = "", correlation_id: Optional[str] = None):
        self.service_name = service_name
        self.data = data
        self.event_type = event_type
        self.event_id = str(uuid.uuid4())
        # datetime.utcnow() is deprecated since Python 3.12. Take an aware UTC
        # "now" and drop tzinfo so the attribute stays a naive UTC datetime,
        # exactly as before, and existing consumers keep working.
        self.timestamp = datetime.now(timezone.utc).replace(tzinfo=None)
        self.correlation_id = correlation_id
# Training Events
class TrainingStartedEvent(BaseEvent):
    """Event whose ``event_type`` is fixed to ``"training.started"``."""

    def __init__(
        self,
        service_name: str,
        data: Dict[str, Any],
        correlation_id: Optional[str] = None,
    ):
        # Caller-supplied values are forwarded unchanged; only the type tag
        # is pinned by this subclass.
        super().__init__(
            service_name,
            data,
            event_type="training.started",
            correlation_id=correlation_id,
        )
class TrainingCompletedEvent(BaseEvent):
    """Event whose ``event_type`` is fixed to ``"training.completed"``."""

    def __init__(
        self,
        service_name: str,
        data: Dict[str, Any],
        correlation_id: Optional[str] = None,
    ):
        # Caller-supplied values are forwarded unchanged; only the type tag
        # is pinned by this subclass.
        super().__init__(
            service_name,
            data,
            event_type="training.completed",
            correlation_id=correlation_id,
        )
class TrainingFailedEvent(BaseEvent):
    """Event whose ``event_type`` is fixed to ``"training.failed"``."""

    def __init__(
        self,
        service_name: str,
        data: Dict[str, Any],
        correlation_id: Optional[str] = None,
    ):
        # Caller-supplied values are forwarded unchanged; only the type tag
        # is pinned by this subclass.
        super().__init__(
            service_name,
            data,
            event_type="training.failed",
            correlation_id=correlation_id,
        )
# Forecasting Events
class ForecastGeneratedEvent(BaseEvent):
    """Event whose ``event_type`` is fixed to ``"forecast.generated"``."""

    def __init__(
        self,
        service_name: str,
        data: Dict[str, Any],
        correlation_id: Optional[str] = None,
    ):
        # Caller-supplied values are forwarded unchanged; only the type tag
        # is pinned by this subclass.
        super().__init__(
            service_name,
            data,
            event_type="forecast.generated",
            correlation_id=correlation_id,
        )
class ForecastRequestedEvent(BaseEvent):
    """Event whose ``event_type`` is fixed to ``"forecast.requested"``."""

    def __init__(
        self,
        service_name: str,
        data: Dict[str, Any],
        correlation_id: Optional[str] = None,
    ):
        # Caller-supplied values are forwarded unchanged; only the type tag
        # is pinned by this subclass.
        super().__init__(
            service_name,
            data,
            event_type="forecast.requested",
            correlation_id=correlation_id,
        )
# User Events
class UserRegisteredEvent(BaseEvent):
    """Event whose ``event_type`` is fixed to ``"user.registered"``."""

    def __init__(
        self,
        service_name: str,
        data: Dict[str, Any],
        correlation_id: Optional[str] = None,
    ):
        # Caller-supplied values are forwarded unchanged; only the type tag
        # is pinned by this subclass.
        super().__init__(
            service_name,
            data,
            event_type="user.registered",
            correlation_id=correlation_id,
        )
class UserLoginEvent(BaseEvent):
    """Event whose ``event_type`` is fixed to ``"user.login"``."""

    def __init__(
        self,
        service_name: str,
        data: Dict[str, Any],
        correlation_id: Optional[str] = None,
    ):
        # Caller-supplied values are forwarded unchanged; only the type tag
        # is pinned by this subclass.
        super().__init__(
            service_name,
            data,
            event_type="user.login",
            correlation_id=correlation_id,
        )
# Tenant Events
class TenantCreatedEvent(BaseEvent):
    """Event whose ``event_type`` is fixed to ``"tenant.created"``."""

    def __init__(
        self,
        service_name: str,
        data: Dict[str, Any],
        correlation_id: Optional[str] = None,
    ):
        # Caller-supplied values are forwarded unchanged; only the type tag
        # is pinned by this subclass.
        super().__init__(
            service_name,
            data,
            event_type="tenant.created",
            correlation_id=correlation_id,
        )
class TenantUpdatedEvent(BaseEvent):
    """Event whose ``event_type`` is fixed to ``"tenant.updated"``."""

    def __init__(
        self,
        service_name: str,
        data: Dict[str, Any],
        correlation_id: Optional[str] = None,
    ):
        # Caller-supplied values are forwarded unchanged; only the type tag
        # is pinned by this subclass.
        super().__init__(
            service_name,
            data,
            event_type="tenant.updated",
            correlation_id=correlation_id,
        )
# Notification Events
class NotificationSentEvent(BaseEvent):
    """Event whose ``event_type`` is fixed to ``"notification.sent"``."""

    def __init__(
        self,
        service_name: str,
        data: Dict[str, Any],
        correlation_id: Optional[str] = None,
    ):
        # Caller-supplied values are forwarded unchanged; only the type tag
        # is pinned by this subclass.
        super().__init__(
            service_name,
            data,
            event_type="notification.sent",
            correlation_id=correlation_id,
        )
class NotificationFailedEvent(BaseEvent):
    """Event whose ``event_type`` is fixed to ``"notification.failed"``."""

    def __init__(
        self,
        service_name: str,
        data: Dict[str, Any],
        correlation_id: Optional[str] = None,
    ):
        # Caller-supplied values are forwarded unchanged; only the type tag
        # is pinned by this subclass.
        super().__init__(
            service_name,
            data,
            event_type="notification.failed",
            correlation_id=correlation_id,
        )