diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeAnalyticsProcessFactory.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeAnalyticsProcessFactory.java index 64668f2d253fa..588f178606f51 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeAnalyticsProcessFactory.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeAnalyticsProcessFactory.java @@ -32,6 +32,7 @@ import java.util.List; import java.util.Objects; import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; public class NativeAnalyticsProcessFactory implements AnalyticsProcessFactory { @@ -45,6 +46,7 @@ public class NativeAnalyticsProcessFactory implements AnalyticsProcessFactory onProcessCrash) { String jobId = config.getId(); List filesToDelete = new ArrayList<>(); + // When the stop API is called the process is killed. As it may take some time for the OS (especially Windows) + // to delete the named pipes, we use a unique identifier to avoid reusing an older named pipe if the task + // gets restarted immediately after stopping. ProcessPipes processPipes = new ProcessPipes(env, NAMED_PIPE_HELPER, processConnectTimeout, AnalyticsBuilder.ANALYTICS, jobId, - null, false, true, true, hasState, config.getAnalysis().persistsState()); + counter.incrementAndGet(), false, true, true, hasState, config.getAnalysis().persistsState()); // The extra 2 are for the checksum and the control field int numberOfFields = analyticsProcessConfig.cols() + 2;