From ebcf2727467e291c5d126ab5d8ee33f2f95e6bd0 Mon Sep 17 00:00:00 2001 From: Dimitris Athanasiou Date: Fri, 26 Mar 2021 16:38:14 +0200 Subject: [PATCH] [ML] Use unique named pipes for data frame analytics When a data frame analytics job is stopped because of a call to the _stop API, if the process is running it is killed. Depending on the OS, it may take some time to delete all the used named pipes. This means that in a scenario where the job is restarted immediately after it is possible that the old named pipes are used which results to the new process not properly communicating with java. This has been the underlying issue of #70698 and #67581. This commit fixes it by using unique identifiers for the named pipes. Closes #70698 --- .../dataframe/process/NativeAnalyticsProcessFactory.java | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeAnalyticsProcessFactory.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeAnalyticsProcessFactory.java index 64668f2d253fa..588f178606f51 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeAnalyticsProcessFactory.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeAnalyticsProcessFactory.java @@ -32,6 +32,7 @@ import java.util.List; import java.util.Objects; import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; public class NativeAnalyticsProcessFactory implements AnalyticsProcessFactory { @@ -45,6 +46,7 @@ public class NativeAnalyticsProcessFactory implements AnalyticsProcessFactory onProcessCrash) { String jobId = config.getId(); List filesToDelete = new ArrayList<>(); + // When the stop API is called the process is killed. As it may take some time for the OS (especially Windows) + // to delete the named pipes, we use a unique identifier to avoid reusing an older named pipe if the task + // gets restarted immediately after stopping. ProcessPipes processPipes = new ProcessPipes(env, NAMED_PIPE_HELPER, processConnectTimeout, AnalyticsBuilder.ANALYTICS, jobId, - null, false, true, true, hasState, config.getAnalysis().persistsState()); + counter.incrementAndGet(), false, true, true, hasState, config.getAnalysis().persistsState()); // The extra 2 are for the checksum and the control field int numberOfFields = analyticsProcessConfig.cols() + 2;