Skip to content

Fix race condition when stopping a data frame analytics jobs immediately after starting it #50276

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Dec 17, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Path;
import java.time.Duration;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
Expand All @@ -28,10 +29,10 @@ abstract class AbstractNativeAnalyticsProcess<Result> extends AbstractNativeProc
protected AbstractNativeAnalyticsProcess(String name, ConstructingObjectParser<Result, Void> resultParser, String jobId,
NativeController nativeController, InputStream logStream, OutputStream processInStream,
InputStream processOutStream, OutputStream processRestoreStream, int numberOfFields,
List<Path> filesToDelete, Consumer<String> onProcessCrash,
List<Path> filesToDelete, Consumer<String> onProcessCrash, Duration processConnectTimeout,
NamedXContentRegistry namedXContentRegistry) {
super(jobId, nativeController, logStream, processInStream, processOutStream, processRestoreStream, numberOfFields, filesToDelete,
onProcessCrash);
onProcessCrash, processConnectTimeout);
this.name = Objects.requireNonNull(name);
this.resultsParser = new ProcessResultsParser<>(Objects.requireNonNull(resultParser), namedXContentRegistry);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Path;
import java.time.Duration;
import java.util.List;
import java.util.Objects;
import java.util.function.Consumer;
Expand All @@ -27,10 +28,10 @@ public class NativeAnalyticsProcess extends AbstractNativeAnalyticsProcess<Analy

protected NativeAnalyticsProcess(String jobId, NativeController nativeController, InputStream logStream, OutputStream processInStream,
InputStream processOutStream, OutputStream processRestoreStream, int numberOfFields,
List<Path> filesToDelete, Consumer<String> onProcessCrash, AnalyticsProcessConfig config,
NamedXContentRegistry namedXContentRegistry) {
List<Path> filesToDelete, Consumer<String> onProcessCrash, Duration processConnectTimeout,
AnalyticsProcessConfig config, NamedXContentRegistry namedXContentRegistry) {
super(NAME, AnalyticsResult.PARSER, jobId, nativeController, logStream, processInStream, processOutStream, processRestoreStream,
numberOfFields, filesToDelete, onProcessCrash, namedXContentRegistry);
numberOfFields, filesToDelete, onProcessCrash, processConnectTimeout, namedXContentRegistry);
this.config = Objects.requireNonNull(config);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,11 @@ public NativeAnalyticsProcess createAnalyticsProcess(DataFrameAnalyticsConfig co

createNativeProcess(jobId, analyticsProcessConfig, filesToDelete, processPipes);

NativeAnalyticsProcess analyticsProcess = new NativeAnalyticsProcess(jobId, nativeController, processPipes.getLogStream().get(),
processPipes.getProcessInStream().get(), processPipes.getProcessOutStream().get(),
processPipes.getRestoreStream().orElse(null), numberOfFields, filesToDelete, onProcessCrash, analyticsProcessConfig,
namedXContentRegistry);
NativeAnalyticsProcess analyticsProcess =
new NativeAnalyticsProcess(
jobId, nativeController, processPipes.getLogStream().get(), processPipes.getProcessInStream().get(),
processPipes.getProcessOutStream().get(), processPipes.getRestoreStream().orElse(null), numberOfFields, filesToDelete,
onProcessCrash, processConnectTimeout, analyticsProcessConfig, namedXContentRegistry);

try {
startProcess(config, executorService, processPipes, analyticsProcess);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Path;
import java.time.Duration;
import java.util.List;
import java.util.function.Consumer;

Expand All @@ -23,9 +24,9 @@ public class NativeMemoryUsageEstimationProcess extends AbstractNativeAnalyticsP
protected NativeMemoryUsageEstimationProcess(String jobId, NativeController nativeController, InputStream logStream,
OutputStream processInStream, InputStream processOutStream,
OutputStream processRestoreStream, int numberOfFields, List<Path> filesToDelete,
Consumer<String> onProcessCrash) {
Consumer<String> onProcessCrash, Duration processConnectTimeout) {
super(NAME, MemoryUsageEstimationResult.PARSER, jobId, nativeController, logStream, processInStream, processOutStream,
processRestoreStream, numberOfFields, filesToDelete, onProcessCrash, NamedXContentRegistry.EMPTY);
processRestoreStream, numberOfFields, filesToDelete, onProcessCrash, processConnectTimeout, NamedXContentRegistry.EMPTY);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ public NativeMemoryUsageEstimationProcess createAnalyticsProcess(
null,
0,
filesToDelete,
onProcessCrash);
onProcessCrash,
processConnectTimeout);

try {
process.start(executorService);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Path;
import java.time.Duration;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
Expand All @@ -44,9 +45,10 @@ class NativeAutodetectProcess extends AbstractNativeProcess implements Autodetec

NativeAutodetectProcess(String jobId, NativeController nativeController, InputStream logStream, OutputStream processInStream,
InputStream processOutStream, OutputStream processRestoreStream, int numberOfFields, List<Path> filesToDelete,
ProcessResultsParser<AutodetectResult> resultsParser, Consumer<String> onProcessCrash) {
ProcessResultsParser<AutodetectResult> resultsParser, Consumer<String> onProcessCrash,
Duration processConnectTimeout) {
super(jobId, nativeController, logStream, processInStream, processOutStream, processRestoreStream, numberOfFields, filesToDelete,
onProcessCrash);
onProcessCrash, processConnectTimeout);
this.resultsParser = resultsParser;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ public AutodetectProcess createAutodetectProcess(Job job,
NativeAutodetectProcess autodetect = new NativeAutodetectProcess(
job.getId(), nativeController, processPipes.getLogStream().get(), processPipes.getProcessInStream().get(),
processPipes.getProcessOutStream().get(), processPipes.getRestoreStream().orElse(null), numberOfFields,
filesToDelete, resultsParser, onProcessCrash);
filesToDelete, resultsParser, onProcessCrash, processConnectTimeout);
try {
autodetect.start(executorService, stateProcessor, processPipes.getPersistStream().get());
return autodetect;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

import java.io.InputStream;
import java.io.OutputStream;
import java.time.Duration;
import java.util.Collections;

/**
Expand All @@ -21,8 +22,9 @@ class NativeNormalizerProcess extends AbstractNativeProcess implements Normalize
private static final String NAME = "normalizer";

NativeNormalizerProcess(String jobId, NativeController nativeController, InputStream logStream, OutputStream processInStream,
InputStream processOutStream) {
super(jobId, nativeController, logStream, processInStream, processOutStream, null, 0, Collections.emptyList(), (ignore) -> {});
InputStream processOutStream, Duration processConnectTimeout) {
super(jobId, nativeController, logStream, processInStream, processOutStream, null, 0, Collections.emptyList(), (ignore) -> {},
processConnectTimeout);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public NormalizerProcess createNormalizerProcess(String jobId, String quantilesS
createNativeProcess(jobId, quantilesState, processPipes, bucketSpan);

NativeNormalizerProcess normalizerProcess = new NativeNormalizerProcess(jobId, nativeController, processPipes.getLogStream().get(),
processPipes.getProcessInStream().get(), processPipes.getProcessOutStream().get());
processPipes.getProcessInStream().get(), processPipes.getProcessOutStream().get(), processConnectTimeout);

try {
normalizerProcess.start(executorService);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ public abstract class AbstractNativeProcess implements NativeProcess {
private final int numberOfFields;
private final List<Path> filesToDelete;
private final Consumer<String> onProcessCrash;
private final Duration processConnectTimeout;
private volatile Future<?> logTailFuture;
private volatile Future<?> stateProcessorFuture;
private volatile boolean processCloseInitiated;
Expand All @@ -60,18 +61,19 @@ public abstract class AbstractNativeProcess implements NativeProcess {

protected AbstractNativeProcess(String jobId, NativeController nativeController, InputStream logStream, OutputStream processInStream,
InputStream processOutStream, OutputStream processRestoreStream, int numberOfFields,
List<Path> filesToDelete, Consumer<String> onProcessCrash) {
List<Path> filesToDelete, Consumer<String> onProcessCrash, Duration processConnectTimeout) {
this.jobId = jobId;
this.nativeController = nativeController;
cppLogHandler = new CppLogMessageHandler(jobId, logStream);
this.cppLogHandler = new CppLogMessageHandler(jobId, logStream);
this.processInStream = processInStream != null ? new BufferedOutputStream(processInStream) : null;
this.processOutStream = processOutStream;
this.processRestoreStream = processRestoreStream;
this.recordWriter = new LengthEncodedWriter(this.processInStream);
startTime = ZonedDateTime.now();
this.startTime = ZonedDateTime.now();
this.numberOfFields = numberOfFields;
this.filesToDelete = filesToDelete;
this.onProcessCrash = Objects.requireNonNull(onProcessCrash);
this.processConnectTimeout = Objects.requireNonNull(processConnectTimeout);
}

public abstract String getName();
Expand Down Expand Up @@ -197,10 +199,9 @@ public void kill() throws IOException {
LOGGER.debug("[{}] Killing {} process", jobId, getName());
processKilled = true;
try {
// The PID comes via the processes log stream. We don't wait for it to arrive here,
// but if the wait times out it implies the process has only just started, in which
// case it should die very quickly when we close its input stream.
nativeController.killProcess(cppLogHandler.getPid(Duration.ZERO));
// The PID comes via the processes log stream. We do wait here to give the process the time to start up and report its PID.
// Without the PID we cannot kill the process.
nativeController.killProcess(cppLogHandler.getPid(processConnectTimeout));

// Wait for the process to die before closing processInStream as if the process
// is still alive when processInStream is closed it may start persisting state
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.ZonedDateTime;
import java.time.temporal.ChronoUnit;
import java.util.Collections;
Expand Down Expand Up @@ -63,7 +64,7 @@ public void testProcessStartTime() throws Exception {
try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", mock(NativeController.class), logStream,
mock(OutputStream.class), outputStream, mock(OutputStream.class),
NUMBER_FIELDS, null,
new ProcessResultsParser<>(AutodetectResult.PARSER, NamedXContentRegistry.EMPTY), mock(Consumer.class))) {
new ProcessResultsParser<>(AutodetectResult.PARSER, NamedXContentRegistry.EMPTY), mock(Consumer.class), Duration.ZERO)) {
process.start(executorService, mock(IndexingStateProcessor.class), mock(InputStream.class));

ZonedDateTime startTime = process.getProcessStartTime();
Expand All @@ -86,7 +87,7 @@ public void testWriteRecord() throws IOException {
ByteArrayOutputStream bos = new ByteArrayOutputStream(1024);
try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", mock(NativeController.class), logStream,
bos, outputStream, mock(OutputStream.class), NUMBER_FIELDS, Collections.emptyList(),
new ProcessResultsParser<>(AutodetectResult.PARSER, NamedXContentRegistry.EMPTY), mock(Consumer.class))) {
new ProcessResultsParser<>(AutodetectResult.PARSER, NamedXContentRegistry.EMPTY), mock(Consumer.class), Duration.ZERO)) {
process.start(executorService, mock(IndexingStateProcessor.class), mock(InputStream.class));

process.writeRecord(record);
Expand Down Expand Up @@ -121,7 +122,7 @@ public void testFlush() throws IOException {
ByteArrayOutputStream bos = new ByteArrayOutputStream(AutodetectControlMsgWriter.FLUSH_SPACES_LENGTH + 1024);
try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", mock(NativeController.class), logStream,
bos, outputStream, mock(OutputStream.class), NUMBER_FIELDS, Collections.emptyList(),
new ProcessResultsParser<>(AutodetectResult.PARSER, NamedXContentRegistry.EMPTY), mock(Consumer.class))) {
new ProcessResultsParser<>(AutodetectResult.PARSER, NamedXContentRegistry.EMPTY), mock(Consumer.class), Duration.ZERO)) {
process.start(executorService, mock(IndexingStateProcessor.class), mock(InputStream.class));

FlushJobParams params = FlushJobParams.builder().build();
Expand Down Expand Up @@ -155,7 +156,8 @@ public void testConsumeAndCloseOutputStream() throws IOException {

try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", mock(NativeController.class), logStream,
processInStream, processOutStream, mock(OutputStream.class), NUMBER_FIELDS, Collections.emptyList(),
new ProcessResultsParser<AutodetectResult>(AutodetectResult.PARSER, NamedXContentRegistry.EMPTY), mock(Consumer.class))) {
new ProcessResultsParser<AutodetectResult>(AutodetectResult.PARSER, NamedXContentRegistry.EMPTY), mock(Consumer.class),
Duration.ZERO)) {

process.consumeAndCloseOutputStream();
assertThat(processOutStream.available(), equalTo(0));
Expand All @@ -171,7 +173,7 @@ private void testWriteMessage(CheckedConsumer<NativeAutodetectProcess> writeFunc
ByteArrayOutputStream bos = new ByteArrayOutputStream(1024);
try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", mock(NativeController.class), logStream,
bos, outputStream, mock(OutputStream.class), NUMBER_FIELDS, Collections.emptyList(),
new ProcessResultsParser<>(AutodetectResult.PARSER, NamedXContentRegistry.EMPTY), mock(Consumer.class))) {
new ProcessResultsParser<>(AutodetectResult.PARSER, NamedXContentRegistry.EMPTY), mock(Consumer.class), Duration.ZERO)) {
process.start(executorService, mock(IndexingStateProcessor.class), mock(InputStream.class));

writeFunction.accept(process);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -142,7 +143,7 @@ public void testIsReady() throws Exception {
private class TestNativeProcess extends AbstractNativeProcess {

TestNativeProcess(OutputStream inputStream) {
super("foo", nativeController, logStream, inputStream, outputStream, restoreStream, 0, null, onProcessCrash);
super("foo", nativeController, logStream, inputStream, outputStream, restoreStream, 0, null, onProcessCrash, Duration.ZERO);
}

@Override
Expand Down