Merge pull request #3 from ualsweb/claude/fix-training-logs-011CUpoupuk9cR6iBusjfozQ
Fix training log race conditions and audit event error
This commit is contained in:
@@ -236,8 +236,8 @@ async def start_training_job(
|
||||
|
||||
# Log audit event for training job creation
|
||||
try:
|
||||
from app.core.database import get_db
|
||||
db = next(get_db())
|
||||
from app.core.database import database_manager
|
||||
async with database_manager.get_session() as db:
|
||||
await audit_logger.log_event(
|
||||
db_session=db,
|
||||
tenant_id=tenant_id,
|
||||
|
||||
@@ -174,7 +174,7 @@ class EnhancedTrainingService:
|
||||
await self._init_repositories(session)
|
||||
|
||||
try:
|
||||
# Check if training log already exists, create if not
|
||||
# Check if training log already exists, update if found, create if not
|
||||
existing_log = await self.training_log_repo.get_log_by_job_id(job_id)
|
||||
|
||||
if existing_log:
|
||||
@@ -182,6 +182,7 @@ class EnhancedTrainingService:
|
||||
training_log = await self.training_log_repo.update_log_progress(
|
||||
job_id, 0, "initializing", "running"
|
||||
)
|
||||
await session.commit()
|
||||
else:
|
||||
# Create new training log entry
|
||||
log_data = {
|
||||
@@ -191,7 +192,20 @@ class EnhancedTrainingService:
|
||||
"progress": 0,
|
||||
"current_step": "initializing"
|
||||
}
|
||||
try:
|
||||
training_log = await self.training_log_repo.create_training_log(log_data)
|
||||
await session.commit() # Explicit commit so other sessions can see it
|
||||
except Exception as create_error:
|
||||
# Handle race condition: log may have been created by another session
|
||||
if "unique constraint" in str(create_error).lower() or "duplicate" in str(create_error).lower():
|
||||
logger.debug("Training log already exists (race condition), updating instead", job_id=job_id)
|
||||
await session.rollback()
|
||||
training_log = await self.training_log_repo.update_log_progress(
|
||||
job_id, 0, "initializing", "running"
|
||||
)
|
||||
await session.commit()
|
||||
else:
|
||||
raise
|
||||
|
||||
# Step 1: Prepare training dataset (includes sales data validation)
|
||||
logger.info("Step 1: Preparing and aligning training data (with validation)")
|
||||
@@ -749,8 +763,28 @@ class EnhancedTrainingService:
|
||||
# Ensure results are JSON-serializable before storing
|
||||
log_data["results"] = make_json_serializable(results)
|
||||
|
||||
try:
|
||||
await self.training_log_repo.create_training_log(log_data)
|
||||
await session.commit() # Explicit commit so other sessions can see it
|
||||
logger.info("Created initial training log", job_id=job_id, tenant_id=tenant_id)
|
||||
except Exception as create_error:
|
||||
# Handle race condition: another session may have created the log
|
||||
if "unique constraint" in str(create_error).lower() or "duplicate" in str(create_error).lower():
|
||||
logger.debug("Training log already exists (race condition), querying again", job_id=job_id)
|
||||
await session.rollback()
|
||||
# Query again to get the existing log
|
||||
existing_log = await self.training_log_repo.get_log_by_job_id(job_id)
|
||||
if existing_log:
|
||||
# Update the existing log instead
|
||||
await self.training_log_repo.update_log_progress(
|
||||
job_id=job_id,
|
||||
progress=progress,
|
||||
current_step=current_step,
|
||||
status=status
|
||||
)
|
||||
await session.commit()
|
||||
else:
|
||||
raise
|
||||
else:
|
||||
logger.error("Cannot create training log without tenant_id", job_id=job_id)
|
||||
return
|
||||
@@ -777,6 +811,8 @@ class EnhancedTrainingService:
|
||||
if update_data:
|
||||
await self.training_log_repo.update(existing_log.id, update_data)
|
||||
|
||||
await session.commit() # Explicit commit after updates
|
||||
|
||||
except Exception as e:
|
||||
logger.error("Failed to update job status using repository",
|
||||
job_id=job_id,
|
||||
|
||||
Reference in New Issue
Block a user