Skip to content

Commit fd3e6a9

Browse files
[FEATURE][ML] Parse results and join them in the data-frame copy index (#36382)
1 parent 8457b81 commit fd3e6a9

20 files changed

+594
-494
lines changed

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportRunAnalyticsAction.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@
3838
import org.elasticsearch.xpack.core.ml.action.RunAnalyticsAction;
3939
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
4040
import org.elasticsearch.xpack.ml.MachineLearning;
41-
import org.elasticsearch.xpack.ml.analytics.DataFrameDataExtractor;
4241
import org.elasticsearch.xpack.ml.analytics.DataFrameDataExtractorFactory;
4342
import org.elasticsearch.xpack.ml.analytics.DataFrameFields;
4443
import org.elasticsearch.xpack.ml.analytics.process.AnalyticsProcessManager;
@@ -178,8 +177,7 @@ private void runPipelineAnalytics(String index, ActionListener<AcknowledgedRespo
178177

179178
ActionListener<DataFrameDataExtractorFactory> dataExtractorFactoryListener = ActionListener.wrap(
180179
dataExtractorFactory -> {
181-
DataFrameDataExtractor dataExtractor = dataExtractorFactory.newExtractor();
182-
analyticsProcessManager.processData(jobId, dataExtractor);
180+
analyticsProcessManager.runJob(jobId, dataExtractorFactory);
183181
listener.onResponse(new AcknowledgedResponse(true));
184182
},
185183
listener::onFailure

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/analytics/DataFrameDataExtractor.java

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ private SearchRequestBuilder buildSearchRequest() {
9898
.setIndices(context.indices)
9999
.setSize(context.scrollSize)
100100
.setQuery(context.query)
101-
.setFetchSource(false);
101+
.setFetchSource(context.includeSource);
102102

103103
for (ExtractedField docValueField : context.extractedFields.getDocValueFields()) {
104104
searchRequestBuilder.addDocValueField(docValueField.getName(), docValueField.getDocValueFormat());
@@ -149,7 +149,7 @@ private Row createRow(SearchHit hit) {
149149
break;
150150
}
151151
}
152-
return new Row(extractedValues);
152+
return new Row(extractedValues, hit);
153153
}
154154

155155
private List<Row> continueScroll() throws IOException {
@@ -196,10 +196,11 @@ public DataSummary collectDataSummary() {
196196
SearchRequestBuilder searchRequestBuilder = new SearchRequestBuilder(client, SearchAction.INSTANCE)
197197
.setIndices(context.indices)
198198
.setSize(0)
199-
.setQuery(context.query);
199+
.setQuery(context.query)
200+
.setTrackTotalHits(true);
200201

201202
SearchResponse searchResponse = executeSearchRequest(searchRequestBuilder);
202-
return new DataSummary(searchResponse.getHits().getTotalHits(), context.extractedFields.getAllFields().size());
203+
return new DataSummary(searchResponse.getHits().getTotalHits().value, context.extractedFields.getAllFields().size());
203204
}
204205

205206
public static class DataSummary {
@@ -215,16 +216,27 @@ public DataSummary(long rows, int cols) {
215216

216217
public static class Row {
217218

219+
private SearchHit hit;
220+
218221
@Nullable
219222
private String[] values;
220223

221-
private Row(String[] values) {
224+
private Row(String[] values, SearchHit hit) {
222225
this.values = values;
226+
this.hit = hit;
223227
}
224228

225229
@Nullable
226230
public String[] getValues() {
227231
return values;
228232
}
233+
234+
public SearchHit getHit() {
235+
return hit;
236+
}
237+
238+
public boolean shouldSkip() {
239+
return values == null;
240+
}
229241
}
230242
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/analytics/DataFrameDataExtractorContext.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,16 @@ public class DataFrameDataExtractorContext {
2020
final QueryBuilder query;
2121
final int scrollSize;
2222
final Map<String, String> headers;
23+
final boolean includeSource;
2324

2425
DataFrameDataExtractorContext(String jobId, ExtractedFields extractedFields, List<String> indices, QueryBuilder query, int scrollSize,
25-
Map<String, String> headers) {
26+
Map<String, String> headers, boolean includeSource) {
2627
this.jobId = Objects.requireNonNull(jobId);
2728
this.extractedFields = Objects.requireNonNull(extractedFields);
2829
this.indices = indices.toArray(new String[indices.size()]);
2930
this.query = Objects.requireNonNull(query);
3031
this.scrollSize = scrollSize;
3132
this.headers = headers;
33+
this.includeSource = includeSource;
3234
}
3335
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/analytics/DataFrameDataExtractorFactory.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,14 +63,16 @@ private DataFrameDataExtractorFactory(Client client, String index, ExtractedFiel
6363
this.extractedFields = Objects.requireNonNull(extractedFields);
6464
}
6565

66-
public DataFrameDataExtractor newExtractor() {
66+
public DataFrameDataExtractor newExtractor(boolean includeSource) {
6767
DataFrameDataExtractorContext context = new DataFrameDataExtractorContext(
6868
"ml-analytics-" + index,
6969
extractedFields,
7070
Arrays.asList(index),
7171
QueryBuilders.matchAllQuery(),
7272
1000,
73-
Collections.emptyMap());
73+
Collections.emptyMap(),
74+
includeSource
75+
);
7476
return new DataFrameDataExtractor(client, context);
7577
}
7678

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/analytics/process/AnalyticsControlMessageWriter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ public class AnalyticsControlMessageWriter extends AbstractControlMsgWriter {
1818
* but in the context of the java side it is more descriptive to call this the
1919
* end of data message.
2020
*/
21-
private static final String END_OF_DATA_MESSAGE_CODE = "r";
21+
private static final String END_OF_DATA_MESSAGE_CODE = "$";
2222

2323
/**
2424
* Construct the control message writer with a LengthEncodedWriter

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/analytics/process/AnalyticsProcess.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import org.elasticsearch.xpack.ml.process.NativeProcess;
99

1010
import java.io.IOException;
11+
import java.util.Iterator;
1112

1213
public interface AnalyticsProcess extends NativeProcess {
1314

@@ -17,4 +18,17 @@ public interface AnalyticsProcess extends NativeProcess {
1718
* @throws IOException If an error occurs writing to the process
1819
*/
1920
void writeEndOfDataMessage() throws IOException;
21+
22+
/**
23+
* @return stream of analytics results.
24+
*/
25+
Iterator<AnalyticsResult> readAnalyticsResults();
26+
27+
/**
28+
* Read anything left in the stream before
29+
* closing the stream otherwise if the process
30+
* tries to write more after the close it gets
31+
* a SIGPIPE
32+
*/
33+
void consumeAndCloseOutputStream();
2034
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/analytics/process/AnalyticsProcessManager.java

Lines changed: 32 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import org.elasticsearch.xpack.ml.MachineLearning;
1818
import org.elasticsearch.xpack.ml.analytics.DataFrameAnalysis;
1919
import org.elasticsearch.xpack.ml.analytics.DataFrameDataExtractor;
20+
import org.elasticsearch.xpack.ml.analytics.DataFrameDataExtractorFactory;
2021

2122
import java.io.IOException;
2223
import java.util.List;
@@ -41,28 +42,39 @@ public AnalyticsProcessManager(Client client, Environment environment, ThreadPoo
4142
this.processFactory = Objects.requireNonNull(analyticsProcessFactory);
4243
}
4344

44-
public void processData(String jobId, DataFrameDataExtractor dataExtractor) {
45+
public void runJob(String jobId, DataFrameDataExtractorFactory dataExtractorFactory) {
4546
threadPool.generic().execute(() -> {
46-
AnalyticsProcess process = createProcess(jobId, dataExtractor);
47-
try {
48-
writeHeaderRecord(dataExtractor, process);
49-
writeDataRows(dataExtractor, process);
50-
process.writeEndOfDataMessage();
51-
process.flushStream();
47+
DataFrameDataExtractor dataExtractor = dataExtractorFactory.newExtractor(false);
48+
AnalyticsProcess process = createProcess(jobId, createProcessConfig(dataExtractor));
49+
ExecutorService executorService = threadPool.executor(MachineLearning.AUTODETECT_THREAD_POOL_NAME);
50+
AnalyticsResultProcessor resultProcessor = new AnalyticsResultProcessor(client, dataExtractorFactory.newExtractor(true));
51+
executorService.execute(() -> resultProcessor.process(process));
52+
executorService.execute(() -> processData(jobId, dataExtractor, process, resultProcessor));
53+
});
54+
}
55+
56+
private void processData(String jobId, DataFrameDataExtractor dataExtractor, AnalyticsProcess process,
57+
AnalyticsResultProcessor resultProcessor) {
58+
try {
59+
writeHeaderRecord(dataExtractor, process);
60+
writeDataRows(dataExtractor, process);
61+
process.writeEndOfDataMessage();
62+
process.flushStream();
5263

53-
LOGGER.debug("[{}] Closing process", jobId);
64+
LOGGER.info("[{}] Waiting for result processor to complete", jobId);
65+
resultProcessor.awaitForCompletion();
66+
LOGGER.info("[{}] Result processor has completed", jobId);
67+
} catch (IOException e) {
68+
LOGGER.error(new ParameterizedMessage("[{}] Error writing data to the process", jobId), e);
69+
} finally {
70+
LOGGER.info("[{}] Closing process", jobId);
71+
try {
5472
process.close();
5573
LOGGER.info("[{}] Closed process", jobId);
5674
} catch (IOException e) {
57-
LOGGER.error(new ParameterizedMessage("[{}] Error writing data to the process", jobId), e);
58-
} finally {
59-
try {
60-
process.close();
61-
} catch (IOException e) {
62-
LOGGER.error("[{}] Error closing data frame analyzer process", jobId);
63-
}
75+
LOGGER.error("[{}] Error closing data frame analyzer process", jobId);
6476
}
65-
});
77+
}
6678
}
6779

6880
private void writeDataRows(DataFrameDataExtractor dataExtractor, AnalyticsProcess process) throws IOException {
@@ -75,8 +87,8 @@ private void writeDataRows(DataFrameDataExtractor dataExtractor, AnalyticsProces
7587
Optional<List<DataFrameDataExtractor.Row>> rows = dataExtractor.next();
7688
if (rows.isPresent()) {
7789
for (DataFrameDataExtractor.Row row : rows.get()) {
78-
String[] rowValues = row.getValues();
79-
if (rowValues != null) {
90+
if (row.shouldSkip() == false) {
91+
String[] rowValues = row.getValues();
8092
System.arraycopy(rowValues, 0, record, 0, rowValues.length);
8193
process.writeRecord(record);
8294
}
@@ -96,10 +108,10 @@ private void writeHeaderRecord(DataFrameDataExtractor dataExtractor, AnalyticsPr
96108
process.writeRecord(headerRecord);
97109
}
98110

99-
private AnalyticsProcess createProcess(String jobId, DataFrameDataExtractor dataExtractor) {
111+
private AnalyticsProcess createProcess(String jobId, AnalyticsProcessConfig analyticsProcessConfig) {
100112
// TODO We should rename the thread pool to reflect its more general use now, e.g. JOB_PROCESS_THREAD_POOL_NAME
101113
ExecutorService executorService = threadPool.executor(MachineLearning.AUTODETECT_THREAD_POOL_NAME);
102-
AnalyticsProcess process = processFactory.createAnalyticsProcess(jobId, createProcessConfig(dataExtractor), executorService);
114+
AnalyticsProcess process = processFactory.createAnalyticsProcess(jobId, analyticsProcessConfig, executorService);
103115
if (process.isProcessAlive() == false) {
104116
throw ExceptionsHelper.serverError("Failed to start analytics process");
105117
}
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
package org.elasticsearch.xpack.ml.analytics.process;
7+
8+
import org.elasticsearch.common.ParseField;
9+
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
10+
import org.elasticsearch.common.xcontent.ToXContentObject;
11+
import org.elasticsearch.common.xcontent.XContentBuilder;
12+
13+
import java.io.IOException;
14+
import java.util.Map;
15+
import java.util.Objects;
16+
17+
public class AnalyticsResult implements ToXContentObject {
18+
19+
public static final ParseField TYPE = new ParseField("analytics_result");
20+
public static final ParseField ID_HASH = new ParseField("id_hash");
21+
public static final ParseField RESULTS = new ParseField("results");
22+
23+
static final ConstructingObjectParser<AnalyticsResult, Void> PARSER = new ConstructingObjectParser<>(TYPE.getPreferredName(),
24+
a -> new AnalyticsResult((String) a[0], (Map<String, Object>) a[1]));
25+
26+
static {
27+
PARSER.declareString(ConstructingObjectParser.constructorArg(), ID_HASH);
28+
PARSER.declareObject(ConstructingObjectParser.constructorArg(), (p, context) -> p.map(), RESULTS);
29+
}
30+
31+
private final String idHash;
32+
private final Map<String, Object> results;
33+
34+
public AnalyticsResult(String idHash, Map<String, Object> results) {
35+
this.idHash = Objects.requireNonNull(idHash);
36+
this.results = Objects.requireNonNull(results);
37+
}
38+
39+
public String getIdHash() {
40+
return idHash;
41+
}
42+
43+
public Map<String, Object> getResults() {
44+
return results;
45+
}
46+
47+
@Override
48+
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
49+
builder.startObject();
50+
builder.field(ID_HASH.getPreferredName(), idHash);
51+
builder.field(RESULTS.getPreferredName(), results);
52+
builder.endObject();
53+
return builder;
54+
}
55+
56+
@Override
57+
public boolean equals(Object other) {
58+
if (this == other) {
59+
return true;
60+
}
61+
if (other == null || getClass() != other.getClass()) {
62+
return false;
63+
}
64+
65+
AnalyticsResult that = (AnalyticsResult) other;
66+
return Objects.equals(idHash, that.idHash) && Objects.equals(results, that.results);
67+
}
68+
69+
@Override
70+
public int hashCode() {
71+
return Objects.hash(idHash, results);
72+
}
73+
}

0 commit comments

Comments
 (0)