Commit 0f493c3

[ML] Fix race condition between job open, close and kill (#75113)
This is a followup to #74976. The changes of #74976 reverted many of the changes of #71656 because #74415 made them redundant. #74415 did this by marking killed jobs as closing, so that the standard "job closed immediately after open" functionality was used instead of reissuing the kill immediately after opening.

However, it turns out that this "job closed immediately after open" functionality is not perfect for the case of a job that is killed while it is opening. It causes AutodetectCommunicator.close() to be called instead of AutodetectCommunicator.killProcess(). Both do a lot of the same things, but AutodetectCommunicator.close() finalizes the job, and this can cause problems if the job is being killed as part of a feature reset.

This change reinstates some of the functionality of #71656, but in a different place that hopefully won't reintroduce the problems that led to #74415. We can detect that a kill has happened early on during an open or close operation by checking whether the task's allocation ID has been removed from the map after ProcessContext.setDying() returns true. If ProcessContext.setDying() returns true, the job has not been previously closed, so it must have been killed. Then we can call AutodetectCommunicator.killProcess() instead of AutodetectCommunicator.close() during the cleanup that happens when we detect that a recently started process is no longer wanted.

Relates #75069
1 parent 62a6db9 commit 0f493c3
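
To make the mechanism described above concrete, here is a minimal, self-contained sketch. It is not the actual Elasticsearch ML code: the class, interface, and method names below are hypothetical stand-ins. It shows the core idea only, namely that the kill path removes the allocation ID from the shared map, and the close/cleanup path treats the absence of that entry (once setDying() has already returned true) as evidence that the job was killed, so it kills the process instead of closing and finalizing it.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Sketch only: names are hypothetical, not the real AutodetectProcessManager types.
class KillAwareCloseSketch {

    // Stand-in for AutodetectCommunicator; only the two calls that matter here.
    interface Communicator {
        void close();       // persists state and finalizes the job
        void killProcess(); // stops the process without finalizing the job
    }

    private final ConcurrentMap<Long, Communicator> processByAllocation = new ConcurrentHashMap<>();

    // Kill path: removing the entry is what a concurrent open/close later observes.
    void kill(long allocationId) {
        Communicator communicator = processByAllocation.remove(allocationId);
        if (communicator != null) {
            communicator.killProcess();
        }
    }

    // Close/cleanup path, reached once we know the job was not already closing
    // (the equivalent of ProcessContext.setDying() returning true).
    void closeOrCleanUp(long allocationId, Communicator communicator) {
        // If our entry has already disappeared, a kill must have raced with the open/close,
        // so the process is killed rather than closed (closing would finalize the job).
        boolean jobKilled = processByAllocation.containsKey(allocationId) == false;
        if (jobKilled) {
            communicator.killProcess();
        } else {
            // Leave the allocation ID in the map until close() completes so that a kill
            // issued during a long close can still find the process.
            communicator.close();
            processByAllocation.remove(allocationId);
        }
    }
}
```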

File tree

1 file changed (+32 lines, -9 lines)

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java

Lines changed: 32 additions & 9 deletions
```diff
@@ -688,7 +688,6 @@ AutodetectCommunicator create(JobTask jobTask, Job job, AutodetectParams autodet
         }
         return new AutodetectCommunicator(job, process, new StateStreamer(client), dataCountsReporter, processor, handler,
                 xContentRegistry, autodetectWorkerExecutor);
-
     }
 
     private void notifyLoadingSnapshot(String jobId, AutodetectParams autodetectParams) {
@@ -735,6 +734,12 @@ private Consumer<String> onProcessCrash(JobTask jobTask) {
     private void closeProcessAndTask(ProcessContext processContext, JobTask jobTask, String reason) {
         String jobId = jobTask.getJobId();
         long allocationId = jobTask.getAllocationId();
+        // We use a lock to prevent simultaneous open and close from conflicting. However, we found
+        // that we could not use the lock to stop kill from conflicting because that could lead to
+        // a kill taking an unacceptably long time to have an effect, which largely defeats the point
+        // of having an option to quickly kill a process. Therefore we have to deal with the effects
+        // of kill running simultaneously with open and close.
+        boolean jobKilled = false;
         processContext.tryLock();
         try {
             if (processContext.setDying() == false) {
@@ -744,29 +749,47 @@ private void closeProcessAndTask(ProcessContext processContext, JobTask jobTask,
                 return;
             }
 
-            if (reason == null) {
+            // If the job was killed early on during its open sequence then
+            // its context will already have been removed from this map
+            jobKilled = (processByAllocation.containsKey(allocationId) == false);
+            if (jobKilled) {
+                logger.debug("[{}] Cleaning up job opened after kill", jobId);
+            } else if (reason == null) {
                 logger.info("Closing job [{}]", jobId);
             } else {
                 logger.info("Closing job [{}], because [{}]", jobId, reason);
             }
 
             AutodetectCommunicator communicator = processContext.getAutodetectCommunicator();
             if (communicator == null) {
+                assert jobKilled == false
+                    : "Job " + jobId + " killed before process started yet still had no communicator during cleanup after process started";
                 logger.debug("Job [{}] is being closed before its process is started", jobId);
                 jobTask.markAsCompleted();
+                processByAllocation.remove(allocationId);
             } else {
-                communicator.close();
+                if (jobKilled) {
+                    communicator.killProcess(true, false, false);
+                } else {
+                    // communicator.close() may take a long time to run, if the job persists a large model state as a
+                    // result of calling it. We want to leave open the option to kill the job during this time, which
+                    // is why the allocation ID must remain in the map until after the close is complete.
+                    communicator.close();
+                    processByAllocation.remove(allocationId);
+                }
             }
-
-            processByAllocation.remove(allocationId);
         } catch (Exception e) {
-            // If the close failed because the process has explicitly been killed by us then just pass on that exception
+            // If the close failed because the process has explicitly been killed by us then just pass on that exception.
+            // (Note that jobKilled may be false in this case, if the kill is executed while communicator.close() is running.)
             if (e instanceof ElasticsearchStatusException && ((ElasticsearchStatusException) e).status() == RestStatus.CONFLICT) {
-                throw e;
+                logger.trace("[{}] Conflict between kill and close during autodetect process cleanup - job {} before cleanup started",
+                    jobId, jobKilled ? "killed" : "not killed");
+                throw (ElasticsearchStatusException) e;
            }
-            logger.warn("[" + jobId + "] Exception closing autodetect process", e);
+            String msg = jobKilled ? "Exception cleaning up autodetect process started after kill" : "Exception closing autodetect process";
+            logger.warn("[" + jobId + "] " + msg, e);
             setJobState(jobTask, JobState.FAILED, e.getMessage());
-            throw ExceptionsHelper.serverError("Exception closing autodetect process", e);
+            throw ExceptionsHelper.serverError(msg, e);
         } finally {
             // to ensure the contract that multiple simultaneous close calls for the same job wait until
             // the job is closed is honoured, hold the lock throughout the close procedure so that another
```
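
The catch block above also covers a narrower window: a kill that lands while communicator.close() is already running. In that case jobKilled is still false, but close() fails with a CONFLICT-status exception that should be propagated rather than treated as a job failure. Below is a minimal sketch of that decision only, using hypothetical stand-ins (ConflictException in place of a CONFLICT ElasticsearchStatusException, JobStateSink in place of setJobState); it is not the actual Elasticsearch API.

```java
// Sketch only: ConflictException and JobStateSink are hypothetical stand-ins, not Elasticsearch classes.
class CloseFailureHandlingSketch {

    // Stand-in for an ElasticsearchStatusException whose status() is RestStatus.CONFLICT.
    static class ConflictException extends RuntimeException {
        ConflictException(String message) {
            super(message);
        }
    }

    interface JobStateSink {
        void markFailed(String reason); // stand-in for setJobState(jobTask, JobState.FAILED, ...)
    }

    void handleCloseFailure(Exception e, boolean jobKilled, String jobId, JobStateSink sink) {
        if (e instanceof ConflictException) {
            // The process was deliberately killed while close() was running; this is not a job
            // failure, so the conflict is passed straight back to the caller. Note that jobKilled
            // may still be false here, because the kill can arrive after the earlier check.
            throw (ConflictException) e;
        }
        // Any other failure marks the job as failed, with a message that reflects whether the
        // cleanup followed a kill or an ordinary close.
        String msg = jobKilled
            ? "Exception cleaning up autodetect process started after kill"
            : "Exception closing autodetect process";
        sink.markFailed(e.getMessage());
        throw new RuntimeException("[" + jobId + "] " + msg, e);
    }
}
```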
