Skip to content

Commit 9326f7b

Browse files
[ML] Abort starting process if kill request is received (#74415)
While the job is opening, it is possible that the kill process action is called. If the kill process action is received before the job process has started, we currently start the process anyway. The process will eventually time out trying to connect and will exit. However, it may cause an unexpected failure if the job is opened again, because a new process cannot be launched while one already exists. This commit ensures that JobTask.isClosing() reports true once the kill process action has been called, so that opening the process is aborted. Closes #74141
1 parent 68c33dc commit 9326f7b

File tree

5 files changed

+23
-11
lines changed

5 files changed

+23
-11
lines changed

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportKillProcessAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ protected void taskOperation(KillProcessAction.Request request, JobTask jobTask,
7474
logger.info("[{}] Killing job", jobTask.getJobId());
7575
auditor.info(jobTask.getJobId(), Messages.JOB_AUDIT_KILLING);
7676
try {
77-
processManager.killProcess(jobTask, true, null);
77+
jobTask.killJob("kill process (api)");
7878
listener.onResponse(new KillProcessAction.Response(true));
7979
} catch (Exception e) {
8080
listener.onFailure(e);

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -187,13 +187,14 @@ public void killProcess(JobTask jobTask, boolean awaitCompletion, String reason)
187187
.kill();
188188
} else {
189189
// If the process is missing but the task exists this is most likely
190-
// due to 2 reasons. The first is because the job went into the failed
190+
// due to 3 reasons. The first is because the job went into the failed
191191
// state then the node restarted causing the task to be recreated
192192
// but the failed process wasn't. The second is that the job went into
193193
// the failed state and the user tries to remove it force-deleting it.
194194
// Force-delete issues a kill but the process will not be present
195-
// as it is cleaned up already. In both cases, we still need to remove
196-
// the task from the TaskManager (which is what the kill would do)
195+
// as it is cleaned up already. The third is that the kill has been
196+
// received before the process has even started. In all cases, we still
197+
// need to remove the task from the TaskManager (which is what the kill would do)
197198
logger.trace(() -> new ParameterizedMessage("[{}] Marking job task as completed", jobTask.getJobId()));
198199
jobTask.markAsCompleted();
199200
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/task/JobTask.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,10 +38,7 @@ public String getJobId() {
3838
protected void onCancelled() {
3939
String reason = getReasonCancelled();
4040
LOGGER.trace(() -> new ParameterizedMessage("[{}] Cancelling job task because: {}", jobId, reason));
41-
killJob(reason);
42-
}
43-
44-
void killJob(String reason) {
41+
isClosing = true;
4542
autodetectProcessManager.killProcess(this, false, reason);
4643
}
4744

@@ -54,6 +51,11 @@ public void closeJob(String reason) {
5451
autodetectProcessManager.closeJob(this, reason);
5552
}
5653

54+
public void killJob(String reason) {
55+
isClosing = true;
56+
autodetectProcessManager.killProcess(this, true, reason);
57+
}
58+
5759
void setAutodetectProcessManager(AutodetectProcessManager autodetectProcessManager) {
5860
this.autodetectProcessManager = autodetectProcessManager;
5961
}

x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/task/JobTaskTests.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,11 @@
1010
import org.elasticsearch.tasks.Task;
1111
import org.elasticsearch.test.ESTestCase;
1212
import org.elasticsearch.xpack.core.ml.action.OpenJobAction;
13+
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager;
1314

1415
import static org.hamcrest.Matchers.is;
1516
import static org.mockito.Mockito.mock;
17+
import static org.mockito.Mockito.verify;
1618

1719
public class JobTaskTests extends ESTestCase {
1820

@@ -32,4 +34,14 @@ public void testJobTaskMatcherMatch() {
3234
assertThat(OpenJobAction.JobTaskMatcher.match(jobTask2, "ml-2"), is(true));
3335
}
3436

37+
public void testKillJob() {
38+
JobTask jobTask = new JobTask("job-to-kill", 0, "persistent", "", null, null);
39+
AutodetectProcessManager processManager = mock(AutodetectProcessManager.class);
40+
jobTask.setAutodetectProcessManager(processManager);
41+
42+
jobTask.killJob("test");
43+
44+
assertThat(jobTask.isClosing(), is(true));
45+
verify(processManager).killProcess(jobTask, true, "test");
46+
}
3547
}

x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/ml/set_upgrade_mode.yml

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -158,9 +158,6 @@ teardown:
158158

159159
---
160160
"Setting upgrade mode to disabled from enabled":
161-
- skip:
162-
version: "all"
163-
reason: "AwaitsFix https://github.com/elastic/elasticsearch/issues/74141"
164161
- do:
165162
ml.start_datafeed:
166163
datafeed_id: set-upgrade-mode-job-datafeed

0 commit comments

Comments
 (0)