diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AbstractNativeAnalyticsProcess.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AbstractNativeAnalyticsProcess.java index 4dece0cc10266..98925bce1eeba 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AbstractNativeAnalyticsProcess.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AbstractNativeAnalyticsProcess.java @@ -15,6 +15,7 @@ import java.io.InputStream; import java.io.OutputStream; import java.nio.file.Path; +import java.time.Duration; import java.util.Iterator; import java.util.List; import java.util.Objects; @@ -28,10 +29,10 @@ abstract class AbstractNativeAnalyticsProcess extends AbstractNativeProc protected AbstractNativeAnalyticsProcess(String name, ConstructingObjectParser resultParser, String jobId, NativeController nativeController, InputStream logStream, OutputStream processInStream, InputStream processOutStream, OutputStream processRestoreStream, int numberOfFields, - List filesToDelete, Consumer onProcessCrash, + List filesToDelete, Consumer onProcessCrash, Duration processConnectTimeout, NamedXContentRegistry namedXContentRegistry) { super(jobId, nativeController, logStream, processInStream, processOutStream, processRestoreStream, numberOfFields, filesToDelete, - onProcessCrash); + onProcessCrash, processConnectTimeout); this.name = Objects.requireNonNull(name); this.resultsParser = new ProcessResultsParser<>(Objects.requireNonNull(resultParser), namedXContentRegistry); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeAnalyticsProcess.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeAnalyticsProcess.java index 55cf771f305d4..1900d95fa1c93 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeAnalyticsProcess.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeAnalyticsProcess.java @@ -15,6 +15,7 @@ import java.io.InputStream; import java.io.OutputStream; import java.nio.file.Path; +import java.time.Duration; import java.util.List; import java.util.Objects; import java.util.function.Consumer; @@ -27,10 +28,10 @@ public class NativeAnalyticsProcess extends AbstractNativeAnalyticsProcess filesToDelete, Consumer onProcessCrash, AnalyticsProcessConfig config, - NamedXContentRegistry namedXContentRegistry) { + List filesToDelete, Consumer onProcessCrash, Duration processConnectTimeout, + AnalyticsProcessConfig config, NamedXContentRegistry namedXContentRegistry) { super(NAME, AnalyticsResult.PARSER, jobId, nativeController, logStream, processInStream, processOutStream, processRestoreStream, - numberOfFields, filesToDelete, onProcessCrash, namedXContentRegistry); + numberOfFields, filesToDelete, onProcessCrash, processConnectTimeout, namedXContentRegistry); this.config = Objects.requireNonNull(config); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeAnalyticsProcessFactory.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeAnalyticsProcessFactory.java index d2ace11f553ae..5da16ecc8bfe6 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeAnalyticsProcessFactory.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeAnalyticsProcessFactory.java @@ -82,10 +82,11 @@ public NativeAnalyticsProcess createAnalyticsProcess(DataFrameAnalyticsConfig co createNativeProcess(jobId, analyticsProcessConfig, filesToDelete, processPipes); - NativeAnalyticsProcess analyticsProcess = new NativeAnalyticsProcess(jobId, nativeController, processPipes.getLogStream().get(), - processPipes.getProcessInStream().get(), processPipes.getProcessOutStream().get(), - processPipes.getRestoreStream().orElse(null), numberOfFields, filesToDelete, onProcessCrash, analyticsProcessConfig, - namedXContentRegistry); + NativeAnalyticsProcess analyticsProcess = + new NativeAnalyticsProcess( + jobId, nativeController, processPipes.getLogStream().get(), processPipes.getProcessInStream().get(), + processPipes.getProcessOutStream().get(), processPipes.getRestoreStream().orElse(null), numberOfFields, filesToDelete, + onProcessCrash, processConnectTimeout, analyticsProcessConfig, namedXContentRegistry); try { startProcess(config, executorService, processPipes, analyticsProcess); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeMemoryUsageEstimationProcess.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeMemoryUsageEstimationProcess.java index 2d95dacd2719a..43d33066a8890 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeMemoryUsageEstimationProcess.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeMemoryUsageEstimationProcess.java @@ -13,6 +13,7 @@ import java.io.InputStream; import java.io.OutputStream; import java.nio.file.Path; +import java.time.Duration; import java.util.List; import java.util.function.Consumer; @@ -23,9 +24,9 @@ public class NativeMemoryUsageEstimationProcess extends AbstractNativeAnalyticsP protected NativeMemoryUsageEstimationProcess(String jobId, NativeController nativeController, InputStream logStream, OutputStream processInStream, InputStream processOutStream, OutputStream processRestoreStream, int numberOfFields, List filesToDelete, - Consumer onProcessCrash) { + Consumer onProcessCrash, Duration processConnectTimeout) { super(NAME, MemoryUsageEstimationResult.PARSER, jobId, nativeController, logStream, processInStream, processOutStream, - processRestoreStream, numberOfFields, filesToDelete, onProcessCrash, NamedXContentRegistry.EMPTY); + processRestoreStream, numberOfFields, filesToDelete, onProcessCrash, processConnectTimeout, NamedXContentRegistry.EMPTY); } @Override diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeMemoryUsageEstimationProcessFactory.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeMemoryUsageEstimationProcessFactory.java index 011a1b5f1d442..f1e8a81f337f0 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeMemoryUsageEstimationProcessFactory.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeMemoryUsageEstimationProcessFactory.java @@ -76,7 +76,8 @@ public NativeMemoryUsageEstimationProcess createAnalyticsProcess( null, 0, filesToDelete, - onProcessCrash); + onProcessCrash, + processConnectTimeout); try { process.start(executorService); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcess.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcess.java index 8f4d3d428e3d4..55abb78957bf2 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcess.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcess.java @@ -27,6 +27,7 @@ import java.io.InputStream; import java.io.OutputStream; import java.nio.file.Path; +import java.time.Duration; import java.util.Iterator; import java.util.List; import java.util.function.Consumer; @@ -44,9 +45,10 @@ class NativeAutodetectProcess extends AbstractNativeProcess implements Autodetec NativeAutodetectProcess(String jobId, NativeController nativeController, InputStream logStream, OutputStream processInStream, InputStream processOutStream, OutputStream processRestoreStream, int numberOfFields, List filesToDelete, - ProcessResultsParser resultsParser, Consumer onProcessCrash) { + ProcessResultsParser resultsParser, Consumer onProcessCrash, + Duration processConnectTimeout) { super(jobId, nativeController, logStream, processInStream, processOutStream, processRestoreStream, numberOfFields, filesToDelete, - onProcessCrash); + onProcessCrash, processConnectTimeout); this.resultsParser = resultsParser; } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java index 2504439a220e7..6fd1224270d41 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java @@ -91,7 +91,7 @@ public AutodetectProcess createAutodetectProcess(Job job, NativeAutodetectProcess autodetect = new NativeAutodetectProcess( job.getId(), nativeController, processPipes.getLogStream().get(), processPipes.getProcessInStream().get(), processPipes.getProcessOutStream().get(), processPipes.getRestoreStream().orElse(null), numberOfFields, - filesToDelete, resultsParser, onProcessCrash); + filesToDelete, resultsParser, onProcessCrash, processConnectTimeout); try { autodetect.start(executorService, stateProcessor, processPipes.getPersistStream().get()); return autodetect; diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/NativeNormalizerProcess.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/NativeNormalizerProcess.java index 69c500f9d1637..f9e019a55a7e0 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/NativeNormalizerProcess.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/NativeNormalizerProcess.java @@ -11,6 +11,7 @@ import java.io.InputStream; import java.io.OutputStream; +import java.time.Duration; import java.util.Collections; /** @@ -21,8 +22,9 @@ class NativeNormalizerProcess extends AbstractNativeProcess implements Normalize private static final String NAME = "normalizer"; NativeNormalizerProcess(String jobId, NativeController nativeController, InputStream logStream, OutputStream processInStream, - InputStream processOutStream) { - super(jobId, nativeController, logStream, processInStream, processOutStream, null, 0, Collections.emptyList(), (ignore) -> {}); + InputStream processOutStream, Duration processConnectTimeout) { + super(jobId, nativeController, logStream, processInStream, processOutStream, null, 0, Collections.emptyList(), (ignore) -> {}, + processConnectTimeout); } @Override diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/NativeNormalizerProcessFactory.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/NativeNormalizerProcessFactory.java index 0022a44f42fb9..75abd08a5fa89 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/NativeNormalizerProcessFactory.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/NativeNormalizerProcessFactory.java @@ -53,7 +53,7 @@ public NormalizerProcess createNormalizerProcess(String jobId, String quantilesS createNativeProcess(jobId, quantilesState, processPipes, bucketSpan); NativeNormalizerProcess normalizerProcess = new NativeNormalizerProcess(jobId, nativeController, processPipes.getLogStream().get(), - processPipes.getProcessInStream().get(), processPipes.getProcessOutStream().get()); + processPipes.getProcessInStream().get(), processPipes.getProcessOutStream().get(), processConnectTimeout); try { normalizerProcess.start(executorService); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/AbstractNativeProcess.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/AbstractNativeProcess.java index db2dad41c6f87..450f5d1288d9c 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/AbstractNativeProcess.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/AbstractNativeProcess.java @@ -52,6 +52,7 @@ public abstract class AbstractNativeProcess implements NativeProcess { private final int numberOfFields; private final List filesToDelete; private final Consumer onProcessCrash; + private final Duration processConnectTimeout; private volatile Future logTailFuture; private volatile Future stateProcessorFuture; private volatile boolean processCloseInitiated; @@ -60,18 +61,19 @@ public abstract class AbstractNativeProcess implements NativeProcess { protected AbstractNativeProcess(String jobId, NativeController nativeController, InputStream logStream, OutputStream processInStream, InputStream processOutStream, OutputStream processRestoreStream, int numberOfFields, - List filesToDelete, Consumer onProcessCrash) { + List filesToDelete, Consumer onProcessCrash, Duration processConnectTimeout) { this.jobId = jobId; this.nativeController = nativeController; - cppLogHandler = new CppLogMessageHandler(jobId, logStream); + this.cppLogHandler = new CppLogMessageHandler(jobId, logStream); this.processInStream = processInStream != null ? new BufferedOutputStream(processInStream) : null; this.processOutStream = processOutStream; this.processRestoreStream = processRestoreStream; this.recordWriter = new LengthEncodedWriter(this.processInStream); - startTime = ZonedDateTime.now(); + this.startTime = ZonedDateTime.now(); this.numberOfFields = numberOfFields; this.filesToDelete = filesToDelete; this.onProcessCrash = Objects.requireNonNull(onProcessCrash); + this.processConnectTimeout = Objects.requireNonNull(processConnectTimeout); } public abstract String getName(); @@ -197,10 +199,9 @@ public void kill() throws IOException { LOGGER.debug("[{}] Killing {} process", jobId, getName()); processKilled = true; try { - // The PID comes via the processes log stream. We don't wait for it to arrive here, - // but if the wait times out it implies the process has only just started, in which - // case it should die very quickly when we close its input stream. - nativeController.killProcess(cppLogHandler.getPid(Duration.ZERO)); + // The PID comes via the processes log stream. We do wait here to give the process the time to start up and report its PID. + // Without the PID we cannot kill the process. + nativeController.killProcess(cppLogHandler.getPid(processConnectTimeout)); // Wait for the process to die before closing processInStream as if the process // is still alive when processInStream is closed it may start persisting state diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessTests.java index 145d80ecdfc6d..903938b82db65 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessTests.java @@ -26,6 +26,7 @@ import java.io.OutputStream; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; +import java.time.Duration; import java.time.ZonedDateTime; import java.time.temporal.ChronoUnit; import java.util.Collections; @@ -63,7 +64,7 @@ public void testProcessStartTime() throws Exception { try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", mock(NativeController.class), logStream, mock(OutputStream.class), outputStream, mock(OutputStream.class), NUMBER_FIELDS, null, - new ProcessResultsParser<>(AutodetectResult.PARSER, NamedXContentRegistry.EMPTY), mock(Consumer.class))) { + new ProcessResultsParser<>(AutodetectResult.PARSER, NamedXContentRegistry.EMPTY), mock(Consumer.class), Duration.ZERO)) { process.start(executorService, mock(IndexingStateProcessor.class), mock(InputStream.class)); ZonedDateTime startTime = process.getProcessStartTime(); @@ -86,7 +87,7 @@ public void testWriteRecord() throws IOException { ByteArrayOutputStream bos = new ByteArrayOutputStream(1024); try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", mock(NativeController.class), logStream, bos, outputStream, mock(OutputStream.class), NUMBER_FIELDS, Collections.emptyList(), - new ProcessResultsParser<>(AutodetectResult.PARSER, NamedXContentRegistry.EMPTY), mock(Consumer.class))) { + new ProcessResultsParser<>(AutodetectResult.PARSER, NamedXContentRegistry.EMPTY), mock(Consumer.class), Duration.ZERO)) { process.start(executorService, mock(IndexingStateProcessor.class), mock(InputStream.class)); process.writeRecord(record); @@ -121,7 +122,7 @@ public void testFlush() throws IOException { ByteArrayOutputStream bos = new ByteArrayOutputStream(AutodetectControlMsgWriter.FLUSH_SPACES_LENGTH + 1024); try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", mock(NativeController.class), logStream, bos, outputStream, mock(OutputStream.class), NUMBER_FIELDS, Collections.emptyList(), - new ProcessResultsParser<>(AutodetectResult.PARSER, NamedXContentRegistry.EMPTY), mock(Consumer.class))) { + new ProcessResultsParser<>(AutodetectResult.PARSER, NamedXContentRegistry.EMPTY), mock(Consumer.class), Duration.ZERO)) { process.start(executorService, mock(IndexingStateProcessor.class), mock(InputStream.class)); FlushJobParams params = FlushJobParams.builder().build(); @@ -155,7 +156,8 @@ public void testConsumeAndCloseOutputStream() throws IOException { try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", mock(NativeController.class), logStream, processInStream, processOutStream, mock(OutputStream.class), NUMBER_FIELDS, Collections.emptyList(), - new ProcessResultsParser(AutodetectResult.PARSER, NamedXContentRegistry.EMPTY), mock(Consumer.class))) { + new ProcessResultsParser(AutodetectResult.PARSER, NamedXContentRegistry.EMPTY), mock(Consumer.class), + Duration.ZERO)) { process.consumeAndCloseOutputStream(); assertThat(processOutStream.available(), equalTo(0)); @@ -171,7 +173,7 @@ private void testWriteMessage(CheckedConsumer writeFunc ByteArrayOutputStream bos = new ByteArrayOutputStream(1024); try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", mock(NativeController.class), logStream, bos, outputStream, mock(OutputStream.class), NUMBER_FIELDS, Collections.emptyList(), - new ProcessResultsParser<>(AutodetectResult.PARSER, NamedXContentRegistry.EMPTY), mock(Consumer.class))) { + new ProcessResultsParser<>(AutodetectResult.PARSER, NamedXContentRegistry.EMPTY), mock(Consumer.class), Duration.ZERO)) { process.start(executorService, mock(IndexingStateProcessor.class), mock(InputStream.class)); writeFunction.accept(process); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/AbstractNativeProcessTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/AbstractNativeProcessTests.java index 6a442ab089639..5f021a36c05df 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/AbstractNativeProcessTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/AbstractNativeProcessTests.java @@ -16,6 +16,7 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.time.Duration; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; @@ -142,7 +143,7 @@ public void testIsReady() throws Exception { private class TestNativeProcess extends AbstractNativeProcess { TestNativeProcess(OutputStream inputStream) { - super("foo", nativeController, logStream, inputStream, outputStream, restoreStream, 0, null, onProcessCrash); + super("foo", nativeController, logStream, inputStream, outputStream, restoreStream, 0, null, onProcessCrash, Duration.ZERO); } @Override