Improve event messaging for the training service

This commit is contained in:
Urtzi Alfaro
2025-07-30 21:21:02 +02:00
parent 5e3fbc5493
commit 923b2d48d2
3 changed files with 428 additions and 79 deletions

View File

@@ -1,11 +1,12 @@
# services/training/app/services/messaging.py
"""
Training service messaging - Clean interface for training-specific events
Uses shared RabbitMQ infrastructure
Enhanced training service messaging - Complete status publishing implementation
Uses shared RabbitMQ infrastructure with comprehensive progress tracking
"""
import structlog
from typing import Dict, Any, Optional
from typing import Dict, Any, Optional, List
from datetime import datetime
from shared.messaging.rabbitmq import RabbitMQClient
from shared.messaging.events import (
TrainingStartedEvent,
@@ -14,6 +15,9 @@ from shared.messaging.events import (
)
from app.core.config import settings
import json
import numpy as np
logger = structlog.get_logger()
# Single global instance
@@ -31,8 +35,38 @@ async def cleanup_messaging():
"""Cleanup messaging for training service"""
await training_publisher.disconnect()
logger.info("Training service messaging cleaned up")
def serialize_for_json(obj: Any) -> Any:
"""
Convert numpy types and other non-JSON serializable objects to JSON-compatible types
"""
if isinstance(obj, np.integer):
return int(obj)
elif isinstance(obj, np.floating):
return float(obj)
elif isinstance(obj, np.ndarray):
return obj.tolist()
elif isinstance(obj, np.bool_):
return bool(obj)
elif isinstance(obj, datetime):
return obj.isoformat()
elif isinstance(obj, dict):
return {key: serialize_for_json(value) for key, value in obj.items()}
elif isinstance(obj, (list, tuple)):
return [serialize_for_json(item) for item in obj]
else:
return obj
def safe_json_serialize(data: Dict[str, Any]) -> Dict[str, Any]:
"""
Recursively clean data dictionary for JSON serialization
"""
return serialize_for_json(data)
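# Illustrative round trip (values are hypothetical): once cleaned, a
# numpy-laden metrics dict serializes without a TypeError from json.dumps.
#
#   metrics = {"mape": np.float64(8.2), "epochs": np.int64(40),
#              "loss_history": np.array([0.9, 0.4, 0.2])}
#   json.dumps(safe_json_serialize(metrics))
#   # -> '{"mape": 8.2, "epochs": 40, "loss_history": [0.9, 0.4, 0.2]}'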
# =========================================
# ENHANCED TRAINING JOB STATUS EVENTS
# =========================================
# Training Job Events
async def publish_job_started(job_id: str, tenant_id: str, config: Dict[str, Any]) -> bool:
"""Publish training job started event"""
event = TrainingStartedEvent(
@@ -40,82 +74,185 @@ async def publish_job_started(job_id: str, tenant_id: str, config: Dict[str, Any
data={
"job_id": job_id,
"tenant_id": tenant_id,
"config": config
"config": config,
"started_at": datetime.now().isoformat(),
"estimated_duration_minutes": config.get("estimated_duration_minutes", 15)
}
)
return await training_publisher.publish_event(
success = await training_publisher.publish_event(
exchange_name="training.events",
routing_key="training.started",
event_data=event.to_dict()
)
if success:
logger.info(f"Published job started event", job_id=job_id, tenant_id=tenant_id)
else:
logger.error(f"Failed to publish job started event", job_id=job_id)
return success
async def publish_job_progress(job_id: str, tenant_id: str, progress: int, step: str) -> bool:
"""Publish training job progress event"""
return await training_publisher.publish_event(
async def publish_job_progress(
job_id: str,
tenant_id: str,
progress: int,
step: str,
current_product: Optional[str] = None,
products_completed: int = 0,
products_total: int = 0,
estimated_time_remaining_minutes: Optional[int] = None,
step_details: Optional[str] = None
) -> bool:
"""Publish detailed training job progress event with safe serialization"""
event_data = {
"service_name": "training-service",
"event_type": "training.progress",
"timestamp": datetime.now().isoformat(),
"data": {
"job_id": job_id,
"tenant_id": tenant_id,
"progress": min(max(int(progress), 0), 100), # Ensure int, not numpy.int64
"current_step": step,
"current_product": current_product,
"products_completed": int(products_completed), # Convert numpy types
"products_total": int(products_total),
"estimated_time_remaining_minutes": int(estimated_time_remaining_minutes) if estimated_time_remaining_minutes else None,
"step_details": step_details
}
}
# Clean the entire event data
clean_event_data = safe_json_serialize(event_data)
success = await training_publisher.publish_event(
exchange_name="training.events",
routing_key="training.progress",
event_data={
"service_name": "training-service",
"event_type": "training.progress",
"data": {
"job_id": job_id,
"tenant_id": tenant_id,
"progress": progress,
"current_step": step
}
event_data=clean_event_data
)
if success:
logger.info(f"Published progress update",
job_id=job_id,
progress=progress,
step=step,
current_product=current_product)
else:
logger.error(f"Failed to publish progress update", job_id=job_id)
return success
async def publish_job_step_completed(
job_id: str,
tenant_id: str,
step_name: str,
step_result: Dict[str, Any],
progress: int
) -> bool:
"""Publish when a major training step is completed"""
event_data = {
"service_name": "training-service",
"event_type": "training.step.completed",
"timestamp": datetime.now().isoformat(),
"data": {
"job_id": job_id,
"tenant_id": tenant_id,
"step_name": step_name,
"step_result": step_result,
"progress": progress,
"completed_at": datetime.now().isoformat()
}
}
return await training_publisher.publish_event(
exchange_name="training.events",
routing_key="training.step.completed",
event_data=event_data
)
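# Illustrative call (the step_result keys are hypothetical; the function
# accepts any JSON-compatible dict):
#
#   await publish_job_step_completed(
#       job_id, tenant_id,
#       step_name="data_preparation",
#       step_result={"rows_processed": 1240, "products_found": 12},
#       progress=35,
#   )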
async def publish_job_completed(job_id: str, tenant_id: str, results: Dict[str, Any]) -> bool:
"""Publish training job completed event"""
"""Publish training job completed event with safe JSON serialization"""
# Clean the results data before creating the event
clean_results = safe_json_serialize(results)
event = TrainingCompletedEvent(
service_name="training-service",
data={
"job_id": job_id,
"tenant_id": tenant_id,
"results": results,
"models_trained": results.get("products_trained", 0),
"success_rate": results.get("summary", {}).get("success_rate", 0)
"results": clean_results, # Now safe for JSON
"models_trained": clean_results.get("successful_trainings", 0),
"success_rate": clean_results.get("success_rate", 0),
"total_duration_seconds": clean_results.get("overall_training_time_seconds", 0),
"completed_at": datetime.now().isoformat()
}
)
return await training_publisher.publish_event(
success = await training_publisher.publish_event(
exchange_name="training.events",
routing_key="training.completed",
event_data=event.to_dict()
)
if success:
logger.info(f"Published job completed event",
job_id=job_id,
models_trained=clean_results.get("successful_trainings", 0))
else:
logger.error(f"Failed to publish job completed event", job_id=job_id)
return success
async def publish_job_failed(job_id: str, tenant_id: str, error: str) -> bool:
async def publish_job_failed(job_id: str, tenant_id: str, error: str, error_details: Optional[Dict] = None) -> bool:
"""Publish training job failed event"""
event = TrainingFailedEvent(
service_name="training-service",
data={
"job_id": job_id,
"tenant_id": tenant_id,
"error": error
"error": error,
"error_details": error_details or {},
"failed_at": datetime.now().isoformat()
}
)
return await training_publisher.publish_event(
success = await training_publisher.publish_event(
exchange_name="training.events",
routing_key="training.failed",
event_data=event.to_dict()
)
if success:
logger.info(f"Published job failed event", job_id=job_id, error=error)
else:
logger.error(f"Failed to publish job failed event", job_id=job_id)
return success
async def publish_job_cancelled(job_id: str, tenant_id: str) -> bool:
async def publish_job_cancelled(job_id: str, tenant_id: str, reason: str = "User requested") -> bool:
"""Publish training job cancelled event"""
event_data = {
"service_name": "training-service",
"event_type": "training.cancelled",
"timestamp": datetime.now().isoformat(),
"data": {
"job_id": job_id,
"tenant_id": tenant_id,
"reason": reason,
"cancelled_at": datetime.now().isoformat()
}
}
return await training_publisher.publish_event(
exchange_name="training.events",
routing_key="training.cancelled",
event_data={
"service_name": "training-service",
"event_type": "training.cancelled",
"data": {
"job_id": job_id,
"tenant_id": tenant_id
}
}
event_data=event_data
)
# Product Training Events
# =========================================
# PRODUCT-LEVEL TRAINING EVENTS
# =========================================
async def publish_product_training_started(job_id: str, tenant_id: str, product_name: str) -> bool:
"""Publish single product training started event"""
return await training_publisher.publish_event(
@@ -124,15 +261,23 @@ async def publish_product_training_started(job_id: str, tenant_id: str, product_
event_data={
"service_name": "training-service",
"event_type": "training.product.started",
"timestamp": datetime.now().isoformat(),
"data": {
"job_id": job_id,
"tenant_id": tenant_id,
"product_name": product_name
"product_name": product_name,
"started_at": datetime.now().isoformat()
}
}
)
async def publish_product_training_completed(job_id: str, tenant_id: str, product_name: str, model_id: str) -> bool:
async def publish_product_training_completed(
job_id: str,
tenant_id: str,
product_name: str,
model_id: str,
metrics: Optional[Dict[str, float]] = None
) -> bool:
"""Publish single product training completed event"""
return await training_publisher.publish_event(
exchange_name="training.events",
@@ -140,49 +285,71 @@ async def publish_product_training_completed(job_id: str, tenant_id: str, produc
event_data={
"service_name": "training-service",
"event_type": "training.product.completed",
"timestamp": datetime.now().isoformat(),
"data": {
"job_id": job_id,
"tenant_id": tenant_id,
"product_name": product_name,
"model_id": model_id
"model_id": model_id,
"metrics": metrics or {},
"completed_at": datetime.now().isoformat()
}
}
)
# Model Events
async def publish_product_training_failed(
job_id: str,
tenant_id: str,
product_name: str,
error: str
) -> bool:
"""Publish single product training failed event"""
return await training_publisher.publish_event(
exchange_name="training.events",
routing_key="training.product.failed",
event_data={
"service_name": "training-service",
"event_type": "training.product.failed",
"timestamp": datetime.now().isoformat(),
"data": {
"job_id": job_id,
"tenant_id": tenant_id,
"product_name": product_name,
"error": error,
"failed_at": datetime.now().isoformat()
}
}
)
# =========================================
# MODEL LIFECYCLE EVENTS
# =========================================
async def publish_model_trained(model_id: str, tenant_id: str, product_name: str, metrics: Dict[str, float]) -> bool:
"""Publish model trained event"""
"""Publish model trained event with safe metric serialization"""
# Clean metrics to ensure JSON serialization
clean_metrics = safe_json_serialize(metrics) if metrics else {}
event_data = {
"service_name": "training-service",
"event_type": "training.model.trained",
"timestamp": datetime.now().isoformat(),
"data": {
"model_id": model_id,
"tenant_id": tenant_id,
"product_name": product_name,
"training_metrics": clean_metrics, # Now safe for JSON
"trained_at": datetime.now().isoformat()
}
}
return await training_publisher.publish_event(
exchange_name="training.events",
routing_key="training.model.trained",
event_data={
"service_name": "training-service",
"event_type": "training.model.trained",
"data": {
"model_id": model_id,
"tenant_id": tenant_id,
"product_name": product_name,
"training_metrics": metrics
}
}
event_data=event_data
)
async def publish_model_updated(model_id: str, tenant_id: str, product_name: str, version: int) -> bool:
"""Publish model updated event"""
return await training_publisher.publish_event(
exchange_name="training.events",
routing_key="training.model.updated",
event_data={
"service_name": "training-service",
"event_type": "training.model.updated",
"data": {
"model_id": model_id,
"tenant_id": tenant_id,
"product_name": product_name,
"version": version
}
}
)
async def publish_model_validated(model_id: str, tenant_id: str, product_name: str, validation_results: Dict[str, Any]) -> bool:
"""Publish model validation event"""
@@ -192,11 +359,13 @@ async def publish_model_validated(model_id: str, tenant_id: str, product_name: s
event_data={
"service_name": "training-service",
"event_type": "training.model.validated",
"timestamp": datetime.now().isoformat(),
"data": {
"model_id": model_id,
"tenant_id": tenant_id,
"product_name": product_name,
"validation_results": validation_results
"validation_results": validation_results,
"validated_at": datetime.now().isoformat()
}
}
)
@@ -209,11 +378,166 @@ async def publish_model_saved(model_id: str, tenant_id: str, product_name: str,
event_data={
"service_name": "training-service",
"event_type": "training.model.saved",
"timestamp": datetime.now().isoformat(),
"data": {
"model_id": model_id,
"tenant_id": tenant_id,
"product_name": product_name,
"model_path": model_path
"model_path": model_path,
"saved_at": datetime.now().isoformat()
}
}
)
# =========================================
# DATA PROCESSING EVENTS
# =========================================
async def publish_data_validation_started(job_id: str, tenant_id: str, products: List[str]) -> bool:
"""Publish data validation started event"""
return await training_publisher.publish_event(
exchange_name="training.events",
routing_key="training.data.validation.started",
event_data={
"service_name": "training-service",
"event_type": "training.data.validation.started",
"timestamp": datetime.now().isoformat(),
"data": {
"job_id": job_id,
"tenant_id": tenant_id,
"products": products,
"started_at": datetime.now().isoformat()
}
}
)
async def publish_data_validation_completed(
job_id: str,
tenant_id: str,
validation_results: Dict[str, Any]
) -> bool:
"""Publish data validation completed event"""
return await training_publisher.publish_event(
exchange_name="training.events",
routing_key="training.data.validation.completed",
event_data={
"service_name": "training-service",
"event_type": "training.data.validation.completed",
"timestamp": datetime.now().isoformat(),
"data": {
"job_id": job_id,
"tenant_id": tenant_id,
"validation_results": validation_results,
"completed_at": datetime.now().isoformat()
}
}
)
# =========================================
# UTILITY FUNCTIONS FOR BATCH PUBLISHING
# =========================================
async def publish_batch_status_update(
job_id: str,
tenant_id: str,
updates: List[Dict[str, Any]]
) -> bool:
"""Publish multiple status updates as a batch"""
batch_event = {
"service_name": "training-service",
"event_type": "training.batch.update",
"timestamp": datetime.now().isoformat(),
"data": {
"job_id": job_id,
"tenant_id": tenant_id,
"updates": updates,
"batch_size": len(updates)
}
}
return await training_publisher.publish_event(
exchange_name="training.events",
routing_key="training.batch.update",
event_data=batch_event
)
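# Illustrative batch (the shape of each update entry is left to the caller;
# these keys are hypothetical):
#
#   await publish_batch_status_update(job_id, tenant_id, updates=[
#       {"product_name": "baguette", "status": "completed", "progress": 40},
#       {"product_name": "croissant", "status": "training", "progress": 55},
#   ])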
# =========================================
# HELPER FUNCTIONS FOR TRAINING INTEGRATION
# =========================================
class TrainingStatusPublisher:
"""Helper class to manage training status publishing throughout the training process"""
def __init__(self, job_id: str, tenant_id: str):
self.job_id = job_id
self.tenant_id = tenant_id
self.start_time = datetime.now()
self.products_total = 0
self.products_completed = 0
async def job_started(self, config: Dict[str, Any], products_total: int = 0):
"""Publish job started with initial configuration"""
self.products_total = products_total
# Clean config data
clean_config = safe_json_serialize(config)
await publish_job_started(self.job_id, self.tenant_id, clean_config)
async def progress_update(
self,
progress: int,
step: str,
current_product: Optional[str] = None,
step_details: Optional[str] = None
):
"""Publish progress update with calculated time estimates"""
elapsed_minutes = (datetime.now() - self.start_time).total_seconds() / 60
if progress > 0:
estimated_total_minutes = (elapsed_minutes / progress) * 100
estimated_remaining = max(0, estimated_total_minutes - elapsed_minutes)
else:
estimated_remaining = None
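# Worked example of the linear estimate above: 5 minutes elapsed at
# progress=25 extrapolates to (5 / 25) * 100 = 20 minutes total,
# leaving roughly 15 minutes remaining.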
await publish_job_progress(
job_id=self.job_id,
tenant_id=self.tenant_id,
progress=int(progress), # Ensure int
step=step,
current_product=current_product,
products_completed=int(self.products_completed), # Ensure int
products_total=int(self.products_total), # Ensure int
estimated_time_remaining_minutes=int(estimated_remaining) if estimated_remaining is not None else None,
step_details=step_details
)
async def product_completed(self, product_name: str, model_id: str, metrics: Optional[Dict] = None):
"""Mark a product as completed and update progress"""
self.products_completed += 1
# Clean metrics before publishing
clean_metrics = safe_json_serialize(metrics) if metrics else None
await publish_product_training_completed(
self.job_id, self.tenant_id, product_name, model_id, clean_metrics
)
# Update overall progress
if self.products_total > 0:
progress = int((self.products_completed / self.products_total) * 90) # Save 10% for final steps
await self.progress_update(
progress=progress,
step=f"Completed training for {product_name}",
current_product=None
)
async def job_completed(self, results: Dict[str, Any]):
"""Publish job completion with clean data"""
clean_results = safe_json_serialize(results)
await publish_job_completed(self.job_id, self.tenant_id, clean_results)
async def job_failed(self, error: str, error_details: Optional[Dict] = None):
"""Publish job failure with clean error details"""
clean_error_details = safe_json_serialize(error_details) if error_details else None
await publish_job_failed(self.job_id, self.tenant_id, error, clean_error_details)
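# Usage sketch (illustrative; `products` and `train_one_product` are
# hypothetical stand-ins for the real training loop):
#
#   publisher = TrainingStatusPublisher(job_id, tenant_id)
#   await publisher.job_started(config, products_total=len(products))
#   for product in products:
#       model_id, metrics = await train_one_product(product)
#       await publisher.product_completed(product, model_id, metrics)
#   await publisher.job_completed(results)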

View File

@@ -16,7 +16,13 @@ import pandas as pd
from app.services.data_client import DataClient
from app.services.date_alignment_service import DateAlignmentService, DateRange, DataSourceType, AlignedDateRange
from app.services.messaging import publish_job_progress, publish_job_failed
from app.services.messaging import (
publish_job_progress,
publish_data_validation_started,
publish_data_validation_completed,
publish_job_step_completed,
publish_job_failed
)
logger = logging.getLogger(__name__)
@@ -69,16 +75,19 @@ class TrainingDataOrchestrator:
try:
#publish_job_progress(job_id, tenant_id, 5, "Extract sales data")
await publish_job_progress(job_id, tenant_id, 5, "Extracting sales data",
step_details="Connecting to the data service")
sales_data = await self.data_client.fetch_sales_data(tenant_id)
# Step 1: Extract and validate sales data date range
#publish_job_progress(job_id, tenant_id, 10, "Extract and validate sales data dates")
await publish_job_progress(job_id, tenant_id, 10, "Validating sales data dates",
step_details="Applying data source constraints")
sales_date_range = self._extract_sales_date_range(sales_data)
logger.info(f"Sales data range detected: {sales_date_range.start} to {sales_date_range.end}")
# Step 2: Apply date alignment across all data sources
#publish_job_progress(job_id, tenant_id, 15, "Apply date alignment across all data sources")
await publish_job_progress(job_id, tenant_id, 15, "Aligning the date range",
step_details="Applying date alignment across all data sources")
aligned_range = self.date_alignment_service.validate_and_align_dates(
user_sales_range=sales_date_range,
requested_start=requested_start,
@@ -90,18 +99,21 @@ class TrainingDataOrchestrator:
logger.info(f"Applied constraints: {aligned_range.constraints}")
# Step 3: Filter sales data to aligned date range
#publish_job_progress(job_id, tenant_id, 20, "Apply date alignment across all data sources")
await publish_job_progress(job_id, tenant_id, 20, "Aligning the sales date range",
step_details="Applying date alignment to the sales data")
filtered_sales = self._filter_sales_data(sales_data, aligned_range)
# Step 4: Collect external data sources concurrently
logger.info("Collecting external data sources...")
#publish_job_progress(job_id, tenant_id, 25, "Collecting external data sources")
await publish_job_progress(job_id, tenant_id, 25, "Collecting external data sources",
step_details="Fetching weather and traffic data concurrently")
weather_data, traffic_data = await self._collect_external_data(
aligned_range, bakery_location, tenant_id
)
# Step 5: Validate data quality
#publish_job_progress(job_id, tenant_id, 30, "Validating data quality")
await publish_job_progress(job_id, tenant_id, 30, "Validating data quality",
step_details="Checking sales, weather, and traffic data against the aligned range")
data_quality_results = self._validate_data_sources(
filtered_sales, weather_data, traffic_data, aligned_range
)
@@ -128,7 +140,8 @@ class TrainingDataOrchestrator:
)
# Step 7: Final validation
#publish_job_progress(job_id, tenant_id, 35, "Final data validation")
await publish_job_progress(job_id, tenant_id, 35, "Final data validation",
step_details="Running final quality checks on the training dataset")
final_validation = self.validate_training_data_quality(training_dataset)
training_dataset.metadata["final_validation"] = final_validation
@@ -141,7 +154,7 @@ class TrainingDataOrchestrator:
return training_dataset
except Exception as e:
#publish_job_failed(job_id, tenant_id, str(e))
await publish_job_failed(job_id, tenant_id, str(e))
logger.error(f"Training data preparation failed: {str(e)}")
raise ValueError(f"Failed to prepare training data: {str(e)}")
@@ -546,6 +559,7 @@ class TrainingDataOrchestrator:
return synthetic_data
def validate_training_data_quality(self, dataset: TrainingDataSet) -> Dict[str, Any]:
"""Enhanced validation of training data quality"""
validation_results = {
"is_valid": True,

View File

@@ -61,7 +61,16 @@ class TrainingService:
logger.info(f"Starting training job {job_id} for tenant {tenant_id}")
from app.services.messaging import TrainingStatusPublisher
status_publisher = TrainingStatusPublisher(job_id, tenant_id)
try:
await status_publisher.job_started({
"bakery_location": bakery_location,
"has_custom_dates": bool(requested_start or requested_end)
}, 0) # Will be updated when we know product count
# Step 1: Prepare training dataset with date alignment and orchestration
logger.info("Step 1: Preparing and aligning training data")
training_dataset = await self.orchestrator.prepare_training_data(
@@ -101,10 +110,12 @@ class TrainingService:
}
logger.info(f"Training job {job_id} completed successfully")
await status_publisher.job_completed(final_result)
return TrainingService.create_detailed_training_response(final_result)
except Exception as e:
logger.error(f"Training job {job_id} failed: {str(e)}")
await status_publisher.job_failed(str(e))
# Return error response in same detailed format
final_result = {
"job_id": job_id,