Skip to content

Commit 88496c2

Browse files
[FEATURE][ML] Write control message to signify end of data (#36158)
1 parent 44491db commit 88496c2

File tree

8 files changed

+151
-25
lines changed

8 files changed

+151
-25
lines changed

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/analytics/DataFrameDataExtractor.java

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -192,11 +192,6 @@ public List<String> getFieldNames() {
192192
return context.extractedFields.getAllFields().stream().map(ExtractedField::getAlias).collect(Collectors.toList());
193193
}
194194

195-
public String[] getFieldNamesArray() {
196-
List<String> fieldNames = getFieldNames();
197-
return fieldNames.toArray(new String[fieldNames.size()]);
198-
}
199-
200195
public DataSummary collectDataSummary() {
201196
SearchRequestBuilder searchRequestBuilder = new SearchRequestBuilder(client, SearchAction.INSTANCE)
202197
.setIndices(context.indices)
@@ -210,9 +205,9 @@ public DataSummary collectDataSummary() {
210205
public static class DataSummary {
211206

212207
public final long rows;
213-
public final long cols;
208+
public final int cols;
214209

215-
public DataSummary(long rows, long cols) {
210+
public DataSummary(long rows, int cols) {
216211
this.rows = rows;
217212
this.cols = cols;
218213
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
package org.elasticsearch.xpack.ml.analytics.process;
7+
8+
import org.elasticsearch.xpack.ml.process.writer.AbstractControlMsgWriter;
9+
import org.elasticsearch.xpack.ml.process.writer.LengthEncodedWriter;
10+
11+
import java.io.IOException;
12+
13+
public class AnalyticsControlMessageWriter extends AbstractControlMsgWriter {
14+
15+
/**
16+
* This must match the code defined in the api::CDataFrameAnalyzer C++ class.
17+
* The constant there is referred as RUN_ANALYSIS_CONTROL_MESSAGE_FIELD_VALUE
18+
* but in the context of the java side it is more descriptive to call this the
19+
* end of data message.
20+
*/
21+
private static final String END_OF_DATA_MESSAGE_CODE = "r";
22+
23+
/**
24+
* Construct the control message writer with a LengthEncodedWriter
25+
*
26+
* @param lengthEncodedWriter The writer
27+
* @param numberOfFields The number of fields the process expects in each record
28+
*/
29+
public AnalyticsControlMessageWriter(LengthEncodedWriter lengthEncodedWriter, int numberOfFields) {
30+
super(lengthEncodedWriter, numberOfFields);
31+
}
32+
33+
public void writeEndOfData() throws IOException {
34+
writeMessage(END_OF_DATA_MESSAGE_CODE);
35+
fillCommandBuffer();
36+
lengthEncodedWriter.flush();
37+
}
38+
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/analytics/process/AnalyticsProcess.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,5 +7,14 @@
77

88
import org.elasticsearch.xpack.ml.process.NativeProcess;
99

10+
import java.io.IOException;
11+
1012
public interface AnalyticsProcess extends NativeProcess {
13+
14+
/**
15+
* Writes a control message that informs the process
16+
* all data has been sent
17+
* @throws IOException If an error occurs writing to the process
18+
*/
19+
void writeEndOfDataMessage() throws IOException;
1120
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/analytics/process/AnalyticsProcessConfig.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,20 +22,24 @@ public class AnalyticsProcessConfig implements ToXContentObject {
2222
private static final String ANALYSIS = "analysis";
2323

2424
private final long rows;
25-
private final long cols;
25+
private final int cols;
2626
private final ByteSizeValue memoryLimit;
2727
private final int threads;
2828
private final DataFrameAnalysis analysis;
2929

3030

31-
public AnalyticsProcessConfig(long rows, long cols, ByteSizeValue memoryLimit, int threads, DataFrameAnalysis analysis) {
31+
public AnalyticsProcessConfig(long rows, int cols, ByteSizeValue memoryLimit, int threads, DataFrameAnalysis analysis) {
3232
this.rows = rows;
3333
this.cols = cols;
3434
this.memoryLimit = Objects.requireNonNull(memoryLimit);
3535
this.threads = threads;
3636
this.analysis = Objects.requireNonNull(analysis);
3737
}
3838

39+
public int cols() {
40+
return cols;
41+
}
42+
3943
@Override
4044
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
4145
builder.startObject();

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/analytics/process/AnalyticsProcessManager.java

Lines changed: 36 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
import org.apache.logging.log4j.LogManager;
99
import org.apache.logging.log4j.Logger;
10+
import org.apache.logging.log4j.message.ParameterizedMessage;
1011
import org.elasticsearch.client.Client;
1112
import org.elasticsearch.common.unit.ByteSizeUnit;
1213
import org.elasticsearch.common.unit.ByteSizeValue;
@@ -44,27 +45,16 @@ public void processData(String jobId, DataFrameDataExtractor dataExtractor) {
4445
threadPool.generic().execute(() -> {
4546
AnalyticsProcess process = createProcess(jobId, dataExtractor);
4647
try {
47-
// Fake header
48-
process.writeRecord(dataExtractor.getFieldNamesArray());
49-
50-
while (dataExtractor.hasNext()) {
51-
Optional<List<DataFrameDataExtractor.Row>> rows = dataExtractor.next();
52-
if (rows.isPresent()) {
53-
for (DataFrameDataExtractor.Row row : rows.get()) {
54-
String[] rowValues = row.getValues();
55-
if (rowValues != null) {
56-
process.writeRecord(rowValues);
57-
}
58-
}
59-
}
60-
}
48+
writeHeaderRecord(dataExtractor, process);
49+
writeDataRows(dataExtractor, process);
50+
process.writeEndOfDataMessage();
6151
process.flushStream();
6252

6353
LOGGER.debug("[{}] Closing process", jobId);
6454
process.close();
6555
LOGGER.info("[{}] Closed process", jobId);
6656
} catch (IOException e) {
67-
57+
LOGGER.error(new ParameterizedMessage("[{}] Error writing data to the process", jobId), e);
6858
} finally {
6959
try {
7060
process.close();
@@ -75,6 +65,37 @@ public void processData(String jobId, DataFrameDataExtractor dataExtractor) {
7565
});
7666
}
7767

68+
private void writeDataRows(DataFrameDataExtractor dataExtractor, AnalyticsProcess process) throws IOException {
69+
// The extra field is the control field (should be an empty string)
70+
String[] record = new String[dataExtractor.getFieldNames().size() + 1];
71+
// The value of the control field should be an empty string for data frame rows
72+
record[record.length - 1] = "";
73+
74+
while (dataExtractor.hasNext()) {
75+
Optional<List<DataFrameDataExtractor.Row>> rows = dataExtractor.next();
76+
if (rows.isPresent()) {
77+
for (DataFrameDataExtractor.Row row : rows.get()) {
78+
String[] rowValues = row.getValues();
79+
if (rowValues != null) {
80+
System.arraycopy(rowValues, 0, record, 0, rowValues.length);
81+
process.writeRecord(record);
82+
}
83+
}
84+
}
85+
}
86+
}
87+
88+
private void writeHeaderRecord(DataFrameDataExtractor dataExtractor, AnalyticsProcess process) throws IOException {
89+
List<String> fieldNames = dataExtractor.getFieldNames();
90+
String[] headerRecord = new String[fieldNames.size() + 1];
91+
for (int i = 0; i < fieldNames.size(); i++) {
92+
headerRecord[i] = fieldNames.get(i);
93+
}
94+
// The field name of the control field is dot
95+
headerRecord[headerRecord.length - 1] = ".";
96+
process.writeRecord(headerRecord);
97+
}
98+
7899
private AnalyticsProcess createProcess(String jobId, DataFrameDataExtractor dataExtractor) {
79100
// TODO We should rename the thread pool to reflect its more general use now, e.g. JOB_PROCESS_THREAD_POOL_NAME
80101
ExecutorService executorService = threadPool.executor(MachineLearning.AUTODETECT_THREAD_POOL_NAME);

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/analytics/process/NativeAnalyticsProcess.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
import org.elasticsearch.xpack.ml.process.AbstractNativeProcess;
99

10+
import java.io.IOException;
1011
import java.io.InputStream;
1112
import java.io.OutputStream;
1213
import java.nio.file.Path;
@@ -31,4 +32,9 @@ public String getName() {
3132
public void persistState() {
3233
// Nothing to persist
3334
}
35+
36+
@Override
37+
public void writeEndOfDataMessage() throws IOException {
38+
new AnalyticsControlMessageWriter(recordWriter(), numberOfFields()).writeEndOfData();
39+
}
3440
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/analytics/process/NativeAnalyticsProcessFactory.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,10 +45,13 @@ public AnalyticsProcess createAnalyticsProcess(String jobId, AnalyticsProcessCon
4545
ProcessPipes processPipes = new ProcessPipes(env, NAMED_PIPE_HELPER, AnalyticsBuilder.ANALYTICS, jobId,
4646
true, false, true, true, false, false);
4747

48+
// The extra 1 is the control field
49+
int numberOfFields = analyticsProcessConfig.cols() + 1;
50+
4851
createNativeProcess(jobId, analyticsProcessConfig, filesToDelete, processPipes);
4952

5053
NativeAnalyticsProcess analyticsProcess = new NativeAnalyticsProcess(jobId, processPipes.getLogStream().get(),
51-
processPipes.getProcessInStream().get(), processPipes.getProcessOutStream().get(), null, 0,
54+
processPipes.getProcessInStream().get(), processPipes.getProcessOutStream().get(), null, numberOfFields,
5255
filesToDelete, () -> {});
5356

5457

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
package org.elasticsearch.xpack.ml.analytics.process;
7+
8+
import org.elasticsearch.test.ESTestCase;
9+
import org.elasticsearch.xpack.ml.process.writer.LengthEncodedWriter;
10+
import org.junit.Before;
11+
import org.mockito.InOrder;
12+
import org.mockito.Mockito;
13+
14+
import java.io.IOException;
15+
import java.util.stream.IntStream;
16+
17+
import static org.mockito.Mockito.inOrder;
18+
import static org.mockito.Mockito.times;
19+
import static org.mockito.Mockito.verifyNoMoreInteractions;
20+
21+
public class AnalyticsControlMessageWriterTests extends ESTestCase {
22+
23+
private LengthEncodedWriter lengthEncodedWriter;
24+
25+
@Before
26+
public void setUpMocks() {
27+
lengthEncodedWriter = Mockito.mock(LengthEncodedWriter.class);
28+
}
29+
30+
public void testWriteEndOfData() throws IOException {
31+
AnalyticsControlMessageWriter writer = new AnalyticsControlMessageWriter(lengthEncodedWriter, 4);
32+
33+
writer.writeEndOfData();
34+
35+
InOrder inOrder = inOrder(lengthEncodedWriter);
36+
inOrder.verify(lengthEncodedWriter).writeNumFields(4);
37+
inOrder.verify(lengthEncodedWriter, times(3)).writeField("");
38+
inOrder.verify(lengthEncodedWriter).writeField("r");
39+
40+
StringBuilder spaces = new StringBuilder();
41+
IntStream.rangeClosed(1, 8192).forEach(i -> spaces.append(' '));
42+
inOrder.verify(lengthEncodedWriter).writeNumFields(4);
43+
inOrder.verify(lengthEncodedWriter, times(3)).writeField("");
44+
inOrder.verify(lengthEncodedWriter).writeField(spaces.toString());
45+
46+
inOrder.verify(lengthEncodedWriter).flush();
47+
48+
verifyNoMoreInteractions(lengthEncodedWriter);
49+
}
50+
}

0 commit comments

Comments
 (0)