Skip to content

Commit aab801a

Browse files
committed
[ML] Wait for controller to respond to commands
This change makes threads that send a command to the ML controller process wait for it to respond to the command. Previously such threads would block until the command was sent, but not until it was actioned. This was on the assumption that the sort of commands being sent would be actioned almost instantaneously, but that assumption has been shown to be false when anti-malware software is running. Relates elastic/ml-cpp#1520 Fixes elastic#62823
1 parent 15a2a35 commit aab801a

20 files changed

+354
-55
lines changed

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -625,7 +625,8 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
625625
AnalyticsProcessFactory<MemoryUsageEstimationResult> memoryEstimationProcessFactory;
626626
if (MachineLearningField.AUTODETECT_PROCESS.get(settings)) {
627627
try {
628-
NativeController nativeController = NativeController.makeNativeController(clusterService.getNodeName(), environment);
628+
NativeController nativeController =
629+
NativeController.makeNativeController(clusterService.getNodeName(), environment, xContentRegistry);
629630
autodetectProcessFactory = new NativeAutodetectProcessFactory(
630631
environment,
631632
settings,

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsBuilder.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ public AnalyticsBuilder performMemoryUsageEstimationOnly() {
5252
return this;
5353
}
5454

55-
public void build() throws IOException {
55+
public void build() throws IOException, InterruptedException {
5656
List<String> command = buildAnalyticsCommand();
5757
processPipes.addArgs(command);
5858
nativeController.startProcess(command);

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManager.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ public AnalyticsProcessManager(Settings settings,
8383
this(
8484
settings,
8585
client,
86-
threadPool.generic(),
86+
threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME),
8787
threadPool.executor(MachineLearning.JOB_COMMS_THREAD_POOL_NAME),
8888
analyticsProcessFactory,
8989
auditor,
@@ -451,7 +451,7 @@ synchronized void stop() {
451451
}
452452
if (process.get() != null) {
453453
try {
454-
process.get().kill();
454+
process.get().kill(true);
455455
} catch (IOException e) {
456456
LOGGER.error(new ParameterizedMessage("[{}] Failed to kill process", config.getId()), e);
457457
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeAnalyticsProcessFactory.java

+3
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,9 @@ private void createNativeProcess(String jobId, AnalyticsProcessConfig analyticsP
116116
new AnalyticsBuilder(env::tmpFile, nativeController, processPipes, analyticsProcessConfig, filesToDelete);
117117
try {
118118
analyticsBuilder.build();
119+
} catch (InterruptedException e) {
120+
Thread.currentThread().interrupt();
121+
LOGGER.warn("Interrupted while launching data frame analytics process for job " + jobId);
119122
} catch (IOException e) {
120123
String msg = "Failed to launch data frame analytics process for job " + jobId;
121124
LOGGER.error(msg);

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeMemoryUsageEstimationProcessFactory.java

+3
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,9 @@ private void createNativeProcess(String jobId, AnalyticsProcessConfig analyticsP
102102
.performMemoryUsageEstimationOnly();
103103
try {
104104
analyticsBuilder.build();
105+
} catch (InterruptedException e) {
106+
Thread.currentThread().interrupt();
107+
LOGGER.warn("Interrupted while launching data frame analytics memory usage estimation process for job " + jobId);
105108
} catch (IOException e) {
106109
String msg = "Failed to launch data frame analytics memory usage estimation process for job " + jobId;
107110
LOGGER.error(msg);

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectBuilder.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,7 @@ public AutodetectBuilder scheduledEvents(List<ScheduledEvent> scheduledEvents) {
169169
/**
170170
* Requests that the controller daemon start an autodetect process.
171171
*/
172-
public void build() throws IOException {
172+
public void build() throws IOException, InterruptedException {
173173

174174
List<String> command = buildAutodetectCommand();
175175

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,7 @@ public void killProcess(boolean awaitCompletion, boolean finish, boolean finaliz
196196
processKilled = true;
197197
autodetectResultProcessor.setProcessKilled();
198198
autodetectWorkerExecutor.shutdown();
199-
autodetectProcess.kill();
199+
autodetectProcess.kill(awaitCompletion);
200200

201201
if (awaitCompletion) {
202202
try {

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/BlackHoleAutodetectProcess.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ public void close() {
134134
}
135135

136136
@Override
137-
public void kill() {
137+
public void kill(boolean awaitCompletion) {
138138
open = false;
139139
}
140140

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java

+3
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,9 @@ void createNativeProcess(Job job, AutodetectParams autodetectParams, ProcessPipe
127127
autodetectBuilder.quantiles(autodetectParams.quantiles());
128128
}
129129
autodetectBuilder.build();
130+
} catch (InterruptedException e) {
131+
Thread.currentThread().interrupt();
132+
LOGGER.warn("Interrupted while launching autodetect for job " + job.getId());
130133
} catch (IOException e) {
131134
String msg = "Failed to launch autodetect for job " + job.getId();
132135
LOGGER.error(msg);

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/MultiplyingNormalizerProcess.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ public void flushStream() {
9090
}
9191

9292
@Override
93-
public void kill() {
93+
public void kill(boolean awaitCompletion) {
9494
// Nothing to do
9595
}
9696

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/NativeNormalizerProcessFactory.java

+3
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,9 @@ private void createNativeProcess(String jobId, String quantilesState, ProcessPip
8181
List<String> command = new NormalizerBuilder(env, jobId, quantilesState, bucketSpan).build();
8282
processPipes.addArgs(command);
8383
nativeController.startProcess(command);
84+
} catch (InterruptedException e) {
85+
Thread.currentThread().interrupt();
86+
LOGGER.warn("Interrupted while launching normalizer for job " + jobId);
8487
} catch (IOException e) {
8588
String msg = "Failed to launch normalizer for job " + jobId;
8689
LOGGER.error(msg);

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/AbstractNativeProcess.java

+5-2
Original file line numberDiff line numberDiff line change
@@ -206,19 +206,22 @@ public void close() throws IOException {
206206
}
207207

208208
@Override
209-
public void kill() throws IOException {
209+
public void kill(boolean awaitCompletion) throws IOException {
210210
LOGGER.debug("[{}] Killing {} process", jobId, getName());
211211
processKilled = true;
212212
try {
213213
// The PID comes via the processes log stream. We do wait here to give the process the time to start up and report its PID.
214214
// Without the PID we cannot kill the process.
215-
nativeController.killProcess(cppLogHandler().getPid(processPipes.getTimeout()));
215+
nativeController.killProcess(cppLogHandler().getPid(processPipes.getTimeout()), awaitCompletion);
216216

217217
// Wait for the process to die before closing processInStream as if the process
218218
// is still alive when processInStream is closed it may start persisting state
219219
cppLogHandler().waitForLogStreamClose(WAIT_FOR_KILL_TIMEOUT);
220220
} catch (TimeoutException e) {
221221
LOGGER.warn("[{}] Failed to get PID of {} process to kill", jobId, getName());
222+
} catch (InterruptedException e) {
223+
Thread.currentThread().interrupt();
224+
LOGGER.warn("[{}] Interrupted while killing {} process", jobId, getName());
222225
} finally {
223226
try {
224227
if (processInStream() != null) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
7+
package org.elasticsearch.xpack.ml.process;
8+
9+
import org.elasticsearch.common.ParseField;
10+
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
11+
import org.elasticsearch.common.xcontent.ToXContentObject;
12+
import org.elasticsearch.common.xcontent.XContentBuilder;
13+
14+
import java.io.IOException;
15+
import java.util.Objects;
16+
17+
public class ControllerResponse implements ToXContentObject {
18+
19+
public static final ParseField TYPE = new ParseField("controller_response");
20+
21+
public static final ParseField COMMAND_ID = new ParseField("id");
22+
public static final ParseField SUCCESS = new ParseField("success");
23+
public static final ParseField REASON = new ParseField("reason");
24+
25+
public static final ConstructingObjectParser<ControllerResponse, Void> PARSER = new ConstructingObjectParser<>(
26+
TYPE.getPreferredName(), a -> new ControllerResponse((int) a[0], (boolean) a[1], (String) a[2]));
27+
28+
static {
29+
PARSER.declareInt(ConstructingObjectParser.constructorArg(), COMMAND_ID);
30+
PARSER.declareBoolean(ConstructingObjectParser.constructorArg(), SUCCESS);
31+
PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), REASON);
32+
}
33+
34+
private final int commandId;
35+
private final boolean success;
36+
private final String reason;
37+
38+
ControllerResponse(int commandId, boolean success, String reason) {
39+
this.commandId = commandId;
40+
this.success = success;
41+
this.reason = reason;
42+
}
43+
44+
public int getCommandId() {
45+
return commandId;
46+
}
47+
48+
public boolean isSuccess() {
49+
return success;
50+
}
51+
52+
public String getReason() {
53+
return reason;
54+
}
55+
56+
@Override
57+
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
58+
builder.startObject();
59+
builder.field(COMMAND_ID.getPreferredName(), commandId);
60+
builder.field(SUCCESS.getPreferredName(), success);
61+
if (reason != null) {
62+
builder.field(REASON.getPreferredName(), reason);
63+
}
64+
builder.endObject();
65+
return builder;
66+
}
67+
68+
@Override
69+
public boolean equals(Object o) {
70+
if (this == o) {
71+
return true;
72+
}
73+
if (o == null || getClass() != o.getClass()) {
74+
return false;
75+
}
76+
ControllerResponse that = (ControllerResponse) o;
77+
return this.commandId == that.commandId &&
78+
this.success == that.success &&
79+
Objects.equals(this.reason, that.reason);
80+
}
81+
82+
@Override
83+
public int hashCode() {
84+
return Objects.hash(commandId, success, reason);
85+
}
86+
}

0 commit comments

Comments
 (0)