Skip to content

Commit 78295c0

Browse files
authored
[ML] Fix race condition between job opening and feature reset (#75024)
There was a point during the job opening sequence where performing a feature reset could hang. This happened when the kill request issued by feature reset was executed after the job's persistent task was assigned but before the job's native process was started. The persistent task was incorrectly left running in this situation, yet the job opening sequence was aborted which meant the subsequent close request issued by feature reset would wait for a very long time for the persistent task to disappear. The fix is to make the kill process request cancel the persistent task consistently based on its request parameters and not on the current state of the task. Backport of #74976
1 parent 1fb73ef commit 78295c0

File tree

3 files changed

+15
-27
lines changed

3 files changed

+15
-27
lines changed

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportKillProcessAction.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
2626
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
2727
import org.elasticsearch.xpack.ml.MachineLearning;
28-
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager;
2928
import org.elasticsearch.xpack.ml.job.task.JobTask;
3029
import org.elasticsearch.xpack.ml.notifications.AnomalyDetectionAuditor;
3130

@@ -41,18 +40,15 @@ public class TransportKillProcessAction extends TransportTasksAction<JobTask,
4140
private static final Logger logger = LogManager.getLogger(TransportKillProcessAction.class);
4241

4342
private final AnomalyDetectionAuditor auditor;
44-
private final AutodetectProcessManager processManager;
4543

4644
@Inject
4745
public TransportKillProcessAction(TransportService transportService,
4846
ClusterService clusterService,
4947
ActionFilters actionFilters,
50-
AutodetectProcessManager processManager,
5148
AnomalyDetectionAuditor auditor) {
5249
super(KillProcessAction.NAME, clusterService, transportService, actionFilters, KillProcessAction.Request::new,
5350
KillProcessAction.Response::new, KillProcessAction.Response::new, MachineLearning.UTILITY_THREAD_POOL_NAME);
5451
this.auditor = auditor;
55-
this.processManager = processManager;
5652
}
5753

5854
@Override

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -529,30 +529,29 @@ public void onFailure(Exception e) {
529529
protected void doRun() {
530530
ProcessContext processContext = processByAllocation.get(jobTask.getAllocationId());
531531
if (processContext == null) {
532-
logger.debug("Aborted opening job [{}] as it has been closed", job.getId());
532+
logger.debug("Aborted opening job [{}] as it has been closed or killed", job.getId());
533533
return;
534534
}
535535
// We check again after the process state is locked to ensure no race conditions are hit.
536536
if (processContext.getJobTask().isClosing()) {
537-
logger.debug("Aborted opening job [{}] as it is being closed", job.getId());
537+
logger.debug("Aborted opening job [{}] as it is being closed (before starting process)", job.getId());
538538
jobTask.markAsCompleted();
539539
return;
540540
}
541541

542542
try {
543543
if (createProcessAndSetRunning(processContext, job, params, closeHandler)) {
544+
// This next check also covers the case of a process being killed while it was being started.
545+
// It relies on callers setting the closing flag on the job task before calling this method.
546+
// It also relies on the fact that at this stage of the process lifecycle kill and close are
547+
// basically identical, i.e. the process has done so little work that making it exit by closing
548+
// its input stream will not result in side effects.
544549
if (processContext.getJobTask().isClosing()) {
545-
logger.debug("Aborted opening job [{}] as it is being closed", job.getId());
550+
logger.debug("Aborted opening job [{}] as it is being closed or killed (after starting process)",
551+
job.getId());
546552
closeProcessAndTask(processContext, jobTask, "job is already closing");
547553
return;
548554
}
549-
// It is possible that a `kill` request came in before the communicator was set
550-
// This means that the kill was not handled appropriately and we continued down this execution path
551-
if (processContext.shouldBeKilled()) {
552-
logger.debug("Aborted opening job [{}] as it is being killed", job.getId());
553-
processContext.killIt();
554-
return;
555-
}
556555
processContext.getAutodetectCommunicator().restoreState(params.modelSnapshot());
557556
setJobState(jobTask, JobState.OPENED);
558557
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/ProcessContext.java

Lines changed: 6 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@ final class ProcessContext {
2828
private final JobTask jobTask;
2929
private volatile AutodetectCommunicator autodetectCommunicator;
3030
private volatile ProcessState state;
31-
private volatile KillBuilder latestKillRequest = null;
3231

3332
ProcessContext(JobTask jobTask) {
3433
this.jobTask = jobTask;
@@ -47,17 +46,6 @@ private void setAutodetectCommunicator(AutodetectCommunicator autodetectCommunic
4746
this.autodetectCommunicator = autodetectCommunicator;
4847
}
4948

50-
boolean shouldBeKilled() {
51-
return latestKillRequest != null;
52-
}
53-
54-
void killIt() {
55-
if (latestKillRequest == null) {
56-
throw new IllegalArgumentException("Unable to kill job as previous request is not completed");
57-
}
58-
latestKillRequest.kill();
59-
}
60-
6149
ProcessStateName getState() {
6250
return state.getName();
6351
}
@@ -129,7 +117,12 @@ KillBuilder setShouldFinalizeJob(boolean shouldFinalizeJob) {
129117

130118
void kill() {
131119
if (autodetectCommunicator == null) {
132-
latestKillRequest = this;
120+
// Killing a connected process would also complete the persistent task if `finish` was true,
121+
// so we should do the same here even though the process wasn't yet connected at the time of
122+
// the kill
123+
if (finish) {
124+
jobTask.markAsCompleted();
125+
}
133126
return;
134127
}
135128
String jobId = jobTask.getJobId();

0 commit comments

Comments
 (0)