Skip to content

Commit ac974c3

Browse files
authored
Pass processConnectTimeout to the method that fetches C++ process' PID (#50276) (#50290)
1 parent 30d6682 commit ac974c3

12 files changed

+42
-28
lines changed

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AbstractNativeAnalyticsProcess.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import java.io.InputStream;
1515
import java.io.OutputStream;
1616
import java.nio.file.Path;
17+
import java.time.Duration;
1718
import java.util.Iterator;
1819
import java.util.List;
1920
import java.util.Objects;
@@ -27,9 +28,10 @@ abstract class AbstractNativeAnalyticsProcess<Result> extends AbstractNativeProc
2728
protected AbstractNativeAnalyticsProcess(String name, ConstructingObjectParser<Result, Void> resultParser, String jobId,
2829
InputStream logStream, OutputStream processInStream,
2930
InputStream processOutStream, OutputStream processRestoreStream, int numberOfFields,
30-
List<Path> filesToDelete, Consumer<String> onProcessCrash,
31+
List<Path> filesToDelete, Consumer<String> onProcessCrash, Duration processConnectTimeout,
3132
NamedXContentRegistry namedXContentRegistry) {
32-
super(jobId, logStream, processInStream, processOutStream, processRestoreStream, numberOfFields, filesToDelete, onProcessCrash);
33+
super(jobId, logStream, processInStream, processOutStream, processRestoreStream, numberOfFields, filesToDelete, onProcessCrash,
34+
processConnectTimeout);
3335
this.name = Objects.requireNonNull(name);
3436
this.resultsParser = new ProcessResultsParser<>(Objects.requireNonNull(resultParser), namedXContentRegistry);
3537
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeAnalyticsProcess.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import java.io.InputStream;
1515
import java.io.OutputStream;
1616
import java.nio.file.Path;
17+
import java.time.Duration;
1718
import java.util.List;
1819
import java.util.Objects;
1920
import java.util.function.Consumer;
@@ -26,10 +27,10 @@ public class NativeAnalyticsProcess extends AbstractNativeAnalyticsProcess<Analy
2627

2728
protected NativeAnalyticsProcess(String jobId, InputStream logStream, OutputStream processInStream, InputStream processOutStream,
2829
OutputStream processRestoreStream, int numberOfFields, List<Path> filesToDelete,
29-
Consumer<String> onProcessCrash, AnalyticsProcessConfig config,
30+
Consumer<String> onProcessCrash, Duration processConnectTimeout, AnalyticsProcessConfig config,
3031
NamedXContentRegistry namedXContentRegistry) {
3132
super(NAME, AnalyticsResult.PARSER, jobId, logStream, processInStream, processOutStream, processRestoreStream, numberOfFields,
32-
filesToDelete, onProcessCrash, namedXContentRegistry);
33+
filesToDelete, onProcessCrash, processConnectTimeout, namedXContentRegistry);
3334
this.config = Objects.requireNonNull(config);
3435
}
3536

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeAnalyticsProcessFactory.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -84,8 +84,8 @@ public NativeAnalyticsProcess createAnalyticsProcess(DataFrameAnalyticsConfig co
8484

8585
NativeAnalyticsProcess analyticsProcess = new NativeAnalyticsProcess(jobId, processPipes.getLogStream().get(),
8686
processPipes.getProcessInStream().get(), processPipes.getProcessOutStream().get(),
87-
processPipes.getRestoreStream().orElse(null), numberOfFields, filesToDelete, onProcessCrash, analyticsProcessConfig,
88-
namedXContentRegistry);
87+
processPipes.getRestoreStream().orElse(null), numberOfFields, filesToDelete, onProcessCrash, processConnectTimeout,
88+
analyticsProcessConfig, namedXContentRegistry);
8989

9090
try {
9191
startProcess(config, executorService, processPipes, analyticsProcess);

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeMemoryUsageEstimationProcess.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import java.io.InputStream;
1313
import java.io.OutputStream;
1414
import java.nio.file.Path;
15+
import java.time.Duration;
1516
import java.util.List;
1617
import java.util.function.Consumer;
1718

@@ -22,9 +23,9 @@ public class NativeMemoryUsageEstimationProcess extends AbstractNativeAnalyticsP
2223
protected NativeMemoryUsageEstimationProcess(String jobId, InputStream logStream,
2324
OutputStream processInStream, InputStream processOutStream,
2425
OutputStream processRestoreStream, int numberOfFields, List<Path> filesToDelete,
25-
Consumer<String> onProcessCrash) {
26+
Consumer<String> onProcessCrash, Duration processConnectTimeout) {
2627
super(NAME, MemoryUsageEstimationResult.PARSER, jobId, logStream, processInStream, processOutStream, processRestoreStream,
27-
numberOfFields, filesToDelete, onProcessCrash, NamedXContentRegistry.EMPTY);
28+
numberOfFields, filesToDelete, onProcessCrash, processConnectTimeout, NamedXContentRegistry.EMPTY);
2829
}
2930

3031
@Override

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeMemoryUsageEstimationProcessFactory.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,8 @@ public NativeMemoryUsageEstimationProcess createAnalyticsProcess(
7575
null,
7676
0,
7777
filesToDelete,
78-
onProcessCrash);
78+
onProcessCrash,
79+
processConnectTimeout);
7980

8081
try {
8182
process.start(executorService);

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcess.java

+5-2
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import java.io.InputStream;
2727
import java.io.OutputStream;
2828
import java.nio.file.Path;
29+
import java.time.Duration;
2930
import java.util.Iterator;
3031
import java.util.List;
3132
import java.util.function.Consumer;
@@ -43,8 +44,10 @@ class NativeAutodetectProcess extends AbstractNativeProcess implements Autodetec
4344

4445
NativeAutodetectProcess(String jobId, InputStream logStream, OutputStream processInStream, InputStream processOutStream,
4546
OutputStream processRestoreStream, int numberOfFields, List<Path> filesToDelete,
46-
ProcessResultsParser<AutodetectResult> resultsParser, Consumer<String> onProcessCrash) {
47-
super(jobId, logStream, processInStream, processOutStream, processRestoreStream, numberOfFields, filesToDelete, onProcessCrash);
47+
ProcessResultsParser<AutodetectResult> resultsParser, Consumer<String> onProcessCrash,
48+
Duration processConnectTimeout) {
49+
super(jobId, logStream, processInStream, processOutStream, processRestoreStream, numberOfFields, filesToDelete, onProcessCrash,
50+
processConnectTimeout);
4851
this.resultsParser = resultsParser;
4952
}
5053

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ public AutodetectProcess createAutodetectProcess(Job job,
9191
NativeAutodetectProcess autodetect = new NativeAutodetectProcess(
9292
job.getId(), processPipes.getLogStream().get(), processPipes.getProcessInStream().get(),
9393
processPipes.getProcessOutStream().get(), processPipes.getRestoreStream().orElse(null), numberOfFields,
94-
filesToDelete, resultsParser, onProcessCrash);
94+
filesToDelete, resultsParser, onProcessCrash, processConnectTimeout);
9595
try {
9696
autodetect.start(executorService, stateProcessor, processPipes.getPersistStream().get());
9797
return autodetect;

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/NativeNormalizerProcess.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010

1111
import java.io.InputStream;
1212
import java.io.OutputStream;
13+
import java.time.Duration;
1314
import java.util.Collections;
1415

1516
/**
@@ -19,8 +20,9 @@ class NativeNormalizerProcess extends AbstractNativeProcess implements Normalize
1920

2021
private static final String NAME = "normalizer";
2122

22-
NativeNormalizerProcess(String jobId, InputStream logStream, OutputStream processInStream, InputStream processOutStream) {
23-
super(jobId, logStream, processInStream, processOutStream, null, 0, Collections.emptyList(), (ignore) -> {});
23+
NativeNormalizerProcess(String jobId, InputStream logStream, OutputStream processInStream, InputStream processOutStream,
24+
Duration processConnectTimeout) {
25+
super(jobId, logStream, processInStream, processOutStream, null, 0, Collections.emptyList(), (ignore) -> {}, processConnectTimeout);
2426
}
2527

2628
@Override

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/NativeNormalizerProcessFactory.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ public NormalizerProcess createNormalizerProcess(String jobId, String quantilesS
5353
createNativeProcess(jobId, quantilesState, processPipes, bucketSpan);
5454

5555
NativeNormalizerProcess normalizerProcess = new NativeNormalizerProcess(jobId, processPipes.getLogStream().get(),
56-
processPipes.getProcessInStream().get(), processPipes.getProcessOutStream().get());
56+
processPipes.getProcessInStream().get(), processPipes.getProcessOutStream().get(), processConnectTimeout);
5757

5858
try {
5959
normalizerProcess.start(executorService);

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/AbstractNativeProcess.java

+8-7
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ public abstract class AbstractNativeProcess implements NativeProcess {
5151
private final int numberOfFields;
5252
private final List<Path> filesToDelete;
5353
private final Consumer<String> onProcessCrash;
54+
private final Duration processConnectTimeout;
5455
private volatile Future<?> logTailFuture;
5556
private volatile Future<?> stateProcessorFuture;
5657
private volatile boolean processCloseInitiated;
@@ -59,17 +60,18 @@ public abstract class AbstractNativeProcess implements NativeProcess {
5960

6061
protected AbstractNativeProcess(String jobId, InputStream logStream, OutputStream processInStream, InputStream processOutStream,
6162
OutputStream processRestoreStream, int numberOfFields, List<Path> filesToDelete,
62-
Consumer<String> onProcessCrash) {
63+
Consumer<String> onProcessCrash, Duration processConnectTimeout) {
6364
this.jobId = jobId;
64-
cppLogHandler = new CppLogMessageHandler(jobId, logStream);
65+
this.cppLogHandler = new CppLogMessageHandler(jobId, logStream);
6566
this.processInStream = processInStream != null ? new BufferedOutputStream(processInStream) : null;
6667
this.processOutStream = processOutStream;
6768
this.processRestoreStream = processRestoreStream;
6869
this.recordWriter = new LengthEncodedWriter(this.processInStream);
69-
startTime = ZonedDateTime.now();
70+
this.startTime = ZonedDateTime.now();
7071
this.numberOfFields = numberOfFields;
7172
this.filesToDelete = filesToDelete;
7273
this.onProcessCrash = Objects.requireNonNull(onProcessCrash);
74+
this.processConnectTimeout = Objects.requireNonNull(processConnectTimeout);
7375
}
7476

7577
public abstract String getName();
@@ -195,10 +197,9 @@ public void kill() throws IOException {
195197
LOGGER.debug("[{}] Killing {} process", jobId, getName());
196198
processKilled = true;
197199
try {
198-
// The PID comes via the processes log stream. We don't wait for it to arrive here,
199-
// but if the wait times out it implies the process has only just started, in which
200-
// case it should die very quickly when we close its input stream.
201-
NativeControllerHolder.getNativeController().killProcess(cppLogHandler.getPid(Duration.ZERO));
200+
// The PID comes via the processes log stream. We do wait here to give the process the time to start up and report its PID.
201+
// Without the PID we cannot kill the process.
202+
NativeControllerHolder.getNativeController().killProcess(cppLogHandler.getPid(processConnectTimeout));
202203

203204
// Wait for the process to die before closing processInStream as if the process
204205
// is still alive when processInStream is closed it may start persisting state

x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessTests.java

+7-5
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import java.io.OutputStream;
2626
import java.nio.ByteBuffer;
2727
import java.nio.charset.StandardCharsets;
28+
import java.time.Duration;
2829
import java.time.ZonedDateTime;
2930
import java.time.temporal.ChronoUnit;
3031
import java.util.Collections;
@@ -62,7 +63,7 @@ public void testProcessStartTime() throws Exception {
6263
try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", logStream,
6364
mock(OutputStream.class), outputStream, mock(OutputStream.class),
6465
NUMBER_FIELDS, null,
65-
new ProcessResultsParser<>(AutodetectResult.PARSER, NamedXContentRegistry.EMPTY), mock(Consumer.class))) {
66+
new ProcessResultsParser<>(AutodetectResult.PARSER, NamedXContentRegistry.EMPTY), mock(Consumer.class), Duration.ZERO)) {
6667
process.start(executorService, mock(IndexingStateProcessor.class), mock(InputStream.class));
6768

6869
ZonedDateTime startTime = process.getProcessStartTime();
@@ -85,7 +86,7 @@ public void testWriteRecord() throws IOException {
8586
ByteArrayOutputStream bos = new ByteArrayOutputStream(1024);
8687
try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", logStream,
8788
bos, outputStream, mock(OutputStream.class), NUMBER_FIELDS, Collections.emptyList(),
88-
new ProcessResultsParser<>(AutodetectResult.PARSER, NamedXContentRegistry.EMPTY), mock(Consumer.class))) {
89+
new ProcessResultsParser<>(AutodetectResult.PARSER, NamedXContentRegistry.EMPTY), mock(Consumer.class), Duration.ZERO)) {
8990
process.start(executorService, mock(IndexingStateProcessor.class), mock(InputStream.class));
9091

9192
process.writeRecord(record);
@@ -120,7 +121,7 @@ public void testFlush() throws IOException {
120121
ByteArrayOutputStream bos = new ByteArrayOutputStream(AutodetectControlMsgWriter.FLUSH_SPACES_LENGTH + 1024);
121122
try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", logStream,
122123
bos, outputStream, mock(OutputStream.class), NUMBER_FIELDS, Collections.emptyList(),
123-
new ProcessResultsParser<>(AutodetectResult.PARSER, NamedXContentRegistry.EMPTY), mock(Consumer.class))) {
124+
new ProcessResultsParser<>(AutodetectResult.PARSER, NamedXContentRegistry.EMPTY), mock(Consumer.class), Duration.ZERO)) {
124125
process.start(executorService, mock(IndexingStateProcessor.class), mock(InputStream.class));
125126

126127
FlushJobParams params = FlushJobParams.builder().build();
@@ -154,7 +155,8 @@ public void testConsumeAndCloseOutputStream() throws IOException {
154155

155156
try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", logStream,
156157
processInStream, processOutStream, mock(OutputStream.class), NUMBER_FIELDS, Collections.emptyList(),
157-
new ProcessResultsParser<AutodetectResult>(AutodetectResult.PARSER, NamedXContentRegistry.EMPTY), mock(Consumer.class))) {
158+
new ProcessResultsParser<AutodetectResult>(AutodetectResult.PARSER, NamedXContentRegistry.EMPTY), mock(Consumer.class),
159+
Duration.ZERO)) {
158160

159161
process.consumeAndCloseOutputStream();
160162
assertThat(processOutStream.available(), equalTo(0));
@@ -170,7 +172,7 @@ private void testWriteMessage(CheckedConsumer<NativeAutodetectProcess> writeFunc
170172
ByteArrayOutputStream bos = new ByteArrayOutputStream(1024);
171173
try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", logStream,
172174
bos, outputStream, mock(OutputStream.class), NUMBER_FIELDS, Collections.emptyList(),
173-
new ProcessResultsParser<>(AutodetectResult.PARSER, NamedXContentRegistry.EMPTY), mock(Consumer.class))) {
175+
new ProcessResultsParser<>(AutodetectResult.PARSER, NamedXContentRegistry.EMPTY), mock(Consumer.class), Duration.ZERO)) {
174176
process.start(executorService, mock(IndexingStateProcessor.class), mock(InputStream.class));
175177

176178
writeFunction.accept(process);

x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/AbstractNativeProcessTests.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import java.io.IOException;
1717
import java.io.InputStream;
1818
import java.io.OutputStream;
19+
import java.time.Duration;
1920
import java.util.concurrent.CountDownLatch;
2021
import java.util.concurrent.ExecutorService;
2122
import java.util.concurrent.TimeUnit;
@@ -140,7 +141,7 @@ public void testIsReady() throws Exception {
140141
private class TestNativeProcess extends AbstractNativeProcess {
141142

142143
TestNativeProcess(OutputStream inputStream) {
143-
super("foo", logStream, inputStream, outputStream, restoreStream, 0, null, onProcessCrash);
144+
super("foo", logStream, inputStream, outputStream, restoreStream, 0, null, onProcessCrash, Duration.ZERO);
144145
}
145146

146147
@Override

0 commit comments

Comments
 (0)