Commit 78f18c0

[7.x][ML] Reduce input stream buffer from 8KB to 2KB (#72412) (#72449)
When we flush the input stream, the Java side writes enough spaces to fill the input stream buffer of the native process. In the case of data frame analytics, however, this can cause the job to freeze. Java writes the data and flushes in the same thread that then goes on to restore the state, whereas C++ stops reading from the stream as soon as it reads the end-of-data control message and moves on to perform the analysis. If the 8KB of spaces do not fit in the OS buffer for the named pipe, the Java side blocks on the write. It never proceeds to restore the state, so a job that is being restarted and has state freezes.

In elastic/ml-cpp#1881 the buffer was reduced to 2KB, which makes it smaller than the OS pipe buffer on every supported OS. Note that the OS buffer is 4KB on Windows. This commit therefore also reduces the number of spaces we write to flush the buffer so that it matches the new buffer size.

Closes #70698

Backport of #72412
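The blocking behaviour described above can be reproduced in miniature. The following is only an in-JVM analogy, not the Elasticsearch code: it uses `java.io.PipedOutputStream` in place of an OS named pipe, and the class name `PipeBlockingSketch`, the method `writeCompletes`, and the 500 ms timeout are all made up for illustration. The 4096-byte capacity stands in for the 4KB Windows figure quoted in the message. A reader that never reads plays the role of the C++ process once it has seen end-of-data.

```java
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.util.Arrays;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class PipeBlockingSketch {

    /**
     * Writes paddingLength spaces into a pipe of the given capacity that nobody reads.
     * Returns true if the write finishes, false if it is still blocked after 500 ms.
     */
    static boolean writeCompletes(int paddingLength, int pipeCapacity) {
        byte[] padding = new byte[paddingLength];
        Arrays.fill(padding, (byte) ' ');

        // Daemon thread so a blocked writer can never keep the JVM alive.
        ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "writer");
            t.setDaemon(true);
            return t;
        });
        // The input side is connected but never read from (hypothetical stand-in
        // for a native process that has stopped consuming its input).
        try (PipedOutputStream out = new PipedOutputStream();
             PipedInputStream in = new PipedInputStream(out, pipeCapacity)) {
            Future<?> write = executor.submit(() -> {
                out.write(padding);
                return null;
            });
            try {
                write.get(500, TimeUnit.MILLISECONDS);
                return true;
            } catch (TimeoutException e) {
                // The writer is stuck waiting for buffer space; interrupt it.
                write.cancel(true);
                return false;
            }
        } catch (IOException | InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("2048 into 4096: " + writeCompletes(2048, 4096));
        System.out.println("8192 into 4096: " + writeCompletes(8192, 4096));
    }
}
```

In this sketch the 2048-byte write fits in the pipe and returns, while the 8192-byte write stalls once the pipe is full. A thread stalled this way cannot go on to do any later work, which is the situation the message describes for state restoration.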
1 parent 2daa984 commit 78f18c0

4 files changed: +3 -4 lines changed


x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/ClassificationIT.java

Lines changed: 0 additions & 1 deletion
@@ -475,7 +475,6 @@ public void testWithOnlyTrainingRowsAndTrainingPercentIsFifty_DependentVariableI
             "classification_training_percent_is_50_boolean", BOOLEAN_FIELD, BOOLEAN_FIELD_VALUES, "boolean");
     }
 
-    @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/67581")
     public void testStopAndRestart() throws Exception {
         initialize("classification_stop_and_restart");
         String predictedClassField = KEYWORD_FIELD + "_prediction";

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/writer/AbstractControlMsgWriter.java

Lines changed: 1 addition & 1 deletion
@@ -18,7 +18,7 @@ public abstract class AbstractControlMsgWriter {
     /**
      * This should be the same size as the buffer in the C++ native process.
      */
-    public static final int FLUSH_SPACES_LENGTH = 8192;
+    public static final int FLUSH_SPACES_LENGTH = 2048;
 
     protected final LengthEncodedWriter lengthEncodedWriter;
     private final int numberOfFields;
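The hunk changes only the constant and does not show how the padding is produced from it. A minimal sketch of one way to build it follows, assuming the padding is simply a string of `FLUSH_SPACES_LENGTH` spaces (which is what the updated unit tests expect to be passed to `writeField`). The class `FlushPaddingSketch` and the method `flushPadding` are hypothetical names, not part of the Elasticsearch source.

```java
import java.util.Arrays;

public class FlushPaddingSketch {

    // Mirrors the value introduced by this commit; the enclosing class is hypothetical.
    static final int FLUSH_SPACES_LENGTH = 2048;

    /** Builds a string of exactly FLUSH_SPACES_LENGTH space characters. */
    static String flushPadding() {
        char[] spaces = new char[FLUSH_SPACES_LENGTH];
        Arrays.fill(spaces, ' ');
        return new String(spaces);
    }

    public static void main(String[] args) {
        // Print the length rather than the padding itself, which is all whitespace.
        System.out.println(flushPadding().length());
    }
}
```

Deriving the padding from the constant keeps the amount written tied to a single value, so a future change to the native buffer size needs only one edit on the Java side.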

x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsControlMessageWriterTests.java

Lines changed: 1 addition & 1 deletion
@@ -39,7 +39,7 @@ public void testWriteEndOfData() throws IOException {
         inOrder.verify(lengthEncodedWriter).writeField("$");
 
         StringBuilder spaces = new StringBuilder();
-        IntStream.rangeClosed(1, 8192).forEach(i -> spaces.append(' '));
+        IntStream.rangeClosed(1, 2048).forEach(i -> spaces.append(' '));
         inOrder.verify(lengthEncodedWriter).writeNumFields(4);
         inOrder.verify(lengthEncodedWriter, times(3)).writeField("");
         inOrder.verify(lengthEncodedWriter).writeField(spaces.toString());

x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/AutodetectControlMsgWriterTests.java

Lines changed: 1 addition & 1 deletion
@@ -159,7 +159,7 @@ public void testWriteFlushMessage() throws IOException {
         inOrder.verify(lengthEncodedWriter).writeNumFields(4);
         inOrder.verify(lengthEncodedWriter, times(3)).writeField("");
         StringBuilder spaces = new StringBuilder();
-        IntStream.rangeClosed(1, 8192).forEach(i -> spaces.append(' '));
+        IntStream.rangeClosed(1, 2048).forEach(i -> spaces.append(' '));
         inOrder.verify(lengthEncodedWriter).writeField(spaces.toString());
 
         inOrder.verify(lengthEncodedWriter).flush();
