From 404bc723d776de6a9babd85f519ca8f3edacf239 Mon Sep 17 00:00:00 2001 From: Dimitris Athanasiou Date: Thu, 29 Apr 2021 10:06:40 +0300 Subject: [PATCH] [7.x][ML] Reduce input stream buffer from 8KB to 2KB (#72412) When we flush the input stream the java side writes enough spaces to fill the input stream buffer. However, in the case of data frame analytics, this may cause the job to freeze. The reason is that java writes the data and flushes in the same thread that goes on to then restore the state. However, when c++ reads in the end-of-data control message, it stops reading from the stream and goes on to perform the analysis. If the 8KB of spaces do not fit in the OS buffer for the names pipe, the java side blocks. It never proceeds with restoring the state and this causes a job that is being restarted and has state to freeze. In elastic/ml-cpp#1881 the buffer has been reduced to 2KB. This means the buffer is smaller than the buffer of all supported OS. Note that it is 4KB on Windows. Thus in this commit we also reduce the number of spaces we write in order to flush the buffer to match that of the buffer size. Closes #70698 Backport of #72412 --- .../elasticsearch/xpack/ml/integration/ClassificationIT.java | 1 - .../xpack/ml/process/writer/AbstractControlMsgWriter.java | 2 +- .../dataframe/process/AnalyticsControlMessageWriterTests.java | 2 +- .../autodetect/writer/AutodetectControlMsgWriterTests.java | 2 +- 4 files changed, 3 insertions(+), 4 deletions(-) diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/ClassificationIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/ClassificationIT.java index df5d8df76399b..d2d1eeb5f6c8a 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/ClassificationIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/ClassificationIT.java @@ -475,7 +475,6 @@ public void testWithOnlyTrainingRowsAndTrainingPercentIsFifty_DependentVariableI "classification_training_percent_is_50_boolean", BOOLEAN_FIELD, BOOLEAN_FIELD_VALUES, "boolean"); } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/67581") public void testStopAndRestart() throws Exception { initialize("classification_stop_and_restart"); String predictedClassField = KEYWORD_FIELD + "_prediction"; diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/writer/AbstractControlMsgWriter.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/writer/AbstractControlMsgWriter.java index a3daa3f056431..8d3c648783249 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/writer/AbstractControlMsgWriter.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/writer/AbstractControlMsgWriter.java @@ -18,7 +18,7 @@ public abstract class AbstractControlMsgWriter { /** * This should be the same size as the buffer in the C++ native process. */ - public static final int FLUSH_SPACES_LENGTH = 8192; + public static final int FLUSH_SPACES_LENGTH = 2048; protected final LengthEncodedWriter lengthEncodedWriter; private final int numberOfFields; diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsControlMessageWriterTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsControlMessageWriterTests.java index 7b0a108d93082..eb7e0c0b772cc 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsControlMessageWriterTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsControlMessageWriterTests.java @@ -39,7 +39,7 @@ public void testWriteEndOfData() throws IOException { inOrder.verify(lengthEncodedWriter).writeField("$"); StringBuilder spaces = new StringBuilder(); - IntStream.rangeClosed(1, 8192).forEach(i -> spaces.append(' ')); + IntStream.rangeClosed(1, 2048).forEach(i -> spaces.append(' ')); inOrder.verify(lengthEncodedWriter).writeNumFields(4); inOrder.verify(lengthEncodedWriter, times(3)).writeField(""); inOrder.verify(lengthEncodedWriter).writeField(spaces.toString()); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/AutodetectControlMsgWriterTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/AutodetectControlMsgWriterTests.java index 263b36b1f368d..3e3f0c82b37a8 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/AutodetectControlMsgWriterTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/AutodetectControlMsgWriterTests.java @@ -159,7 +159,7 @@ public void testWriteFlushMessage() throws IOException { inOrder.verify(lengthEncodedWriter).writeNumFields(4); inOrder.verify(lengthEncodedWriter, times(3)).writeField(""); StringBuilder spaces = new StringBuilder(); - IntStream.rangeClosed(1, 8192).forEach(i -> spaces.append(' ')); + IntStream.rangeClosed(1, 2048).forEach(i -> spaces.append(' ')); inOrder.verify(lengthEncodedWriter).writeField(spaces.toString()); inOrder.verify(lengthEncodedWriter).flush();