Skip to content

Commit 114d8ef

Browse files
authored
Pass processConnectTimeout to the method that fetches C++ process' PID (#50276)
1 parent d2aee62 commit 114d8ef

12 files changed

+44
-31
lines changed

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AbstractNativeAnalyticsProcess.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import java.io.InputStream;
1616
import java.io.OutputStream;
1717
import java.nio.file.Path;
18+
import java.time.Duration;
1819
import java.util.Iterator;
1920
import java.util.List;
2021
import java.util.Objects;
@@ -28,10 +29,10 @@ abstract class AbstractNativeAnalyticsProcess<Result> extends AbstractNativeProc
2829
protected AbstractNativeAnalyticsProcess(String name, ConstructingObjectParser<Result, Void> resultParser, String jobId,
2930
NativeController nativeController, InputStream logStream, OutputStream processInStream,
3031
InputStream processOutStream, OutputStream processRestoreStream, int numberOfFields,
31-
List<Path> filesToDelete, Consumer<String> onProcessCrash,
32+
List<Path> filesToDelete, Consumer<String> onProcessCrash, Duration processConnectTimeout,
3233
NamedXContentRegistry namedXContentRegistry) {
3334
super(jobId, nativeController, logStream, processInStream, processOutStream, processRestoreStream, numberOfFields, filesToDelete,
34-
onProcessCrash);
35+
onProcessCrash, processConnectTimeout);
3536
this.name = Objects.requireNonNull(name);
3637
this.resultsParser = new ProcessResultsParser<>(Objects.requireNonNull(resultParser), namedXContentRegistry);
3738
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeAnalyticsProcess.java

+4-3
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import java.io.InputStream;
1616
import java.io.OutputStream;
1717
import java.nio.file.Path;
18+
import java.time.Duration;
1819
import java.util.List;
1920
import java.util.Objects;
2021
import java.util.function.Consumer;
@@ -27,10 +28,10 @@ public class NativeAnalyticsProcess extends AbstractNativeAnalyticsProcess<Analy
2728

2829
protected NativeAnalyticsProcess(String jobId, NativeController nativeController, InputStream logStream, OutputStream processInStream,
2930
InputStream processOutStream, OutputStream processRestoreStream, int numberOfFields,
30-
List<Path> filesToDelete, Consumer<String> onProcessCrash, AnalyticsProcessConfig config,
31-
NamedXContentRegistry namedXContentRegistry) {
31+
List<Path> filesToDelete, Consumer<String> onProcessCrash, Duration processConnectTimeout,
32+
AnalyticsProcessConfig config, NamedXContentRegistry namedXContentRegistry) {
3233
super(NAME, AnalyticsResult.PARSER, jobId, nativeController, logStream, processInStream, processOutStream, processRestoreStream,
33-
numberOfFields, filesToDelete, onProcessCrash, namedXContentRegistry);
34+
numberOfFields, filesToDelete, onProcessCrash, processConnectTimeout, namedXContentRegistry);
3435
this.config = Objects.requireNonNull(config);
3536
}
3637

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeAnalyticsProcessFactory.java

+5-4
Original file line numberDiff line numberDiff line change
@@ -82,10 +82,11 @@ public NativeAnalyticsProcess createAnalyticsProcess(DataFrameAnalyticsConfig co
8282

8383
createNativeProcess(jobId, analyticsProcessConfig, filesToDelete, processPipes);
8484

85-
NativeAnalyticsProcess analyticsProcess = new NativeAnalyticsProcess(jobId, nativeController, processPipes.getLogStream().get(),
86-
processPipes.getProcessInStream().get(), processPipes.getProcessOutStream().get(),
87-
processPipes.getRestoreStream().orElse(null), numberOfFields, filesToDelete, onProcessCrash, analyticsProcessConfig,
88-
namedXContentRegistry);
85+
NativeAnalyticsProcess analyticsProcess =
86+
new NativeAnalyticsProcess(
87+
jobId, nativeController, processPipes.getLogStream().get(), processPipes.getProcessInStream().get(),
88+
processPipes.getProcessOutStream().get(), processPipes.getRestoreStream().orElse(null), numberOfFields, filesToDelete,
89+
onProcessCrash, processConnectTimeout, analyticsProcessConfig, namedXContentRegistry);
8990

9091
try {
9192
startProcess(config, executorService, processPipes, analyticsProcess);

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeMemoryUsageEstimationProcess.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import java.io.InputStream;
1414
import java.io.OutputStream;
1515
import java.nio.file.Path;
16+
import java.time.Duration;
1617
import java.util.List;
1718
import java.util.function.Consumer;
1819

@@ -23,9 +24,9 @@ public class NativeMemoryUsageEstimationProcess extends AbstractNativeAnalyticsP
2324
protected NativeMemoryUsageEstimationProcess(String jobId, NativeController nativeController, InputStream logStream,
2425
OutputStream processInStream, InputStream processOutStream,
2526
OutputStream processRestoreStream, int numberOfFields, List<Path> filesToDelete,
26-
Consumer<String> onProcessCrash) {
27+
Consumer<String> onProcessCrash, Duration processConnectTimeout) {
2728
super(NAME, MemoryUsageEstimationResult.PARSER, jobId, nativeController, logStream, processInStream, processOutStream,
28-
processRestoreStream, numberOfFields, filesToDelete, onProcessCrash, NamedXContentRegistry.EMPTY);
29+
processRestoreStream, numberOfFields, filesToDelete, onProcessCrash, processConnectTimeout, NamedXContentRegistry.EMPTY);
2930
}
3031

3132
@Override

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeMemoryUsageEstimationProcessFactory.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,8 @@ public NativeMemoryUsageEstimationProcess createAnalyticsProcess(
7676
null,
7777
0,
7878
filesToDelete,
79-
onProcessCrash);
79+
onProcessCrash,
80+
processConnectTimeout);
8081

8182
try {
8283
process.start(executorService);

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcess.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import java.io.InputStream;
2828
import java.io.OutputStream;
2929
import java.nio.file.Path;
30+
import java.time.Duration;
3031
import java.util.Iterator;
3132
import java.util.List;
3233
import java.util.function.Consumer;
@@ -44,9 +45,10 @@ class NativeAutodetectProcess extends AbstractNativeProcess implements Autodetec
4445

4546
NativeAutodetectProcess(String jobId, NativeController nativeController, InputStream logStream, OutputStream processInStream,
4647
InputStream processOutStream, OutputStream processRestoreStream, int numberOfFields, List<Path> filesToDelete,
47-
ProcessResultsParser<AutodetectResult> resultsParser, Consumer<String> onProcessCrash) {
48+
ProcessResultsParser<AutodetectResult> resultsParser, Consumer<String> onProcessCrash,
49+
Duration processConnectTimeout) {
4850
super(jobId, nativeController, logStream, processInStream, processOutStream, processRestoreStream, numberOfFields, filesToDelete,
49-
onProcessCrash);
51+
onProcessCrash, processConnectTimeout);
5052
this.resultsParser = resultsParser;
5153
}
5254

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ public AutodetectProcess createAutodetectProcess(Job job,
9191
NativeAutodetectProcess autodetect = new NativeAutodetectProcess(
9292
job.getId(), nativeController, processPipes.getLogStream().get(), processPipes.getProcessInStream().get(),
9393
processPipes.getProcessOutStream().get(), processPipes.getRestoreStream().orElse(null), numberOfFields,
94-
filesToDelete, resultsParser, onProcessCrash);
94+
filesToDelete, resultsParser, onProcessCrash, processConnectTimeout);
9595
try {
9696
autodetect.start(executorService, stateProcessor, processPipes.getPersistStream().get());
9797
return autodetect;

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/NativeNormalizerProcess.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
import java.io.InputStream;
1313
import java.io.OutputStream;
14+
import java.time.Duration;
1415
import java.util.Collections;
1516

1617
/**
@@ -21,8 +22,9 @@ class NativeNormalizerProcess extends AbstractNativeProcess implements Normalize
2122
private static final String NAME = "normalizer";
2223

2324
NativeNormalizerProcess(String jobId, NativeController nativeController, InputStream logStream, OutputStream processInStream,
24-
InputStream processOutStream) {
25-
super(jobId, nativeController, logStream, processInStream, processOutStream, null, 0, Collections.emptyList(), (ignore) -> {});
25+
InputStream processOutStream, Duration processConnectTimeout) {
26+
super(jobId, nativeController, logStream, processInStream, processOutStream, null, 0, Collections.emptyList(), (ignore) -> {},
27+
processConnectTimeout);
2628
}
2729

2830
@Override

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/NativeNormalizerProcessFactory.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ public NormalizerProcess createNormalizerProcess(String jobId, String quantilesS
5353
createNativeProcess(jobId, quantilesState, processPipes, bucketSpan);
5454

5555
NativeNormalizerProcess normalizerProcess = new NativeNormalizerProcess(jobId, nativeController, processPipes.getLogStream().get(),
56-
processPipes.getProcessInStream().get(), processPipes.getProcessOutStream().get());
56+
processPipes.getProcessInStream().get(), processPipes.getProcessOutStream().get(), processConnectTimeout);
5757

5858
try {
5959
normalizerProcess.start(executorService);

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/AbstractNativeProcess.java

+8-7
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ public abstract class AbstractNativeProcess implements NativeProcess {
5252
private final int numberOfFields;
5353
private final List<Path> filesToDelete;
5454
private final Consumer<String> onProcessCrash;
55+
private final Duration processConnectTimeout;
5556
private volatile Future<?> logTailFuture;
5657
private volatile Future<?> stateProcessorFuture;
5758
private volatile boolean processCloseInitiated;
@@ -60,18 +61,19 @@ public abstract class AbstractNativeProcess implements NativeProcess {
6061

6162
protected AbstractNativeProcess(String jobId, NativeController nativeController, InputStream logStream, OutputStream processInStream,
6263
InputStream processOutStream, OutputStream processRestoreStream, int numberOfFields,
63-
List<Path> filesToDelete, Consumer<String> onProcessCrash) {
64+
List<Path> filesToDelete, Consumer<String> onProcessCrash, Duration processConnectTimeout) {
6465
this.jobId = jobId;
6566
this.nativeController = nativeController;
66-
cppLogHandler = new CppLogMessageHandler(jobId, logStream);
67+
this.cppLogHandler = new CppLogMessageHandler(jobId, logStream);
6768
this.processInStream = processInStream != null ? new BufferedOutputStream(processInStream) : null;
6869
this.processOutStream = processOutStream;
6970
this.processRestoreStream = processRestoreStream;
7071
this.recordWriter = new LengthEncodedWriter(this.processInStream);
71-
startTime = ZonedDateTime.now();
72+
this.startTime = ZonedDateTime.now();
7273
this.numberOfFields = numberOfFields;
7374
this.filesToDelete = filesToDelete;
7475
this.onProcessCrash = Objects.requireNonNull(onProcessCrash);
76+
this.processConnectTimeout = Objects.requireNonNull(processConnectTimeout);
7577
}
7678

7779
public abstract String getName();
@@ -197,10 +199,9 @@ public void kill() throws IOException {
197199
LOGGER.debug("[{}] Killing {} process", jobId, getName());
198200
processKilled = true;
199201
try {
200-
// The PID comes via the processes log stream. We don't wait for it to arrive here,
201-
// but if the wait times out it implies the process has only just started, in which
202-
// case it should die very quickly when we close its input stream.
203-
nativeController.killProcess(cppLogHandler.getPid(Duration.ZERO));
202+
// The PID comes via the processes log stream. We do wait here to give the process the time to start up and report its PID.
203+
// Without the PID we cannot kill the process.
204+
nativeController.killProcess(cppLogHandler.getPid(processConnectTimeout));
204205

205206
// Wait for the process to die before closing processInStream as if the process
206207
// is still alive when processInStream is closed it may start persisting state

x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessTests.java

+7-5
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import java.io.OutputStream;
2727
import java.nio.ByteBuffer;
2828
import java.nio.charset.StandardCharsets;
29+
import java.time.Duration;
2930
import java.time.ZonedDateTime;
3031
import java.time.temporal.ChronoUnit;
3132
import java.util.Collections;
@@ -63,7 +64,7 @@ public void testProcessStartTime() throws Exception {
6364
try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", mock(NativeController.class), logStream,
6465
mock(OutputStream.class), outputStream, mock(OutputStream.class),
6566
NUMBER_FIELDS, null,
66-
new ProcessResultsParser<>(AutodetectResult.PARSER, NamedXContentRegistry.EMPTY), mock(Consumer.class))) {
67+
new ProcessResultsParser<>(AutodetectResult.PARSER, NamedXContentRegistry.EMPTY), mock(Consumer.class), Duration.ZERO)) {
6768
process.start(executorService, mock(IndexingStateProcessor.class), mock(InputStream.class));
6869

6970
ZonedDateTime startTime = process.getProcessStartTime();
@@ -86,7 +87,7 @@ public void testWriteRecord() throws IOException {
8687
ByteArrayOutputStream bos = new ByteArrayOutputStream(1024);
8788
try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", mock(NativeController.class), logStream,
8889
bos, outputStream, mock(OutputStream.class), NUMBER_FIELDS, Collections.emptyList(),
89-
new ProcessResultsParser<>(AutodetectResult.PARSER, NamedXContentRegistry.EMPTY), mock(Consumer.class))) {
90+
new ProcessResultsParser<>(AutodetectResult.PARSER, NamedXContentRegistry.EMPTY), mock(Consumer.class), Duration.ZERO)) {
9091
process.start(executorService, mock(IndexingStateProcessor.class), mock(InputStream.class));
9192

9293
process.writeRecord(record);
@@ -121,7 +122,7 @@ public void testFlush() throws IOException {
121122
ByteArrayOutputStream bos = new ByteArrayOutputStream(AutodetectControlMsgWriter.FLUSH_SPACES_LENGTH + 1024);
122123
try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", mock(NativeController.class), logStream,
123124
bos, outputStream, mock(OutputStream.class), NUMBER_FIELDS, Collections.emptyList(),
124-
new ProcessResultsParser<>(AutodetectResult.PARSER, NamedXContentRegistry.EMPTY), mock(Consumer.class))) {
125+
new ProcessResultsParser<>(AutodetectResult.PARSER, NamedXContentRegistry.EMPTY), mock(Consumer.class), Duration.ZERO)) {
125126
process.start(executorService, mock(IndexingStateProcessor.class), mock(InputStream.class));
126127

127128
FlushJobParams params = FlushJobParams.builder().build();
@@ -155,7 +156,8 @@ public void testConsumeAndCloseOutputStream() throws IOException {
155156

156157
try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", mock(NativeController.class), logStream,
157158
processInStream, processOutStream, mock(OutputStream.class), NUMBER_FIELDS, Collections.emptyList(),
158-
new ProcessResultsParser<AutodetectResult>(AutodetectResult.PARSER, NamedXContentRegistry.EMPTY), mock(Consumer.class))) {
159+
new ProcessResultsParser<AutodetectResult>(AutodetectResult.PARSER, NamedXContentRegistry.EMPTY), mock(Consumer.class),
160+
Duration.ZERO)) {
159161

160162
process.consumeAndCloseOutputStream();
161163
assertThat(processOutStream.available(), equalTo(0));
@@ -171,7 +173,7 @@ private void testWriteMessage(CheckedConsumer<NativeAutodetectProcess> writeFunc
171173
ByteArrayOutputStream bos = new ByteArrayOutputStream(1024);
172174
try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", mock(NativeController.class), logStream,
173175
bos, outputStream, mock(OutputStream.class), NUMBER_FIELDS, Collections.emptyList(),
174-
new ProcessResultsParser<>(AutodetectResult.PARSER, NamedXContentRegistry.EMPTY), mock(Consumer.class))) {
176+
new ProcessResultsParser<>(AutodetectResult.PARSER, NamedXContentRegistry.EMPTY), mock(Consumer.class), Duration.ZERO)) {
175177
process.start(executorService, mock(IndexingStateProcessor.class), mock(InputStream.class));
176178

177179
writeFunction.accept(process);

x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/AbstractNativeProcessTests.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import java.io.IOException;
1717
import java.io.InputStream;
1818
import java.io.OutputStream;
19+
import java.time.Duration;
1920
import java.util.concurrent.CountDownLatch;
2021
import java.util.concurrent.ExecutorService;
2122
import java.util.concurrent.TimeUnit;
@@ -142,7 +143,7 @@ public void testIsReady() throws Exception {
142143
private class TestNativeProcess extends AbstractNativeProcess {
143144

144145
TestNativeProcess(OutputStream inputStream) {
145-
super("foo", nativeController, logStream, inputStream, outputStream, restoreStream, 0, null, onProcessCrash);
146+
super("foo", nativeController, logStream, inputStream, outputStream, restoreStream, 0, null, onProcessCrash, Duration.ZERO);
146147
}
147148

148149
@Override

0 commit comments

Comments
 (0)