Skip to content

Commit 821102d

Browse files
authored
[ML] Take more care that memory estimation uses unique named pipes (#60395)
Prior to this change ML memory estimation processes for a given job would always use the same named pipe names. This would often cause one of the processes to fail. This change avoids this risk by adding an incrementing counter value into the named pipe names used for memory estimation processes. Relates elastic/kibana#70885
1 parent 4a5242c commit 821102d

File tree

2 files changed

+56
-1
lines changed

2 files changed

+56
-1
lines changed

x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/ExplainDataFrameAnalyticsIT.java

+47
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,15 @@
66
package org.elasticsearch.xpack.ml.integration;
77

88
import org.elasticsearch.ResourceNotFoundException;
9+
import org.elasticsearch.action.ActionFuture;
910
import org.elasticsearch.action.bulk.BulkRequestBuilder;
1011
import org.elasticsearch.action.bulk.BulkResponse;
1112
import org.elasticsearch.action.index.IndexRequest;
1213
import org.elasticsearch.action.support.WriteRequest;
1314
import org.elasticsearch.common.unit.ByteSizeValue;
1415
import org.elasticsearch.index.query.QueryBuilders;
1516
import org.elasticsearch.xpack.core.ml.action.ExplainDataFrameAnalyticsAction;
17+
import org.elasticsearch.xpack.core.ml.action.PutDataFrameAnalyticsAction;
1618
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
1719
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsSource;
1820
import org.elasticsearch.xpack.core.ml.dataframe.analyses.BoostedTreeParams;
@@ -22,6 +24,8 @@
2224
import org.elasticsearch.xpack.core.ml.utils.QueryProvider;
2325

2426
import java.io.IOException;
27+
import java.util.ArrayList;
28+
import java.util.List;
2529

2630
import static org.hamcrest.Matchers.equalTo;
2731
import static org.hamcrest.Matchers.lessThanOrEqualTo;
@@ -127,6 +131,49 @@ public void testTrainingPercentageIsApplied() throws IOException {
127131
lessThanOrEqualTo(allDataUsedForTraining));
128132
}
129133

134+
public void testSimultaneousExplainSameConfig() throws IOException {
135+
136+
final int simultaneousInvocationCount = 10;
137+
138+
String sourceIndex = "test-simultaneous-explain";
139+
RegressionIT.indexData(sourceIndex, 100, 0);
140+
141+
DataFrameAnalyticsConfig config = new DataFrameAnalyticsConfig.Builder()
142+
.setId("dfa-simultaneous-explain-" + sourceIndex)
143+
.setSource(new DataFrameAnalyticsSource(new String[]{sourceIndex},
144+
QueryProvider.fromParsedQuery(QueryBuilders.matchAllQuery()),
145+
null))
146+
.setAnalysis(new Regression(RegressionIT.DEPENDENT_VARIABLE_FIELD,
147+
BoostedTreeParams.builder().build(),
148+
null,
149+
100.0,
150+
null,
151+
null,
152+
null))
153+
.buildForExplain();
154+
155+
List<ActionFuture<ExplainDataFrameAnalyticsAction.Response>> futures = new ArrayList<>();
156+
157+
for (int i = 0; i < simultaneousInvocationCount; ++i) {
158+
futures.add(client().execute(ExplainDataFrameAnalyticsAction.INSTANCE, new PutDataFrameAnalyticsAction.Request(config)));
159+
}
160+
161+
ExplainDataFrameAnalyticsAction.Response previous = null;
162+
for (ActionFuture<ExplainDataFrameAnalyticsAction.Response> future : futures) {
163+
// The main purpose of this test is that actionGet() here will throw an exception
164+
// if any of the simultaneous calls returns an error due to interaction between
165+
// the many estimation processes that get run
166+
ExplainDataFrameAnalyticsAction.Response current = future.actionGet(10000);
167+
if (previous != null) {
168+
// A secondary check the test can perform is that the multiple invocations
169+
// return the same result (but it was failures due to unwanted interactions
170+
// that caused this test to be written)
171+
assertEquals(previous, current);
172+
}
173+
previous = current;
174+
}
175+
}
176+
130177
@Override
131178
boolean supportsInference() {
132179
return false;

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeMemoryUsageEstimationProcessFactory.java

+9-1
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import java.util.List;
3030
import java.util.Objects;
3131
import java.util.concurrent.ExecutorService;
32+
import java.util.concurrent.atomic.AtomicLong;
3233
import java.util.function.Consumer;
3334

3435
public class NativeMemoryUsageEstimationProcessFactory implements AnalyticsProcessFactory<MemoryUsageEstimationResult> {
@@ -39,11 +40,13 @@ public class NativeMemoryUsageEstimationProcessFactory implements AnalyticsProce
3940

4041
private final Environment env;
4142
private final NativeController nativeController;
43+
private final AtomicLong counter;
4244
private volatile Duration processConnectTimeout;
4345

4446
public NativeMemoryUsageEstimationProcessFactory(Environment env, NativeController nativeController, ClusterService clusterService) {
4547
this.env = Objects.requireNonNull(env);
4648
this.nativeController = Objects.requireNonNull(nativeController);
49+
this.counter = new AtomicLong(0);
4750
setProcessConnectTimeout(MachineLearning.PROCESS_CONNECT_TIMEOUT.get(env.settings()));
4851
clusterService.getClusterSettings().addSettingsUpdateConsumer(
4952
MachineLearning.PROCESS_CONNECT_TIMEOUT, this::setProcessConnectTimeout);
@@ -61,8 +64,13 @@ public NativeMemoryUsageEstimationProcess createAnalyticsProcess(
6164
ExecutorService executorService,
6265
Consumer<String> onProcessCrash) {
6366
List<Path> filesToDelete = new ArrayList<>();
67+
// The config ID passed to the process pipes is only used to make the file names unique. Since memory estimation can be
68+
// called many times in quick succession for the same config the config ID alone is not sufficient to guarantee that the
69+
// memory estimation process pipe names are unique. Therefore an increasing counter value is appended to the config ID
70+
// to ensure uniqueness between calls.
6471
ProcessPipes processPipes = new ProcessPipes(
65-
env, NAMED_PIPE_HELPER, AnalyticsBuilder.ANALYTICS, config.getId(), false, false, true, false, false);
72+
env, NAMED_PIPE_HELPER, AnalyticsBuilder.ANALYTICS, config.getId() + "_" + counter.incrementAndGet(),
73+
false, false, true, false, false);
6674

6775
createNativeProcess(config.getId(), analyticsProcessConfig, filesToDelete, processPipes);
6876

0 commit comments

Comments
 (0)