Fix imports

This commit is contained in:
Urtzi Alfaro
2025-07-18 14:41:39 +02:00
parent a469f0c01d
commit 4073222888
30 changed files with 123 additions and 119 deletions

View File

@@ -1,12 +1,15 @@
# ================================================================
# services/training/app/services/messaging.py
# ================================================================
"""
Training service messaging - Uses shared RabbitMQ client only
Messaging service for training service
"""
import structlog
from shared.messaging.rabbitmq import RabbitMQClient
from app.core.config import settings
import logging
logger = logging.getLogger(__name__)
logger = structlog.get_logger()
# Single global instance
training_publisher = RabbitMQClient(settings.RABBITMQ_URL, "training-service")
@@ -25,14 +28,22 @@ async def cleanup_messaging():
logger.info("Training service messaging cleaned up")
# Convenience functions for training-specific events
async def publish_training_started(data: dict) -> bool:
async def publish_training_started(job_data: dict) -> bool:
"""Publish training started event"""
return await training_publisher.publish_training_event("started", data)
return await training_publisher.publish_training_event("started", job_data)
async def publish_training_completed(data: dict) -> bool:
async def publish_training_completed(job_data: dict) -> bool:
"""Publish training completed event"""
return await training_publisher.publish_training_event("completed", data)
return await training_publisher.publish_training_event("completed", job_data)
async def publish_training_failed(data: dict) -> bool:
async def publish_training_failed(job_data: dict) -> bool:
"""Publish training failed event"""
return await training_publisher.publish_training_event("failed", data)
return await training_publisher.publish_training_event("failed", job_data)
async def publish_model_validated(model_data: dict) -> bool:
"""Publish model validation event"""
return await training_publisher.publish_training_event("model.validated", model_data)
async def publish_model_saved(model_data: dict) -> bool:
"""Publish model saved event"""
return await training_publisher.publish_training_event("model.saved", model_data)

View File

@@ -1,9 +1,8 @@
"""
Training service business logic
"""
import asyncio
import logging
import structlog
from datetime import datetime, timedelta, timezone
from typing import Dict, Any, List, Optional
from sqlalchemy.ext.asyncio import AsyncSession
@@ -16,10 +15,9 @@ from app.core.config import settings
from app.models.training import TrainingJob, TrainedModel, TrainingLog
from app.schemas.training import TrainingRequest, TrainingJobResponse, TrainedModelResponse
from app.ml.trainer import MLTrainer
from app.services.messaging import message_publisher
from shared.messaging.events import TrainingStartedEvent, TrainingCompletedEvent, TrainingFailedEvent
from app.services.messaging import publish_training_started, publish_training_completed, publish_training_failed
logger = logging.getLogger(__name__)
logger = structlog.get_logger()
class TrainingService:
"""Training service business logic"""
@@ -46,8 +44,8 @@ class TrainingService:
progress=0,
current_step="Queued for training",
requested_by=user_data.get("user_id"),
training_data_from= datetime.now(timezone.utc) - timedelta(days=request.training_days),
training_data_to= datetime.now(timezone.utc)
training_data_from=datetime.now(timezone.utc) - timedelta(days=request.training_days),
training_data_to=datetime.now(timezone.utc)
)
db.add(training_job)
@@ -57,24 +55,20 @@ class TrainingService:
# Start training in background
asyncio.create_task(self._execute_training(training_job.id, request, db))
# Publish training started event
await message_publisher.publish_event(
"training_events",
"training.started",
TrainingStartedEvent(
event_id=str(uuid.uuid4()),
service_name="training-service",
timestamp= datetime.now(timezone.utc),
data={
"job_id": str(training_job.id),
"tenant_id": tenant_id,
"requested_by": user_data.get("user_id"),
"training_days": request.training_days
}
).__dict__
)
# Publish training started event - SIMPLIFIED
event_data = {
"job_id": str(training_job.id),
"tenant_id": tenant_id,
"requested_by": user_data.get("user_id"),
"training_days": request.training_days,
"timestamp": datetime.now(timezone.utc).isoformat()
}
logger.info(f"Training job started: {training_job.id} for tenant: {tenant_id}")
success = await publish_training_started(event_data)
if not success:
logger.warning("Failed to publish training started event", job_id=str(training_job.id))
logger.info("Training job started", job_id=str(training_job.id), tenant_id=tenant_id)
return TrainingJobResponse(
id=str(training_job.id),
@@ -206,7 +200,7 @@ class TrainingService:
async def _execute_training(self, job_id: str, request: TrainingRequest, db: AsyncSession):
"""Execute training job"""
start_time = datetime.now(timezone.utc)
start_time = datetime.now(timezone.utc)
try:
# Update job status
@@ -229,47 +223,39 @@ class TrainingService:
await self._save_trained_models(job_id, models_result, validation_result, db)
# Complete job
duration = int(( datetime.now(timezone.utc) - start_time).total_seconds())
duration = int((datetime.now(timezone.utc) - start_time).total_seconds())
await self._complete_job(job_id, models_result, validation_result, duration, db)
# Publish completion event
await message_publisher.publish_event(
"training_events",
"training.completed",
TrainingCompletedEvent(
event_id=str(uuid.uuid4()),
service_name="training-service",
timestamp= datetime.now(timezone.utc),
data={
"job_id": str(job_id),
"models_trained": len(models_result),
"duration_seconds": duration
}
).__dict__
)
# Publish completion event - SIMPLIFIED
event_data = {
"job_id": str(job_id),
"models_trained": len(models_result),
"duration_seconds": duration,
"timestamp": datetime.now(timezone.utc).isoformat()
}
logger.info(f"Training job completed: {job_id}")
success = await publish_training_completed(event_data)
if not success:
logger.warning("Failed to publish training completed event", job_id=str(job_id))
logger.info("Training job completed", job_id=str(job_id))
except Exception as e:
logger.error(f"Training job failed: {job_id} - {e}")
logger.error("Training job failed", job_id=str(job_id), error=str(e))
# Update job as failed
await self._update_job_status(job_id, "failed", 0, f"Training failed: {str(e)}", db)
# Publish failure event
await message_publisher.publish_event(
"training_events",
"training.failed",
TrainingFailedEvent(
event_id=str(uuid.uuid4()),
service_name="training-service",
timestamp= datetime.now(timezone.utc),
data={
"job_id": str(job_id),
"error": str(e)
}
).__dict__
)
# Publish failure event - SIMPLIFIED
event_data = {
"job_id": str(job_id),
"error": str(e),
"timestamp": datetime.now(timezone.utc).isoformat()
}
success = await publish_training_failed(event_data)
if not success:
logger.warning("Failed to publish training failed event", job_id=str(job_id))
async def _update_job_status(self, job_id: str, status: str, progress: int, current_step: str, db: AsyncSession):
"""Update training job status"""
@@ -281,7 +267,7 @@ class TrainingService:
status=status,
progress=progress,
current_step=current_step,
updated_at= datetime.now(timezone.utc)
updated_at=datetime.now(timezone.utc)
)
)
await db.commit()
@@ -312,7 +298,7 @@ class TrainingService:
raise Exception(f"Failed to get training data: {response.status_code}")
except Exception as e:
logger.error(f"Error getting training data: {e}")
logger.error("Error getting training data", error=str(e))
raise
async def _save_trained_models(self, job_id: str, models_result: Dict[str, Any], validation_result: Dict[str, Any], db: AsyncSession):
@@ -374,7 +360,7 @@ class TrainingService:
status="completed",
progress=100,
current_step="Training completed successfully",
completed_at= datetime.now(timezone.utc),
completed_at=datetime.now(timezone.utc),
duration_seconds=duration,
models_trained=models_result,
metrics=metrics,