Skip to content

Commit 362a346

Browse files
committed
[ML] Introduce a setting for the process connect timeout (#43234)
This change introduces a new setting, xpack.ml.process_connect_timeout, to enable the timeout for one of the external ML processes to connect to the ES JVM to be increased. The timeout may need to be increased if many processes are being started simultaneously on the same machine. This is unlikely in clusters with many ML nodes, as we balance the processes across the ML nodes, but can happen in clusters with a single ML node and a high value for xpack.ml.node_concurrent_job_allocations.
1 parent fa23b68 commit 362a346

File tree

5 files changed

+100
-8
lines changed

5 files changed

+100
-8
lines changed

docs/reference/settings/ml-settings.asciidoc

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,3 +98,11 @@ cluster and the job is assigned to run on that node.
9898
IMPORTANT: This setting assumes some external process is capable of adding ML nodes
9999
to the cluster. This setting is only useful when used in conjunction with
100100
such an external process.
101+
102+
`xpack.ml.process_connect_timeout` (<<cluster-update-settings,Dynamic>>)::
103+
The connection timeout for {ml} processes that run separately from the {es} JVM.
104+
Defaults to `10s`. Some {ml} processing is done by processes that run separately
105+
from the {es} JVM. When such processes are started, they must connect to the {es}
106+
JVM. If such a process does not connect within the time period specified by this
107+
setting, then the process is assumed to have failed. The minimum
108+
value for this setting is `5s`.

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -281,6 +281,10 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu
281281
public static final Setting<Integer> MAX_LAZY_ML_NODES =
282282
Setting.intSetting("xpack.ml.max_lazy_ml_nodes", 0, 0, 3, Property.Dynamic, Property.NodeScope);
283283

284+
public static final Setting<TimeValue> PROCESS_CONNECT_TIMEOUT =
285+
Setting.timeSetting("xpack.ml.process_connect_timeout", TimeValue.timeValueSeconds(10),
286+
TimeValue.timeValueSeconds(5), Setting.Property.Dynamic, Setting.Property.NodeScope);
287+
284288
private static final Logger logger = LogManager.getLogger(XPackPlugin.class);
285289

286290
private final Settings settings;
@@ -309,6 +313,7 @@ public MachineLearning(Settings settings, Path configPath) {
309313
public List<Setting<?>> getSettings() {
310314
return Collections.unmodifiableList(
311315
Arrays.asList(MachineLearningField.AUTODETECT_PROCESS,
316+
PROCESS_CONNECT_TIMEOUT,
312317
ML_ENABLED,
313318
CONCURRENT_JOB_ALLOCATIONS,
314319
MachineLearningField.MAX_MODEL_MEMORY_LIMIT,
@@ -423,7 +428,7 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
423428
nativeController,
424429
client,
425430
clusterService);
426-
normalizerProcessFactory = new NativeNormalizerProcessFactory(environment, nativeController);
431+
normalizerProcessFactory = new NativeNormalizerProcessFactory(environment, nativeController, clusterService);
427432
} catch (IOException e) {
428433
// The low level cause of failure from the named pipe helper's perspective is almost never the real root cause, so
429434
// only log this at the lowest level of detail. It's almost always "file not found" on a named pipe we expect to be

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import org.elasticsearch.client.Client;
1111
import org.elasticsearch.cluster.service.ClusterService;
1212
import org.elasticsearch.common.settings.Settings;
13+
import org.elasticsearch.common.unit.TimeValue;
1314
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
1415
import org.elasticsearch.core.internal.io.IOUtils;
1516
import org.elasticsearch.env.Environment;
@@ -35,13 +36,13 @@ public class NativeAutodetectProcessFactory implements AutodetectProcessFactory
3536

3637
private static final Logger LOGGER = LogManager.getLogger(NativeAutodetectProcessFactory.class);
3738
private static final NamedPipeHelper NAMED_PIPE_HELPER = new NamedPipeHelper();
38-
public static final Duration PROCESS_STARTUP_TIMEOUT = Duration.ofSeconds(10);
3939

4040
private final Client client;
4141
private final Environment env;
4242
private final Settings settings;
4343
private final NativeController nativeController;
4444
private final ClusterService clusterService;
45+
private volatile Duration processConnectTimeout;
4546

4647
public NativeAutodetectProcessFactory(Environment env, Settings settings, NativeController nativeController, Client client,
4748
ClusterService clusterService) {
@@ -50,6 +51,13 @@ public NativeAutodetectProcessFactory(Environment env, Settings settings, Native
5051
this.nativeController = Objects.requireNonNull(nativeController);
5152
this.client = client;
5253
this.clusterService = clusterService;
54+
setProcessConnectTimeout(MachineLearning.PROCESS_CONNECT_TIMEOUT.get(settings));
55+
clusterService.getClusterSettings().addSettingsUpdateConsumer(MachineLearning.PROCESS_CONNECT_TIMEOUT,
56+
this::setProcessConnectTimeout);
57+
}
58+
59+
void setProcessConnectTimeout(TimeValue processConnectTimeout) {
60+
this.processConnectTimeout = Duration.ofMillis(processConnectTimeout.getMillis());
5361
}
5462

5563
@Override
@@ -86,8 +94,8 @@ public AutodetectProcess createAutodetectProcess(Job job,
8694
}
8795
}
8896

89-
private void createNativeProcess(Job job, AutodetectParams autodetectParams, ProcessPipes processPipes,
90-
List<Path> filesToDelete) {
97+
void createNativeProcess(Job job, AutodetectParams autodetectParams, ProcessPipes processPipes,
98+
List<Path> filesToDelete) {
9199
try {
92100

93101
Settings updatedSettings = Settings.builder()
@@ -107,7 +115,7 @@ private void createNativeProcess(Job job, AutodetectParams autodetectParams, Pro
107115
autodetectBuilder.quantiles(autodetectParams.quantiles());
108116
}
109117
autodetectBuilder.build();
110-
processPipes.connectStreams(PROCESS_STARTUP_TIMEOUT);
118+
processPipes.connectStreams(processConnectTimeout);
111119
} catch (IOException e) {
112120
String msg = "Failed to launch autodetect for job " + job.getId();
113121
LOGGER.error(msg);

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/NativeNormalizerProcessFactory.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,13 @@
77

88
import org.apache.logging.log4j.LogManager;
99
import org.apache.logging.log4j.Logger;
10+
import org.elasticsearch.cluster.service.ClusterService;
11+
import org.elasticsearch.common.unit.TimeValue;
1012
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
1113
import org.elasticsearch.core.internal.io.IOUtils;
1214
import org.elasticsearch.env.Environment;
1315
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
16+
import org.elasticsearch.xpack.ml.MachineLearning;
1417
import org.elasticsearch.xpack.ml.process.NativeController;
1518
import org.elasticsearch.xpack.ml.process.ProcessPipes;
1619
import org.elasticsearch.xpack.ml.utils.NamedPipeHelper;
@@ -25,14 +28,21 @@ public class NativeNormalizerProcessFactory implements NormalizerProcessFactory
2528

2629
private static final Logger LOGGER = LogManager.getLogger(NativeNormalizerProcessFactory.class);
2730
private static final NamedPipeHelper NAMED_PIPE_HELPER = new NamedPipeHelper();
28-
private static final Duration PROCESS_STARTUP_TIMEOUT = Duration.ofSeconds(10);
2931

3032
private final Environment env;
3133
private final NativeController nativeController;
34+
private volatile Duration processConnectTimeout;
3235

33-
public NativeNormalizerProcessFactory(Environment env, NativeController nativeController) {
36+
public NativeNormalizerProcessFactory(Environment env, NativeController nativeController, ClusterService clusterService) {
3437
this.env = Objects.requireNonNull(env);
3538
this.nativeController = Objects.requireNonNull(nativeController);
39+
setProcessConnectTimeout(MachineLearning.PROCESS_CONNECT_TIMEOUT.get(env.settings()));
40+
clusterService.getClusterSettings().addSettingsUpdateConsumer(MachineLearning.PROCESS_CONNECT_TIMEOUT,
41+
this::setProcessConnectTimeout);
42+
}
43+
44+
void setProcessConnectTimeout(TimeValue processConnectTimeout) {
45+
this.processConnectTimeout = Duration.ofMillis(processConnectTimeout.getMillis());
3646
}
3747

3848
@Override
@@ -64,7 +74,7 @@ private void createNativeProcess(String jobId, String quantilesState, ProcessPip
6474
List<String> command = new NormalizerBuilder(env, jobId, quantilesState, bucketSpan).build();
6575
processPipes.addArgs(command);
6676
nativeController.startProcess(command);
67-
processPipes.connectStreams(PROCESS_STARTUP_TIMEOUT);
77+
processPipes.connectStreams(processConnectTimeout);
6878
} catch (IOException e) {
6979
String msg = "Failed to launch normalizer for job " + jobId;
7080
LOGGER.error(msg);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
package org.elasticsearch.xpack.ml.job.process.autodetect;
7+
8+
import org.elasticsearch.client.Client;
9+
import org.elasticsearch.cluster.service.ClusterService;
10+
import org.elasticsearch.common.settings.ClusterSettings;
11+
import org.elasticsearch.common.settings.Settings;
12+
import org.elasticsearch.common.unit.TimeValue;
13+
import org.elasticsearch.common.util.set.Sets;
14+
import org.elasticsearch.env.Environment;
15+
import org.elasticsearch.env.TestEnvironment;
16+
import org.elasticsearch.test.ESTestCase;
17+
import org.elasticsearch.xpack.core.ml.job.config.Job;
18+
import org.elasticsearch.xpack.ml.MachineLearning;
19+
import org.elasticsearch.xpack.ml.job.process.autodetect.params.AutodetectParams;
20+
import org.elasticsearch.xpack.ml.process.NativeController;
21+
import org.elasticsearch.xpack.ml.process.ProcessPipes;
22+
23+
import java.io.IOException;
24+
import java.time.Duration;
25+
import java.util.Collections;
26+
27+
import static org.mockito.Matchers.eq;
28+
import static org.mockito.Mockito.mock;
29+
import static org.mockito.Mockito.times;
30+
import static org.mockito.Mockito.verify;
31+
import static org.mockito.Mockito.when;
32+
33+
public class NativeAutodetectProcessFactoryTests extends ESTestCase {
34+
35+
public void testSetProcessConnectTimeout() throws IOException {
36+
37+
int timeoutSeconds = randomIntBetween(5, 100);
38+
39+
Settings settings = Settings.builder()
40+
.put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString())
41+
.build();
42+
Environment env = TestEnvironment.newEnvironment(settings);
43+
NativeController nativeController = mock(NativeController.class);
44+
Client client = mock(Client.class);
45+
ClusterSettings clusterSettings = new ClusterSettings(settings,
46+
Sets.newHashSet(MachineLearning.PROCESS_CONNECT_TIMEOUT, AutodetectBuilder.MAX_ANOMALY_RECORDS_SETTING_DYNAMIC));
47+
ClusterService clusterService = mock(ClusterService.class);
48+
when(clusterService.getClusterSettings()).thenReturn(clusterSettings);
49+
Job job = mock(Job.class);
50+
when(job.getId()).thenReturn("set_process_connect_test_job");
51+
AutodetectParams autodetectParams = mock(AutodetectParams.class);
52+
ProcessPipes processPipes = mock(ProcessPipes.class);
53+
54+
NativeAutodetectProcessFactory nativeAutodetectProcessFactory =
55+
new NativeAutodetectProcessFactory(env, settings, nativeController, client, clusterService);
56+
nativeAutodetectProcessFactory.setProcessConnectTimeout(TimeValue.timeValueSeconds(timeoutSeconds));
57+
nativeAutodetectProcessFactory.createNativeProcess(job, autodetectParams, processPipes, Collections.emptyList());
58+
59+
verify(processPipes, times(1)).connectStreams(eq(Duration.ofSeconds(timeoutSeconds)));
60+
}
61+
}

0 commit comments

Comments
 (0)