[ML] Fix race condition between job open, close and kill #75113

Merged
@@ -688,7 +688,6 @@ AutodetectCommunicator create(JobTask jobTask, Job job, AutodetectParams autodet
}
return new AutodetectCommunicator(job, process, new StateStreamer(client), dataCountsReporter, processor, handler,
xContentRegistry, autodetectWorkerExecutor);

}

private void notifyLoadingSnapshot(String jobId, AutodetectParams autodetectParams) {
@@ -735,6 +734,12 @@ private Consumer<String> onProcessCrash(JobTask jobTask) {
private void closeProcessAndTask(ProcessContext processContext, JobTask jobTask, String reason) {
String jobId = jobTask.getJobId();
long allocationId = jobTask.getAllocationId();
// We use a lock to prevent simultaneous open and close from conflicting. However, we found
// that we could not use the lock to stop kill from conflicting because that could lead to
// a kill taking an unacceptably long time to have an effect, which largely defeats the point
// of having an option to quickly kill a process. Therefore we have to deal with the effects
// of kill running simultaneously with open and close.
boolean jobKilled = false;
processContext.tryLock();
try {
if (processContext.setDying() == false) {
@@ -744,29 +749,47 @@ private void closeProcessAndTask(ProcessContext processContext, JobTask jobTask,
return;
}

if (reason == null) {
// If the job was killed early on during its open sequence then
// its context will already have been removed from this map
jobKilled = (processByAllocation.containsKey(allocationId) == false);
if (jobKilled) {
logger.debug("[{}] Cleaning up job opened after kill", jobId);
} else if (reason == null) {
logger.info("Closing job [{}]", jobId);
} else {
logger.info("Closing job [{}], because [{}]", jobId, reason);
}

AutodetectCommunicator communicator = processContext.getAutodetectCommunicator();
if (communicator == null) {
assert jobKilled == false
: "Job " + jobId + " killed before process started yet still had no communicator during cleanup after process started";
logger.debug("Job [{}] is being closed before its process is started", jobId);
jobTask.markAsCompleted();
processByAllocation.remove(allocationId);
} else {
communicator.close();
if (jobKilled) {
communicator.killProcess(true, false, false);
} else {
// communicator.close() may take a long time to run, if the job persists a large model state as a
// result of calling it. We want to leave open the option to kill the job during this time, which
// is why the allocation ID must remain in the map until after the close is complete.
communicator.close();
processByAllocation.remove(allocationId);
}
}

processByAllocation.remove(allocationId);
} catch (Exception e) {
// If the close failed because the process has explicitly been killed by us then just pass on that exception
// If the close failed because the process has explicitly been killed by us then just pass on that exception.
// (Note that jobKilled may be false in this case, if the kill is executed while communicator.close() is running.)
if (e instanceof ElasticsearchStatusException && ((ElasticsearchStatusException) e).status() == RestStatus.CONFLICT) {
throw e;
logger.trace("[{}] Conflict between kill and close during autodetect process cleanup - job {} before cleanup started",
jobId, jobKilled ? "killed" : "not killed");
throw (ElasticsearchStatusException) e;
}
logger.warn("[" + jobId + "] Exception closing autodetect process", e);
String msg = jobKilled ? "Exception cleaning up autodetect process started after kill" : "Exception closing autodetect process";
logger.warn("[" + jobId + "] " + msg, e);
setJobState(jobTask, JobState.FAILED, e.getMessage());
throw ExceptionsHelper.serverError("Exception closing autodetect process", e);
throw ExceptionsHelper.serverError(msg, e);
} finally {
// to ensure the contract that multiple simultaneous close calls for the same job wait until
// the job is closed is honoured, hold the lock throughout the close procedure so that another
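
For readers following the concurrency reasoning in the comments above, here is a minimal, self-contained sketch of the pattern the change describes. It is not code from the PR: the class name JobContextRegistry, the perJobLock field and the open/kill/close methods are illustrative only. The idea is that open and close serialize on a per-job lock, while kill deliberately bypasses that lock and signals itself by removing the allocation entry from a concurrent map, which close then checks under the lock.

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.ReentrantLock;

// Illustrative sketch only: names are hypothetical, not the classes used in Elasticsearch.
public class JobContextRegistry {

    // Kill removes the entry here without taking the per-job lock, so it is
    // never blocked behind a long-running close (e.g. persisting model state).
    private final ConcurrentMap<Long, String> processByAllocation = new ConcurrentHashMap<>();

    // Serializes open and close for one job; kill intentionally does NOT use it.
    private final ReentrantLock perJobLock = new ReentrantLock();

    public void open(long allocationId, String jobId) {
        perJobLock.lock();
        try {
            processByAllocation.put(allocationId, jobId);
            // ... start the native process here ...
        } finally {
            perJobLock.unlock();
        }
    }

    public void kill(long allocationId) {
        // No lock: must take effect quickly even while close() is persisting state.
        processByAllocation.remove(allocationId);
        // ... forcibly terminate the native process here ...
    }

    public void close(long allocationId, String jobId) {
        perJobLock.lock();
        try {
            // A missing entry means kill ran first (possibly racing with open),
            // so clean up instead of performing an ordinary close.
            boolean jobKilled = (processByAllocation.containsKey(allocationId) == false);
            if (jobKilled) {
                // ... kill-style cleanup of whatever the open sequence created ...
            } else {
                // The entry stays in the map until the close completes, so a kill
                // issued during a slow close still finds something to act on.
                // ... ordinary close, possibly persisting large model state ...
                processByAllocation.remove(allocationId);
            }
        } finally {
            perJobLock.unlock();
        }
    }
}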