Files
bakery-ia/services/training/app/services/messaging.py
Urtzi Alfaro 277e8bec73 Add user role
2025-08-02 09:41:50 +02:00

571 lines
20 KiB
Python

# services/training/app/services/messaging.py
"""
Enhanced training service messaging - Complete status publishing implementation
Uses shared RabbitMQ infrastructure with comprehensive progress tracking
"""
import json
from datetime import date, datetime
from typing import Any, Dict, List, Optional

import numpy as np
import structlog

from shared.messaging.rabbitmq import RabbitMQClient
from shared.messaging.events import (
    TrainingStartedEvent,
    TrainingCompletedEvent,
    TrainingFailedEvent,
)
from app.core.config import settings
# Module-level structlog logger used by every helper in this module.
logger = structlog.get_logger()
# Single module-level RabbitMQ client shared by all publish_* helpers below;
# setup_messaging() connects it and cleanup_messaging() disconnects it.
training_publisher = RabbitMQClient(settings.RABBITMQ_URL, "training-service")
async def setup_messaging():
    """Connect the shared training publisher and log the outcome.

    A falsy result from ``training_publisher.connect()`` is logged as a
    warning; this function itself does not raise on that result.
    """
    connected = await training_publisher.connect()
    if not connected:
        logger.warning("Training service messaging failed to initialize")
        return
    logger.info("Training service messaging initialized")
async def cleanup_messaging():
    """Disconnect the shared training publisher, then log the shutdown."""
    client = training_publisher
    await client.disconnect()
    logger.info("Training service messaging cleaned up")
def serialize_for_json(obj: Any) -> Any:
    """
    Recursively convert numpy values and other non-JSON-native objects into
    JSON-compatible Python types.

    Conversions:
        - numpy bool / integer / floating scalars -> bool / int / float
        - any other numpy scalar (e.g. ``np.datetime64``) -> its ``.item()``
          value, itself converted recursively
        - ``np.ndarray`` -> nested lists, elements converted recursively
        - ``datetime`` / ``date`` -> ISO-8601 string
        - ``dict`` -> dict with converted values; numpy-scalar and date keys
          are converted as well (``json.dumps`` only accepts str, int, float,
          bool or None keys)
        - ``list`` / ``tuple`` / ``set`` / ``frozenset`` -> list
        - anything else is returned unchanged

    Args:
        obj: Value to convert.

    Returns:
        An equivalent structure made of JSON-compatible types; objects of
        unrecognised types are passed through as-is.
    """
    if isinstance(obj, np.bool_):
        return bool(obj)
    if isinstance(obj, np.integer):
        return int(obj)
    if isinstance(obj, np.floating):
        return float(obj)
    if isinstance(obj, np.ndarray):
        # tolist() yields native scalars for numeric dtypes, but datetime64 and
        # object arrays can still contain date/datetime/numpy objects.
        return serialize_for_json(obj.tolist())
    if isinstance(obj, np.generic):
        # Remaining numpy scalars (datetime64, str_, ...) -> native Python value.
        return serialize_for_json(obj.item())
    if isinstance(obj, date):  # also matches datetime, a subclass of date
        return obj.isoformat()
    if isinstance(obj, dict):
        return {_json_safe_key(key): serialize_for_json(value) for key, value in obj.items()}
    if isinstance(obj, (list, tuple, set, frozenset)):
        return [serialize_for_json(item) for item in obj]
    return obj


def _json_safe_key(key: Any) -> Any:
    """Convert a dict key that ``json.dumps`` would reject into a native value."""
    if isinstance(key, np.generic):
        return key.item()
    if isinstance(key, date):
        return key.isoformat()
    return key
def safe_json_serialize(data: Dict[str, Any]) -> Dict[str, Any]:
    """
    Return a JSON-safe copy of an event/payload dictionary.

    Delegates entirely to ``serialize_for_json``; see it for the conversions
    that are applied.
    """
    cleaned = serialize_for_json(data)
    return cleaned
async def setup_websocket_message_routing():
    """Placeholder hook for WebSocket message routing.

    Currently a no-op that returns ``None``. Per its original comments it is
    meant to be called from the WebSocket endpoint to set up a consumer for a
    specific job; that wiring is not implemented here.
    """
    # NOTE(review): the previous try/except wrapped only `pass`, so its error
    # log could never run; it was removed. Add real consumer setup (with its
    # own error handling) here when implemented.
    return None
# =========================================
# ENHANCED TRAINING JOB STATUS EVENTS
# =========================================
async def publish_job_started(job_id: str, tenant_id: str, config: Dict[str, Any]) -> bool:
    """Publish a ``training.started`` event for a new training job.

    Args:
        job_id: Identifier of the training job.
        tenant_id: Tenant that owns the job.
        config: Job configuration, embedded in the event after JSON-safe
            conversion. ``estimated_duration_minutes`` is read from it
            (default 15). ``None`` is treated as an empty config.

    Returns:
        The result of ``training_publisher.publish_event``; a truthy value is
        logged as success.
    """
    # Clean here as well (not only in TrainingStatusPublisher.job_started) so
    # direct callers passing numpy values are handled, as publish_job_completed does.
    clean_config = safe_json_serialize(config or {})
    event = TrainingStartedEvent(
        service_name="training-service",
        data={
            "job_id": job_id,
            "tenant_id": tenant_id,
            "config": clean_config,
            "started_at": datetime.now().isoformat(),
            "estimated_duration_minutes": clean_config.get("estimated_duration_minutes", 15),
        },
    )
    success = await training_publisher.publish_event(
        exchange_name="training.events",
        routing_key="training.started",
        event_data=event.to_dict(),
    )
    if success:
        logger.info("Published job started event", job_id=job_id, tenant_id=tenant_id)
    else:
        logger.error("Failed to publish job started event", job_id=job_id)
    return success
async def publish_job_progress(
    job_id: str,
    tenant_id: str,
    progress: int,
    step: str,
    current_product: Optional[str] = None,
    products_completed: int = 0,
    products_total: int = 0,
    estimated_time_remaining_minutes: Optional[int] = None,
    step_details: Optional[str] = None
) -> bool:
    """Publish a detailed ``training.progress`` event with safe serialization.

    Args:
        job_id: Identifier of the training job.
        tenant_id: Tenant that owns the job.
        progress: Percent complete; coerced to ``int`` and clamped to [0, 100].
        step: Human-readable name of the current step.
        current_product: Product currently being trained, if any.
        products_completed: Number of products finished so far.
        products_total: Total number of products in the job.
        estimated_time_remaining_minutes: ETA in minutes, coerced to ``int``.
            ``0`` is published as ``0``; only ``None`` is published as null.
        step_details: Optional extra detail for the step.

    Returns:
        The result of ``training_publisher.publish_event``; a truthy value is
        logged as success.
    """
    # Explicit None check: a remaining time of 0 minutes is a real value and
    # must not be collapsed into "unknown".
    remaining_minutes = (
        int(estimated_time_remaining_minutes)
        if estimated_time_remaining_minutes is not None
        else None
    )
    event_data = {
        "service_name": "training-service",
        "event_type": "training.progress",
        "timestamp": datetime.now().isoformat(),
        "data": {
            "job_id": job_id,
            "tenant_id": tenant_id,
            "progress": min(max(int(progress), 0), 100),  # int() also unwraps numpy ints
            "current_step": step,
            "current_product": current_product,
            "products_completed": int(products_completed),
            "products_total": int(products_total),
            "estimated_time_remaining_minutes": remaining_minutes,
            "step_details": step_details,
        },
    }
    # Clean the entire event in case any other field carries numpy values.
    clean_event_data = safe_json_serialize(event_data)
    success = await training_publisher.publish_event(
        exchange_name="training.events",
        routing_key="training.progress",
        event_data=clean_event_data,
    )
    if success:
        logger.info("Published progress update",
                    job_id=job_id,
                    progress=progress,
                    step=step,
                    current_product=current_product)
    else:
        logger.error("Failed to publish progress update", job_id=job_id)
    return success
async def publish_job_step_completed(
    job_id: str,
    tenant_id: str,
    step_name: str,
    step_result: Dict[str, Any],
    progress: int
) -> bool:
    """Publish a ``training.step.completed`` event when a major step finishes.

    Args:
        job_id: Identifier of the training job.
        tenant_id: Tenant that owns the job.
        step_name: Name of the completed step.
        step_result: Arbitrary result data for the step; JSON-cleaned before
            publishing.
        progress: Job progress value to report alongside the step.

    Returns:
        The result of ``training_publisher.publish_event``.
    """
    # One clock read so the envelope timestamp and completed_at agree.
    now = datetime.now().isoformat()
    event_data = {
        "service_name": "training-service",
        "event_type": "training.step.completed",
        "timestamp": now,
        "data": {
            "job_id": job_id,
            "tenant_id": tenant_id,
            "step_name": step_name,
            "step_result": step_result,
            "progress": progress,
            "completed_at": now,
        },
    }
    # step_result / progress could carry numpy values from training code; clean
    # them the same way the other job-level publishers do.
    return await training_publisher.publish_event(
        exchange_name="training.events",
        routing_key="training.step.completed",
        event_data=safe_json_serialize(event_data),
    )
async def publish_job_completed(job_id: str, tenant_id: str, results: Dict[str, Any]) -> bool:
    """Publish a ``training.completed`` event for a finished job.

    *results* is JSON-cleaned first. The summary fields ``models_trained``,
    ``success_rate`` and ``total_duration_seconds`` are read from its
    ``successful_trainings``, ``success_rate`` and
    ``overall_training_time_seconds`` keys, defaulting to 0.

    Returns:
        The result of ``training_publisher.publish_event``; a truthy value is
        logged as success.
    """
    safe_results = safe_json_serialize(results)
    models_trained = safe_results.get("successful_trainings", 0)
    payload = {
        "job_id": job_id,
        "tenant_id": tenant_id,
        "results": safe_results,
        "models_trained": models_trained,
        "success_rate": safe_results.get("success_rate", 0),
        "total_duration_seconds": safe_results.get("overall_training_time_seconds", 0),
        "completed_at": datetime.now().isoformat(),
    }
    event = TrainingCompletedEvent(service_name="training-service", data=payload)

    published = await training_publisher.publish_event(
        exchange_name="training.events",
        routing_key="training.completed",
        event_data=event.to_dict(),
    )
    if not published:
        logger.error("Failed to publish job completed event", job_id=job_id)
        return published

    logger.info("Published job completed event",
                job_id=job_id,
                models_trained=models_trained)
    return published
async def publish_job_failed(job_id: str, tenant_id: str, error: str, error_details: Optional[Dict] = None) -> bool:
    """Publish a ``training.failed`` event for a job.

    Args:
        job_id: Identifier of the training job.
        tenant_id: Tenant that owns the job.
        error: Error message; coerced with ``str()`` so an exception object
            passed by a caller stays JSON-serializable.
        error_details: Optional extra details, JSON-cleaned before publishing;
            falsy values are published as ``{}``.

    Returns:
        The result of ``training_publisher.publish_event``; a truthy value is
        logged as success.
    """
    # Clean here too so direct callers get the same treatment as
    # TrainingStatusPublisher.job_failed and publish_job_completed.
    clean_error_details = safe_json_serialize(error_details) if error_details else {}
    event = TrainingFailedEvent(
        service_name="training-service",
        data={
            "job_id": job_id,
            "tenant_id": tenant_id,
            "error": str(error),
            "error_details": clean_error_details,
            "failed_at": datetime.now().isoformat(),
        },
    )
    success = await training_publisher.publish_event(
        exchange_name="training.events",
        routing_key="training.failed",
        event_data=event.to_dict(),
    )
    if success:
        logger.info("Published job failed event", job_id=job_id, error=error)
    else:
        logger.error("Failed to publish job failed event", job_id=job_id)
    return success
async def publish_job_cancelled(job_id: str, tenant_id: str, reason: str = "User requested") -> bool:
    """Publish a ``training.cancelled`` event for a job.

    Args:
        job_id: Identifier of the cancelled job.
        tenant_id: Tenant that owns the job.
        reason: Free-text cancellation reason.

    Returns:
        The result of ``training_publisher.publish_event``.
    """
    routing_key = "training.cancelled"
    envelope = dict(
        service_name="training-service",
        event_type=routing_key,
        timestamp=datetime.now().isoformat(),
        data=dict(
            job_id=job_id,
            tenant_id=tenant_id,
            reason=reason,
            cancelled_at=datetime.now().isoformat(),
        ),
    )
    return await training_publisher.publish_event(
        exchange_name="training.events",
        routing_key=routing_key,
        event_data=envelope,
    )
# =========================================
# PRODUCT-LEVEL TRAINING EVENTS
# =========================================
async def publish_product_training_started(job_id: str, tenant_id: str, product_name: str) -> bool:
    """Publish a ``training.product.started`` event for one product in a job.

    Returns:
        The result of ``training_publisher.publish_event``.
    """
    routing_key = "training.product.started"
    envelope = dict(
        service_name="training-service",
        event_type=routing_key,
        timestamp=datetime.now().isoformat(),
        data=dict(
            job_id=job_id,
            tenant_id=tenant_id,
            product_name=product_name,
            started_at=datetime.now().isoformat(),
        ),
    )
    return await training_publisher.publish_event(
        exchange_name="training.events",
        routing_key=routing_key,
        event_data=envelope,
    )
async def publish_product_training_completed(
    job_id: str,
    tenant_id: str,
    product_name: str,
    model_id: str,
    metrics: Optional[Dict[str, float]] = None
) -> bool:
    """Publish a ``training.product.completed`` event for one product.

    Args:
        job_id: Identifier of the training job.
        tenant_id: Tenant that owns the job.
        product_name: Product whose model finished training.
        model_id: Identifier of the resulting model.
        metrics: Optional training metrics; JSON-cleaned before publishing,
            falsy values are published as ``{}``.

    Returns:
        The result of ``training_publisher.publish_event``.
    """
    # Clean metrics here as publish_model_trained does, so direct callers
    # passing numpy values (e.g. float32) still produce a JSON-safe event.
    clean_metrics = safe_json_serialize(metrics) if metrics else {}
    return await training_publisher.publish_event(
        exchange_name="training.events",
        routing_key="training.product.completed",
        event_data={
            "service_name": "training-service",
            "event_type": "training.product.completed",
            "timestamp": datetime.now().isoformat(),
            "data": {
                "job_id": job_id,
                "tenant_id": tenant_id,
                "product_name": product_name,
                "model_id": model_id,
                "metrics": clean_metrics,
                "completed_at": datetime.now().isoformat(),
            },
        },
    )
async def publish_product_training_failed(
    job_id: str,
    tenant_id: str,
    product_name: str,
    error: str
) -> bool:
    """Publish a ``training.product.failed`` event for one product.

    Args:
        job_id: Identifier of the training job.
        tenant_id: Tenant that owns the job.
        product_name: Product whose training failed.
        error: Error message to include in the event.

    Returns:
        The result of ``training_publisher.publish_event``.
    """
    routing_key = "training.product.failed"
    envelope = dict(
        service_name="training-service",
        event_type=routing_key,
        timestamp=datetime.now().isoformat(),
        data=dict(
            job_id=job_id,
            tenant_id=tenant_id,
            product_name=product_name,
            error=error,
            failed_at=datetime.now().isoformat(),
        ),
    )
    return await training_publisher.publish_event(
        exchange_name="training.events",
        routing_key=routing_key,
        event_data=envelope,
    )
# =========================================
# MODEL LIFECYCLE EVENTS
# =========================================
async def publish_model_trained(model_id: str, tenant_id: str, product_name: str, metrics: Dict[str, float]) -> bool:
    """Publish a ``training.model.trained`` event for one trained model.

    Non-empty *metrics* are passed through ``safe_json_serialize``; empty or
    missing metrics are published as ``{}``.

    Returns:
        The result of ``training_publisher.publish_event``.
    """
    training_metrics = {}
    if metrics:
        training_metrics = safe_json_serialize(metrics)

    routing_key = "training.model.trained"
    envelope = dict(
        service_name="training-service",
        event_type=routing_key,
        timestamp=datetime.now().isoformat(),
        data=dict(
            model_id=model_id,
            tenant_id=tenant_id,
            product_name=product_name,
            training_metrics=training_metrics,
            trained_at=datetime.now().isoformat(),
        ),
    )
    return await training_publisher.publish_event(
        exchange_name="training.events",
        routing_key=routing_key,
        event_data=envelope,
    )
async def publish_model_validated(model_id: str, tenant_id: str, product_name: str, validation_results: Dict[str, Any]) -> bool:
    """Publish a ``training.model.validated`` event for one model.

    Args:
        model_id: Identifier of the validated model.
        tenant_id: Tenant that owns the model.
        product_name: Product the model forecasts.
        validation_results: Arbitrary validation output; JSON-cleaned before
            publishing.

    Returns:
        The result of ``training_publisher.publish_event``.
    """
    # validation_results is Dict[str, Any] and could hold numpy values; clean
    # it like publish_model_trained cleans its metrics.
    clean_results = safe_json_serialize(validation_results)
    return await training_publisher.publish_event(
        exchange_name="training.events",
        routing_key="training.model.validated",
        event_data={
            "service_name": "training-service",
            "event_type": "training.model.validated",
            "timestamp": datetime.now().isoformat(),
            "data": {
                "model_id": model_id,
                "tenant_id": tenant_id,
                "product_name": product_name,
                "validation_results": clean_results,
                "validated_at": datetime.now().isoformat(),
            },
        },
    )
async def publish_model_saved(model_id: str, tenant_id: str, product_name: str, model_path: str) -> bool:
    """Publish a ``training.model.saved`` event recording where a model was stored.

    Returns:
        The result of ``training_publisher.publish_event``.
    """
    routing_key = "training.model.saved"
    envelope = dict(
        service_name="training-service",
        event_type=routing_key,
        timestamp=datetime.now().isoformat(),
        data=dict(
            model_id=model_id,
            tenant_id=tenant_id,
            product_name=product_name,
            model_path=model_path,
            saved_at=datetime.now().isoformat(),
        ),
    )
    return await training_publisher.publish_event(
        exchange_name="training.events",
        routing_key=routing_key,
        event_data=envelope,
    )
# =========================================
# DATA PROCESSING EVENTS
# =========================================
async def publish_data_validation_started(job_id: str, tenant_id: str, products: List[str]) -> bool:
    """Publish a ``training.data.validation.started`` event for a job.

    Args:
        job_id: Identifier of the training job.
        tenant_id: Tenant that owns the job.
        products: Product names whose data is being validated.

    Returns:
        The result of ``training_publisher.publish_event``.
    """
    routing_key = "training.data.validation.started"
    envelope = dict(
        service_name="training-service",
        event_type=routing_key,
        timestamp=datetime.now().isoformat(),
        data=dict(
            job_id=job_id,
            tenant_id=tenant_id,
            products=products,
            started_at=datetime.now().isoformat(),
        ),
    )
    return await training_publisher.publish_event(
        exchange_name="training.events",
        routing_key=routing_key,
        event_data=envelope,
    )
async def publish_data_validation_completed(
    job_id: str,
    tenant_id: str,
    validation_results: Dict[str, Any]
) -> bool:
    """Publish a ``training.data.validation.completed`` event for a job.

    Args:
        job_id: Identifier of the training job.
        tenant_id: Tenant that owns the job.
        validation_results: Arbitrary validation output; JSON-cleaned before
            publishing.

    Returns:
        The result of ``training_publisher.publish_event``.
    """
    # validation_results is Dict[str, Any] and could hold numpy values.
    clean_results = safe_json_serialize(validation_results)
    return await training_publisher.publish_event(
        exchange_name="training.events",
        routing_key="training.data.validation.completed",
        event_data={
            "service_name": "training-service",
            "event_type": "training.data.validation.completed",
            "timestamp": datetime.now().isoformat(),
            "data": {
                "job_id": job_id,
                "tenant_id": tenant_id,
                "validation_results": clean_results,
                "completed_at": datetime.now().isoformat(),
            },
        },
    )
async def publish_models_deleted_event(tenant_id: str, deletion_stats: Dict[str, Any]) -> bool:
    """Publish a ``training.tenant.models.deleted`` event for a tenant.

    Errors are logged and reported through the return value rather than
    raised.

    Args:
        tenant_id: Tenant whose models were deleted.
        deletion_stats: Arbitrary statistics about the deletion; JSON-cleaned
            before publishing.

    Returns:
        The result of ``training_publisher.publish_event``, or ``False`` if
        publishing raised.
    """
    try:
        event_data = {
            "service_name": "training-service",
            "event_type": "tenant_models_deleted",
            "tenant_id": tenant_id,
            # NOTE(review): utcnow() differs from the local-time datetime.now()
            # used by the other events (and is deprecated in Python 3.12+);
            # kept so the timestamp value/format is unchanged.
            "timestamp": datetime.utcnow().isoformat(),
            "deletion_stats": safe_json_serialize(deletion_stats),
        }
        # Use the same keyword names (exchange_name / event_data) and exchange
        # as every other publish_event call in this module; the previous
        # `exchange=` / `message=` keywords did not match them, and the
        # exchange was spelled "training_events".
        success = await training_publisher.publish_event(
            exchange_name="training.events",
            routing_key="training.tenant.models.deleted",
            event_data=event_data,
        )
    except Exception as e:
        logger.error("Failed to publish models deletion event", tenant_id=tenant_id, error=str(e))
        return False
    if not success:
        logger.error("Failed to publish models deletion event", tenant_id=tenant_id)
    return success
# =========================================
# UTILITY FUNCTIONS FOR BATCH PUBLISHING
# =========================================
async def publish_batch_status_update(
    job_id: str,
    tenant_id: str,
    updates: List[Dict[str, Any]]
) -> bool:
    """Publish several status updates for a job as one ``training.batch.update`` event.

    Args:
        job_id: Identifier of the training job.
        tenant_id: Tenant that owns the job.
        updates: Status-update dicts; JSON-cleaned before publishing. ``None``
            is treated as an empty batch.

    Returns:
        The result of ``training_publisher.publish_event``.
    """
    # Updates are arbitrary dicts that could carry numpy values from training code.
    clean_updates = safe_json_serialize(list(updates or []))
    batch_event = {
        "service_name": "training-service",
        "event_type": "training.batch.update",
        "timestamp": datetime.now().isoformat(),
        "data": {
            "job_id": job_id,
            "tenant_id": tenant_id,
            "updates": clean_updates,
            "batch_size": len(clean_updates),
        },
    }
    return await training_publisher.publish_event(
        exchange_name="training.events",
        routing_key="training.batch.update",
        event_data=batch_event,
    )
# =========================================
# HELPER FUNCTIONS FOR TRAINING INTEGRATION
# =========================================
class TrainingStatusPublisher:
    """Per-job helper that publishes training status events.

    Holds the job/tenant ids, the time the helper was created, and product
    counters, so progress updates can carry a time-remaining estimate and
    products completed / total.
    """

    def __init__(self, job_id: str, tenant_id: str):
        self.job_id = job_id
        self.tenant_id = tenant_id
        # Reference point for the elapsed-time / ETA calculation in progress_update.
        self.start_time = datetime.now()
        self.products_total = 0
        self.products_completed = 0

    @staticmethod
    def _estimate_remaining_minutes(progress: float, elapsed_minutes: float) -> Optional[int]:
        """Linearly extrapolate whole minutes remaining from percent progress.

        Returns ``None`` when *progress* is not positive (no basis for an
        estimate); otherwise a non-negative int, where ``0`` is a valid result.
        """
        if progress > 0:
            estimated_total_minutes = (elapsed_minutes / progress) * 100
            return int(max(0, estimated_total_minutes - elapsed_minutes))
        return None

    async def job_started(self, config: Dict[str, Any], products_total: int = 0) -> bool:
        """Record the product total and publish the job-started event.

        Returns the result of ``publish_job_started``.
        """
        self.products_total = products_total
        clean_config = safe_json_serialize(config)
        return await publish_job_started(self.job_id, self.tenant_id, clean_config)

    async def progress_update(
        self,
        progress: int,
        step: str,
        current_product: Optional[str] = None,
        step_details: Optional[str] = None
    ) -> bool:
        """Publish a progress update with an ETA derived from elapsed time.

        Returns the result of ``publish_job_progress``.
        """
        elapsed_minutes = (datetime.now() - self.start_time).total_seconds() / 60
        # Previously an ETA of exactly 0 was published as None ("unknown");
        # the helper now keeps 0 as a real value.
        remaining = self._estimate_remaining_minutes(progress, elapsed_minutes)
        return await publish_job_progress(
            job_id=self.job_id,
            tenant_id=self.tenant_id,
            progress=int(progress),
            step=step,
            current_product=current_product,
            products_completed=int(self.products_completed),
            products_total=int(self.products_total),
            estimated_time_remaining_minutes=remaining,
            step_details=step_details
        )

    async def product_completed(self, product_name: str, model_id: str, metrics: Optional[Dict] = None):
        """Count a finished product, publish its event, then publish overall progress."""
        self.products_completed += 1
        clean_metrics = safe_json_serialize(metrics) if metrics else None
        await publish_product_training_completed(
            self.job_id, self.tenant_id, product_name, model_id, clean_metrics
        )
        if self.products_total > 0:
            # Product training maps onto 0-90%; the last 10% is reserved for final steps.
            progress = int((self.products_completed / self.products_total) * 90)
            await self.progress_update(
                progress=progress,
                step=f"Completed training for {product_name}",
                current_product=None
            )

    async def job_completed(self, results: Dict[str, Any]) -> bool:
        """Publish job completion with JSON-cleaned results.

        Returns the result of ``publish_job_completed``.
        """
        clean_results = safe_json_serialize(results)
        return await publish_job_completed(self.job_id, self.tenant_id, clean_results)

    async def job_failed(self, error: str, error_details: Optional[Dict] = None) -> bool:
        """Publish job failure with JSON-cleaned error details.

        Returns the result of ``publish_job_failed``.
        """
        clean_error_details = safe_json_serialize(error_details) if error_details else None
        return await publish_job_failed(self.job_id, self.tenant_id, error, clean_error_details)