From 923b2d48d2760198d7703c5b4951a59820d64893 Mon Sep 17 00:00:00 2001
From: Urtzi Alfaro
Date: Wed, 30 Jul 2025 21:21:02 +0200
Subject: [PATCH] Improve the event messaging for training service

---
 services/training/app/services/messaging.py   | 464 +++++++++++++++---
 .../app/services/training_orchestrator.py     |  32 +-
 .../training/app/services/training_service.py |  11 +
 3 files changed, 428 insertions(+), 79 deletions(-)

diff --git a/services/training/app/services/messaging.py b/services/training/app/services/messaging.py
index 075efa78..f6cebd34 100644
--- a/services/training/app/services/messaging.py
+++ b/services/training/app/services/messaging.py
@@ -1,11 +1,12 @@
 # services/training/app/services/messaging.py
 """
-Training service messaging - Clean interface for training-specific events
-Uses shared RabbitMQ infrastructure
+Enhanced training service messaging - Complete status publishing implementation
+Uses shared RabbitMQ infrastructure with comprehensive progress tracking
 """
 import structlog
-from typing import Dict, Any, Optional
+from typing import Dict, Any, Optional, List
+from datetime import datetime
 from shared.messaging.rabbitmq import RabbitMQClient
 from shared.messaging.events import (
     TrainingStartedEvent,
@@ -14,6 +15,9 @@ from shared.messaging.events import (
 )
 from app.core.config import settings
 
+import json
+import numpy as np
+
 logger = structlog.get_logger()
 
 # Single global instance
@@ -31,8 +35,38 @@ async def cleanup_messaging():
     """Cleanup messaging for training service"""
     await training_publisher.disconnect()
     logger.info("Training service messaging cleaned up")
+
+def serialize_for_json(obj: Any) -> Any:
+    """
+    Convert numpy types and other non-JSON-serializable objects to JSON-compatible types
+    """
+    if isinstance(obj, np.integer):
+        return int(obj)
+    elif isinstance(obj, np.floating):
+        return float(obj)
+    elif isinstance(obj, np.ndarray):
+        return obj.tolist()
+    elif isinstance(obj, np.bool_):
+        return bool(obj)
+    elif isinstance(obj, datetime):
+        return obj.isoformat()
+    elif isinstance(obj, dict):
+        return {key: serialize_for_json(value) for key, value in obj.items()}
+    elif isinstance(obj, (list, tuple)):
+        return [serialize_for_json(item) for item in obj]
+    else:
+        return obj
+
+def safe_json_serialize(data: Dict[str, Any]) -> Dict[str, Any]:
+    """
+    Recursively clean a data dictionary for JSON serialization
+    """
+    return serialize_for_json(data)
+
+# =========================================
+# ENHANCED TRAINING JOB STATUS EVENTS
+# =========================================
 
-# Training Job Events
 async def publish_job_started(job_id: str, tenant_id: str, config: Dict[str, Any]) -> bool:
     """Publish training job started event"""
     event = TrainingStartedEvent(
@@ -40,82 +74,185 @@ async def publish_job_started(job_id: str, tenant_id: str, config: Dict[str, Any
         data={
             "job_id": job_id,
             "tenant_id": tenant_id,
-            "config": config
+            "config": config,
+            "started_at": datetime.now().isoformat(),
+            "estimated_duration_minutes": config.get("estimated_duration_minutes", 15)
         }
     )
-    return await training_publisher.publish_event(
+    success = await training_publisher.publish_event(
         exchange_name="training.events",
         routing_key="training.started",
        event_data=event.to_dict()
     )
+
+    if success:
+        logger.info("Published job started event", job_id=job_id, tenant_id=tenant_id)
+    else:
+        logger.error("Failed to publish job started event", job_id=job_id)
+
+    return success
 
-async def publish_job_progress(job_id: str, tenant_id: str, progress: int, step: str) -> bool:
-    """Publish training job progress event"""
"""Publish training job progress event""" - return await training_publisher.publish_event( +async def publish_job_progress( + job_id: str, + tenant_id: str, + progress: int, + step: str, + current_product: Optional[str] = None, + products_completed: int = 0, + products_total: int = 0, + estimated_time_remaining_minutes: Optional[int] = None, + step_details: Optional[str] = None +) -> bool: + """Publish detailed training job progress event with safe serialization""" + event_data = { + "service_name": "training-service", + "event_type": "training.progress", + "timestamp": datetime.now().isoformat(), + "data": { + "job_id": job_id, + "tenant_id": tenant_id, + "progress": min(max(int(progress), 0), 100), # Ensure int, not numpy.int64 + "current_step": step, + "current_product": current_product, + "products_completed": int(products_completed), # Convert numpy types + "products_total": int(products_total), + "estimated_time_remaining_minutes": int(estimated_time_remaining_minutes) if estimated_time_remaining_minutes else None, + "step_details": step_details + } + } + + # Clean the entire event data + clean_event_data = safe_json_serialize(event_data) + + success = await training_publisher.publish_event( exchange_name="training.events", routing_key="training.progress", - event_data={ - "service_name": "training-service", - "event_type": "training.progress", - "data": { - "job_id": job_id, - "tenant_id": tenant_id, - "progress": progress, - "current_step": step - } + event_data=clean_event_data + ) + + if success: + logger.info(f"Published progress update", + job_id=job_id, + progress=progress, + step=step, + current_product=current_product) + else: + logger.error(f"Failed to publish progress update", job_id=job_id) + + return success + +async def publish_job_step_completed( + job_id: str, + tenant_id: str, + step_name: str, + step_result: Dict[str, Any], + progress: int +) -> bool: + """Publish when a major training step is completed""" + event_data = { + "service_name": "training-service", + "event_type": "training.step.completed", + "timestamp": datetime.now().isoformat(), + "data": { + "job_id": job_id, + "tenant_id": tenant_id, + "step_name": step_name, + "step_result": step_result, + "progress": progress, + "completed_at": datetime.now().isoformat() } + } + + return await training_publisher.publish_event( + exchange_name="training.events", + routing_key="training.step.completed", + event_data=event_data ) async def publish_job_completed(job_id: str, tenant_id: str, results: Dict[str, Any]) -> bool: - """Publish training job completed event""" + """Publish training job completed event with safe JSON serialization""" + + # Clean the results data before creating the event + clean_results = safe_json_serialize(results) + event = TrainingCompletedEvent( service_name="training-service", data={ "job_id": job_id, "tenant_id": tenant_id, - "results": results, - "models_trained": results.get("products_trained", 0), - "success_rate": results.get("summary", {}).get("success_rate", 0) + "results": clean_results, # Now safe for JSON + "models_trained": clean_results.get("successful_trainings", 0), + "success_rate": clean_results.get("success_rate", 0), + "total_duration_seconds": clean_results.get("overall_training_time_seconds", 0), + "completed_at": datetime.now().isoformat() } ) - return await training_publisher.publish_event( + + success = await training_publisher.publish_event( exchange_name="training.events", routing_key="training.completed", event_data=event.to_dict() ) + + if success: + 
logger.info(f"Published job completed event", + job_id=job_id, + models_trained=clean_results.get("successful_trainings", 0)) + else: + logger.error(f"Failed to publish job completed event", job_id=job_id) + + return success -async def publish_job_failed(job_id: str, tenant_id: str, error: str) -> bool: +async def publish_job_failed(job_id: str, tenant_id: str, error: str, error_details: Optional[Dict] = None) -> bool: """Publish training job failed event""" event = TrainingFailedEvent( service_name="training-service", data={ "job_id": job_id, "tenant_id": tenant_id, - "error": error + "error": error, + "error_details": error_details or {}, + "failed_at": datetime.now().isoformat() } ) - return await training_publisher.publish_event( + + success = await training_publisher.publish_event( exchange_name="training.events", routing_key="training.failed", event_data=event.to_dict() ) + + if success: + logger.info(f"Published job failed event", job_id=job_id, error=error) + else: + logger.error(f"Failed to publish job failed event", job_id=job_id) + + return success -async def publish_job_cancelled(job_id: str, tenant_id: str) -> bool: +async def publish_job_cancelled(job_id: str, tenant_id: str, reason: str = "User requested") -> bool: """Publish training job cancelled event""" + event_data = { + "service_name": "training-service", + "event_type": "training.cancelled", + "timestamp": datetime.now().isoformat(), + "data": { + "job_id": job_id, + "tenant_id": tenant_id, + "reason": reason, + "cancelled_at": datetime.now().isoformat() + } + } + return await training_publisher.publish_event( exchange_name="training.events", routing_key="training.cancelled", - event_data={ - "service_name": "training-service", - "event_type": "training.cancelled", - "data": { - "job_id": job_id, - "tenant_id": tenant_id - } - } + event_data=event_data ) -# Product Training Events +# ========================================= +# PRODUCT-LEVEL TRAINING EVENTS +# ========================================= + async def publish_product_training_started(job_id: str, tenant_id: str, product_name: str) -> bool: """Publish single product training started event""" return await training_publisher.publish_event( @@ -124,15 +261,23 @@ async def publish_product_training_started(job_id: str, tenant_id: str, product_ event_data={ "service_name": "training-service", "event_type": "training.product.started", + "timestamp": datetime.now().isoformat(), "data": { "job_id": job_id, "tenant_id": tenant_id, - "product_name": product_name + "product_name": product_name, + "started_at": datetime.now().isoformat() } } ) -async def publish_product_training_completed(job_id: str, tenant_id: str, product_name: str, model_id: str) -> bool: +async def publish_product_training_completed( + job_id: str, + tenant_id: str, + product_name: str, + model_id: str, + metrics: Optional[Dict[str, float]] = None +) -> bool: """Publish single product training completed event""" return await training_publisher.publish_event( exchange_name="training.events", @@ -140,49 +285,71 @@ async def publish_product_training_completed(job_id: str, tenant_id: str, produc event_data={ "service_name": "training-service", "event_type": "training.product.completed", + "timestamp": datetime.now().isoformat(), "data": { "job_id": job_id, "tenant_id": tenant_id, "product_name": product_name, - "model_id": model_id + "model_id": model_id, + "metrics": metrics or {}, + "completed_at": datetime.now().isoformat() } } ) -# Model Events +async def publish_product_training_failed( + job_id: 
+    tenant_id: str,
+    product_name: str,
+    error: str
+) -> bool:
+    """Publish single product training failed event"""
+    return await training_publisher.publish_event(
+        exchange_name="training.events",
+        routing_key="training.product.failed",
+        event_data={
+            "service_name": "training-service",
+            "event_type": "training.product.failed",
+            "timestamp": datetime.now().isoformat(),
+            "data": {
+                "job_id": job_id,
+                "tenant_id": tenant_id,
+                "product_name": product_name,
+                "error": error,
+                "failed_at": datetime.now().isoformat()
+            }
+        }
+    )
+
+# =========================================
+# MODEL LIFECYCLE EVENTS
+# =========================================
+
 async def publish_model_trained(model_id: str, tenant_id: str, product_name: str, metrics: Dict[str, float]) -> bool:
-    """Publish model trained event"""
+    """Publish model trained event with safe metric serialization"""
+
+    # Clean metrics to ensure JSON serialization
+    clean_metrics = safe_json_serialize(metrics) if metrics else {}
+
+    event_data = {
+        "service_name": "training-service",
+        "event_type": "training.model.trained",
+        "timestamp": datetime.now().isoformat(),
+        "data": {
+            "model_id": model_id,
+            "tenant_id": tenant_id,
+            "product_name": product_name,
+            "training_metrics": clean_metrics,  # Now safe for JSON
+            "trained_at": datetime.now().isoformat()
+        }
+    }
+
     return await training_publisher.publish_event(
         exchange_name="training.events",
         routing_key="training.model.trained",
-        event_data={
-            "service_name": "training-service",
-            "event_type": "training.model.trained",
-            "data": {
-                "model_id": model_id,
-                "tenant_id": tenant_id,
-                "product_name": product_name,
-                "training_metrics": metrics
-            }
-        }
+        event_data=event_data
     )
 
-async def publish_model_updated(model_id: str, tenant_id: str, product_name: str, version: int) -> bool:
-    """Publish model updated event"""
-    return await training_publisher.publish_event(
-        exchange_name="training.events",
-        routing_key="training.model.updated",
-        event_data={
-            "service_name": "training-service",
-            "event_type": "training.model.updated",
-            "data": {
-                "model_id": model_id,
-                "tenant_id": tenant_id,
-                "product_name": product_name,
-                "version": version
-            }
-        }
-    )
 
 async def publish_model_validated(model_id: str, tenant_id: str, product_name: str, validation_results: Dict[str, Any]) -> bool:
     """Publish model validation event"""
@@ -192,11 +359,13 @@ async def publish_model_validated(model_id: str, tenant_id: str, product_name: s
         event_data={
             "service_name": "training-service",
             "event_type": "training.model.validated",
+            "timestamp": datetime.now().isoformat(),
             "data": {
                 "model_id": model_id,
                 "tenant_id": tenant_id,
                 "product_name": product_name,
-                "validation_results": validation_results
+                "validation_results": validation_results,
+                "validated_at": datetime.now().isoformat()
             }
         }
     )
@@ -209,11 +378,166 @@ async def publish_model_saved(model_id: str, tenant_id: str, product_name: str,
         event_data={
             "service_name": "training-service",
             "event_type": "training.model.saved",
+            "timestamp": datetime.now().isoformat(),
             "data": {
                 "model_id": model_id,
                 "tenant_id": tenant_id,
                 "product_name": product_name,
-                "model_path": model_path
+                "model_path": model_path,
+                "saved_at": datetime.now().isoformat()
             }
         }
-    )
\ No newline at end of file
+    )
+
+# =========================================
+# DATA PROCESSING EVENTS
+# =========================================
+
+async def publish_data_validation_started(job_id: str, tenant_id: str, products: List[str]) -> bool:
+    """Publish data validation started event"""
event""" + return await training_publisher.publish_event( + exchange_name="training.events", + routing_key="training.data.validation.started", + event_data={ + "service_name": "training-service", + "event_type": "training.data.validation.started", + "timestamp": datetime.now().isoformat(), + "data": { + "job_id": job_id, + "tenant_id": tenant_id, + "products": products, + "started_at": datetime.now().isoformat() + } + } + ) + +async def publish_data_validation_completed( + job_id: str, + tenant_id: str, + validation_results: Dict[str, Any] +) -> bool: + """Publish data validation completed event""" + return await training_publisher.publish_event( + exchange_name="training.events", + routing_key="training.data.validation.completed", + event_data={ + "service_name": "training-service", + "event_type": "training.data.validation.completed", + "timestamp": datetime.now().isoformat(), + "data": { + "job_id": job_id, + "tenant_id": tenant_id, + "validation_results": validation_results, + "completed_at": datetime.now().isoformat() + } + } + ) + +# ========================================= +# UTILITY FUNCTIONS FOR BATCH PUBLISHING +# ========================================= + +async def publish_batch_status_update( + job_id: str, + tenant_id: str, + updates: List[Dict[str, Any]] +) -> bool: + """Publish multiple status updates as a batch""" + batch_event = { + "service_name": "training-service", + "event_type": "training.batch.update", + "timestamp": datetime.now().isoformat(), + "data": { + "job_id": job_id, + "tenant_id": tenant_id, + "updates": updates, + "batch_size": len(updates) + } + } + + return await training_publisher.publish_event( + exchange_name="training.events", + routing_key="training.batch.update", + event_data=batch_event + ) + +# ========================================= +# HELPER FUNCTIONS FOR TRAINING INTEGRATION +# ========================================= + +class TrainingStatusPublisher: + """Helper class to manage training status publishing throughout the training process""" + + def __init__(self, job_id: str, tenant_id: str): + self.job_id = job_id + self.tenant_id = tenant_id + self.start_time = datetime.now() + self.products_total = 0 + self.products_completed = 0 + + async def job_started(self, config: Dict[str, Any], products_total: int = 0): + """Publish job started with initial configuration""" + self.products_total = products_total + + # Clean config data + clean_config = safe_json_serialize(config) + + await publish_job_started(self.job_id, self.tenant_id, clean_config) + + async def progress_update( + self, + progress: int, + step: str, + current_product: Optional[str] = None, + step_details: Optional[str] = None + ): + """Publish progress update with calculated time estimates""" + elapsed_minutes = (datetime.now() - self.start_time).total_seconds() / 60 + + if progress > 0: + estimated_total_minutes = (elapsed_minutes / progress) * 100 + estimated_remaining = max(0, estimated_total_minutes - elapsed_minutes) + else: + estimated_remaining = None + + await publish_job_progress( + job_id=self.job_id, + tenant_id=self.tenant_id, + progress=int(progress), # Ensure int + step=step, + current_product=current_product, + products_completed=int(self.products_completed), # Ensure int + products_total=int(self.products_total), # Ensure int + estimated_time_remaining_minutes=int(estimated_remaining) if estimated_remaining else None, + step_details=step_details + ) + + async def product_completed(self, product_name: str, model_id: str, metrics: Optional[Dict] = None): + 
"""Mark a product as completed and update progress""" + self.products_completed += 1 + + # Clean metrics before publishing + clean_metrics = safe_json_serialize(metrics) if metrics else None + + await publish_product_training_completed( + self.job_id, self.tenant_id, product_name, model_id, clean_metrics + ) + + # Update overall progress + if self.products_total > 0: + progress = int((self.products_completed / self.products_total) * 90) # Save 10% for final steps + await self.progress_update( + progress=progress, + step=f"Completed training for {product_name}", + current_product=None + ) + + async def job_completed(self, results: Dict[str, Any]): + """Publish job completion with clean data""" + clean_results = safe_json_serialize(results) + await publish_job_completed(self.job_id, self.tenant_id, clean_results) + + async def job_failed(self, error: str, error_details: Optional[Dict] = None): + """Publish job failure with clean error details""" + clean_error_details = safe_json_serialize(error_details) if error_details else None + await publish_job_failed(self.job_id, self.tenant_id, error, clean_error_details) \ No newline at end of file diff --git a/services/training/app/services/training_orchestrator.py b/services/training/app/services/training_orchestrator.py index 4aa69558..c2ea27a7 100644 --- a/services/training/app/services/training_orchestrator.py +++ b/services/training/app/services/training_orchestrator.py @@ -16,7 +16,13 @@ import pandas as pd from app.services.data_client import DataClient from app.services.date_alignment_service import DateAlignmentService, DateRange, DataSourceType, AlignedDateRange -from app.services.messaging import publish_job_progress, publish_job_failed +from app.services.messaging import ( + publish_job_progress, + publish_data_validation_started, + publish_data_validation_completed, + publish_job_step_completed, + publish_job_failed +) logger = logging.getLogger(__name__) @@ -69,16 +75,19 @@ class TrainingDataOrchestrator: try: - #publish_job_progress(job_id, tenant_id, 5, "Extraer datos de venta") + await publish_job_progress(job_id, tenant_id, 5, "Extrayendo datos de ventas", + step_details="Conectando con servicio de datos") sales_data = await self.data_client.fetch_sales_data(tenant_id) # Step 1: Extract and validate sales data date range - #publish_job_progress(job_id, tenant_id, 10, "Extraer y validar las fechas de de los datos de venta") + await publish_job_progress(job_id, tenant_id, 10, "Validando fechas de datos de venta", + step_details="Aplicando restricciones de fuentes de datos") sales_date_range = self._extract_sales_date_range(sales_data) logger.info(f"Sales data range detected: {sales_date_range.start} to {sales_date_range.end}") # Step 2: Apply date alignment across all data sources - #publish_job_progress(job_id, tenant_id, 15, "Aplicar la alineación de fechas en todas las fuentes de datos") + await publish_job_progress(job_id, tenant_id, 15, "Alinear el rango de fechas", + step_details="Aplicar la alineación de fechas en todas las fuentes de datos") aligned_range = self.date_alignment_service.validate_and_align_dates( user_sales_range=sales_date_range, requested_start=requested_start, @@ -90,18 +99,21 @@ class TrainingDataOrchestrator: logger.info(f"Applied constraints: {aligned_range.constraints}") # Step 3: Filter sales data to aligned date range - #publish_job_progress(job_id, tenant_id, 20, "Aplicar la alineación de fechas en todas las fuentes de datos") + await publish_job_progress(job_id, tenant_id, 20, "Alinear el rango de las 
ventas", + step_details="Aplicar la alineación de fechas de las ventas") filtered_sales = self._filter_sales_data(sales_data, aligned_range) # Step 4: Collect external data sources concurrently logger.info("Collecting external data sources...") - #publish_job_progress(job_id, tenant_id, 25, "Recopilación de fuentes de datos externas") + await publish_job_progress(job_id, tenant_id, 25, "Recopilación de fuentes de datos externas", + step_details="Recopilación de fuentes de datos externas") weather_data, traffic_data = await self._collect_external_data( aligned_range, bakery_location, tenant_id ) # Step 5: Validate data quality - #publish_job_progress(job_id, tenant_id, 30, "Validando la calidad de los datos") + await publish_job_progress(job_id, tenant_id, 30, "Validando la calidad de los datos", + step_details="Validando la calidad de los datos") data_quality_results = self._validate_data_sources( filtered_sales, weather_data, traffic_data, aligned_range ) @@ -128,7 +140,8 @@ class TrainingDataOrchestrator: ) # Step 7: Final validation - #publish_job_progress(job_id, tenant_id, 35, "Validancion final de los datos") + await publish_job_progress(job_id, tenant_id, 35, "Validancion final de los datos", + step_details="Validancion final de los datos") final_validation = self.validate_training_data_quality(training_dataset) training_dataset.metadata["final_validation"] = final_validation @@ -141,7 +154,7 @@ class TrainingDataOrchestrator: return training_dataset except Exception as e: - #publish_job_failed(job_id, tenant_id, str(e)) + publish_job_failed(job_id, tenant_id, str(e)) logger.error(f"Training data preparation failed: {str(e)}") raise ValueError(f"Failed to prepare training data: {str(e)}") @@ -546,6 +559,7 @@ class TrainingDataOrchestrator: return synthetic_data def validate_training_data_quality(self, dataset: TrainingDataSet) -> Dict[str, Any]: + """Enhanced validation of training data quality""" validation_results = { "is_valid": True, diff --git a/services/training/app/services/training_service.py b/services/training/app/services/training_service.py index 58d1c586..03f329e1 100644 --- a/services/training/app/services/training_service.py +++ b/services/training/app/services/training_service.py @@ -61,7 +61,16 @@ class TrainingService: logger.info(f"Starting training job {job_id} for tenant {tenant_id}") + from app.services.messaging import TrainingStatusPublisher + status_publisher = TrainingStatusPublisher(job_id, tenant_id) + try: + + await status_publisher.job_started({ + "bakery_location": bakery_location, + "has_custom_dates": bool(requested_start or requested_end) + }, 0) # Will be updated when we know product count + # Step 1: Prepare training dataset with date alignment and orchestration logger.info("Step 1: Preparing and aligning training data") training_dataset = await self.orchestrator.prepare_training_data( @@ -101,10 +110,12 @@ class TrainingService: } logger.info(f"Training job {job_id} completed successfully") + await status_publisher.job_completed(final_result) return TrainingService.create_detailed_training_response(final_result) except Exception as e: logger.error(f"Training job {job_id} failed: {str(e)}") + await status_publisher.job_failed(str(e)) # Return error response in same detailed format final_result = { "job_id": job_id,