Skip to content

Commit 9dd5273

Browse files
[ML] Outlier detection should only fetch docs that have the analyzed … (#44944) (#44959)
As data frame rows with missing values for analyzed fields are skipped, we can be more efficient by including a query that only picks documents that have values for all analyzed fields. Besides improving the number of documents we go through, we also provide a more accurate measurement of how many rows we need which reduces the memory requirements. This also adds an integration test that runs outlier detection on data with missing fields.
1 parent a3cc32d commit 9dd5273

File tree

4 files changed

+137
-42
lines changed

4 files changed

+137
-42
lines changed

x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeDataFrameAnalyticsIntegTestCase.java

+20-17
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
package org.elasticsearch.xpack.ml.integration;
77

88
import org.elasticsearch.action.support.master.AcknowledgedResponse;
9+
import org.elasticsearch.common.Nullable;
910
import org.elasticsearch.common.Strings;
1011
import org.elasticsearch.common.unit.TimeValue;
1112
import org.elasticsearch.common.xcontent.json.JsonXContent;
@@ -16,15 +17,16 @@
1617
import org.elasticsearch.xpack.core.ml.action.StartDataFrameAnalyticsAction;
1718
import org.elasticsearch.xpack.core.ml.action.StopDataFrameAnalyticsAction;
1819
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
20+
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsDest;
21+
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsSource;
1922
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState;
23+
import org.elasticsearch.xpack.core.ml.dataframe.analyses.OutlierDetection;
2024

2125
import java.io.IOException;
2226
import java.util.ArrayList;
23-
import java.util.HashMap;
2427
import java.util.List;
2528
import java.util.Map;
2629
import java.util.concurrent.TimeUnit;
27-
import java.util.function.Function;
2830

2931
import static org.hamcrest.Matchers.equalTo;
3032

@@ -97,22 +99,23 @@ protected List<GetDataFrameAnalyticsStatsAction.Response.Stats> getAnalyticsStat
9799
return response.getResponse().results();
98100
}
99101

100-
protected List<String> generateData(long timestamp, TimeValue bucketSpan, int bucketCount,
101-
Function<Integer, Integer> timeToCountFunction) throws IOException {
102-
List<String> data = new ArrayList<>();
103-
long now = timestamp;
104-
for (int bucketIndex = 0; bucketIndex < bucketCount; bucketIndex++) {
105-
for (int count = 0; count < timeToCountFunction.apply(bucketIndex); count++) {
106-
Map<String, Object> record = new HashMap<>();
107-
record.put("time", now);
108-
data.add(createJsonRecord(record));
109-
}
110-
now += bucketSpan.getMillis();
111-
}
112-
return data;
113-
}
114-
115102
protected static String createJsonRecord(Map<String, Object> keyValueMap) throws IOException {
116103
return Strings.toString(JsonXContent.contentBuilder().map(keyValueMap)) + "\n";
117104
}
105+
106+
protected static DataFrameAnalyticsConfig buildOutlierDetectionAnalytics(String id, String[] sourceIndex, String destIndex,
107+
@Nullable String resultsField) {
108+
DataFrameAnalyticsConfig.Builder configBuilder = new DataFrameAnalyticsConfig.Builder(id);
109+
configBuilder.setSource(new DataFrameAnalyticsSource(sourceIndex, null));
110+
configBuilder.setDest(new DataFrameAnalyticsDest(destIndex, resultsField));
111+
configBuilder.setAnalysis(new OutlierDetection());
112+
return configBuilder.build();
113+
}
114+
115+
protected void assertState(String id, DataFrameAnalyticsState state) {
116+
List<GetDataFrameAnalyticsStatsAction.Response.Stats> stats = getAnalyticsStats(id);
117+
assertThat(stats.size(), equalTo(1));
118+
assertThat(stats.get(0).getId(), equalTo(id));
119+
assertThat(stats.get(0).getState(), equalTo(state));
120+
}
118121
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
package org.elasticsearch.xpack.ml.integration;
7+
8+
import org.elasticsearch.action.bulk.BulkRequestBuilder;
9+
import org.elasticsearch.action.bulk.BulkResponse;
10+
import org.elasticsearch.action.get.GetResponse;
11+
import org.elasticsearch.action.index.IndexRequest;
12+
import org.elasticsearch.action.search.SearchResponse;
13+
import org.elasticsearch.action.support.WriteRequest;
14+
import org.elasticsearch.search.SearchHit;
15+
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
16+
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState;
17+
import org.junit.After;
18+
19+
import java.util.Map;
20+
21+
import static org.hamcrest.Matchers.allOf;
22+
import static org.hamcrest.Matchers.equalTo;
23+
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
24+
import static org.hamcrest.Matchers.is;
25+
import static org.hamcrest.Matchers.lessThanOrEqualTo;
26+
27+
public class OutlierDetectionWithMissingFieldsIT extends MlNativeDataFrameAnalyticsIntegTestCase {
28+
29+
@After
30+
public void cleanup() {
31+
cleanUp();
32+
}
33+
34+
public void testMissingFields() throws Exception {
35+
String sourceIndex = "test-outlier-detection-with-missing-fields";
36+
37+
client().admin().indices().prepareCreate(sourceIndex)
38+
.addMapping("_doc", "numeric", "type=double", "categorical", "type=keyword")
39+
.get();
40+
41+
BulkRequestBuilder bulkRequestBuilder = client().prepareBulk();
42+
bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
43+
44+
// 5 docs with valid numeric value and missing categorical field (which should be ignored as it's not analyzed)
45+
for (int i = 0; i < 5; i++) {
46+
IndexRequest indexRequest = new IndexRequest(sourceIndex);
47+
indexRequest.source("numeric", 42.0);
48+
bulkRequestBuilder.add(indexRequest);
49+
}
50+
51+
// Add a doc with missing field
52+
{
53+
IndexRequest missingIndexRequest = new IndexRequest(sourceIndex);
54+
missingIndexRequest.source("categorical", "foo");
55+
bulkRequestBuilder.add(missingIndexRequest);
56+
}
57+
58+
// Add a doc with numeric being array which is also treated as missing
59+
{
60+
IndexRequest arrayIndexRequest = new IndexRequest(sourceIndex);
61+
arrayIndexRequest.source("numeric", new double[]{1.0, 2.0}, "categorical", "foo");
62+
bulkRequestBuilder.add(arrayIndexRequest);
63+
}
64+
65+
BulkResponse bulkResponse = bulkRequestBuilder.get();
66+
if (bulkResponse.hasFailures()) {
67+
fail("Failed to index data: " + bulkResponse.buildFailureMessage());
68+
}
69+
70+
String id = "test_outlier_detection_with_missing_fields";
71+
DataFrameAnalyticsConfig config = buildOutlierDetectionAnalytics(id, new String[] {sourceIndex}, sourceIndex + "-results", null);
72+
registerAnalytics(config);
73+
putAnalytics(config);
74+
75+
assertState(id, DataFrameAnalyticsState.STOPPED);
76+
77+
startAnalytics(id);
78+
waitUntilAnalyticsIsStopped(id);
79+
80+
SearchResponse sourceData = client().prepareSearch(sourceIndex).get();
81+
for (SearchHit hit : sourceData.getHits()) {
82+
GetResponse destDocGetResponse = client().prepareGet().setIndex(config.getDest().getIndex()).setId(hit.getId()).get();
83+
assertThat(destDocGetResponse.isExists(), is(true));
84+
Map<String, Object> sourceDoc = hit.getSourceAsMap();
85+
Map<String, Object> destDoc = destDocGetResponse.getSource();
86+
for (String field : sourceDoc.keySet()) {
87+
assertThat(destDoc.containsKey(field), is(true));
88+
assertThat(destDoc.get(field), equalTo(sourceDoc.get(field)));
89+
}
90+
if (destDoc.containsKey("numeric") && destDoc.get("numeric") instanceof Double) {
91+
assertThat(destDoc.containsKey("ml"), is(true));
92+
@SuppressWarnings("unchecked")
93+
Map<String, Object> resultsObject = (Map<String, Object>) destDoc.get("ml");
94+
95+
assertThat(resultsObject.containsKey("outlier_score"), is(true));
96+
double outlierScore = (double) resultsObject.get("outlier_score");
97+
assertThat(outlierScore, allOf(greaterThanOrEqualTo(0.0), lessThanOrEqualTo(1.0)));
98+
} else {
99+
assertThat(destDoc.containsKey("ml"), is(false));
100+
}
101+
}
102+
}
103+
}

x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RunDataFrameAnalyticsIT.java

+2-24
Original file line numberDiff line numberDiff line change
@@ -13,20 +13,14 @@
1313
import org.elasticsearch.action.index.IndexRequest;
1414
import org.elasticsearch.action.search.SearchResponse;
1515
import org.elasticsearch.action.support.WriteRequest;
16-
import org.elasticsearch.common.Nullable;
1716
import org.elasticsearch.common.xcontent.XContentType;
1817
import org.elasticsearch.index.IndexSettings;
1918
import org.elasticsearch.index.query.QueryBuilders;
2019
import org.elasticsearch.search.SearchHit;
21-
import org.elasticsearch.xpack.core.ml.action.GetDataFrameAnalyticsStatsAction;
2220
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
23-
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsDest;
24-
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsSource;
2521
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState;
26-
import org.elasticsearch.xpack.core.ml.dataframe.analyses.OutlierDetection;
2722
import org.junit.After;
2823

29-
import java.util.List;
3024
import java.util.Map;
3125

3226
import static org.hamcrest.Matchers.allOf;
@@ -97,7 +91,7 @@ public void testOutlierDetectionWithFewDocuments() throws Exception {
9791

9892
assertThat(resultsObject.containsKey("outlier_score"), is(true));
9993
double outlierScore = (double) resultsObject.get("outlier_score");
100-
assertThat(outlierScore, allOf(greaterThanOrEqualTo(0.0), lessThanOrEqualTo(100.0)));
94+
assertThat(outlierScore, allOf(greaterThanOrEqualTo(0.0), lessThanOrEqualTo(1.0)));
10195
if (hit.getId().equals("outlier")) {
10296
scoreOfOutlier = outlierScore;
10397
} else {
@@ -218,7 +212,7 @@ public void testOutlierDetectionWithMoreFieldsThanDocValueFieldLimit() throws Ex
218212

219213
assertThat(resultsObject.containsKey("outlier_score"), is(true));
220214
double outlierScore = (double) resultsObject.get("outlier_score");
221-
assertThat(outlierScore, allOf(greaterThanOrEqualTo(0.0), lessThanOrEqualTo(100.0)));
215+
assertThat(outlierScore, allOf(greaterThanOrEqualTo(0.0), lessThanOrEqualTo(1.0)));
222216
}
223217
}
224218

@@ -368,20 +362,4 @@ public void testOutlierDetectionWithPreExistingDestIndex() throws Exception {
368362
.setQuery(QueryBuilders.existsQuery("ml.outlier_score")).get();
369363
assertThat(searchResponse.getHits().getTotalHits().value, equalTo((long) bulkRequestBuilder.numberOfActions()));
370364
}
371-
372-
private static DataFrameAnalyticsConfig buildOutlierDetectionAnalytics(String id, String[] sourceIndex, String destIndex,
373-
@Nullable String resultsField) {
374-
DataFrameAnalyticsConfig.Builder configBuilder = new DataFrameAnalyticsConfig.Builder(id);
375-
configBuilder.setSource(new DataFrameAnalyticsSource(sourceIndex, null));
376-
configBuilder.setDest(new DataFrameAnalyticsDest(destIndex, resultsField));
377-
configBuilder.setAnalysis(new OutlierDetection());
378-
return configBuilder.build();
379-
}
380-
381-
private void assertState(String id, DataFrameAnalyticsState state) {
382-
List<GetDataFrameAnalyticsStatsAction.Response.Stats> stats = getAnalyticsStats(id);
383-
assertThat(stats.size(), equalTo(1));
384-
assertThat(stats.get(0).getId(), equalTo(id));
385-
assertThat(stats.get(0).getState(), equalTo(state));
386-
}
387365
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/extractor/DataFrameDataExtractorFactory.java

+12-1
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,12 @@
1919
import org.elasticsearch.common.settings.Settings;
2020
import org.elasticsearch.index.IndexNotFoundException;
2121
import org.elasticsearch.index.IndexSettings;
22+
import org.elasticsearch.index.query.BoolQueryBuilder;
23+
import org.elasticsearch.index.query.QueryBuilder;
2224
import org.elasticsearch.index.query.QueryBuilders;
2325
import org.elasticsearch.xpack.core.ClientHelper;
2426
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
27+
import org.elasticsearch.xpack.ml.datafeed.extractor.fields.ExtractedField;
2528
import org.elasticsearch.xpack.ml.datafeed.extractor.fields.ExtractedFields;
2629

2730
import java.util.Arrays;
@@ -52,14 +55,22 @@ public DataFrameDataExtractor newExtractor(boolean includeSource) {
5255
analyticsId,
5356
extractedFields,
5457
Arrays.asList(index),
55-
QueryBuilders.matchAllQuery(),
58+
allExtractedFieldsExistQuery(),
5659
1000,
5760
headers,
5861
includeSource
5962
);
6063
return new DataFrameDataExtractor(client, context);
6164
}
6265

66+
private QueryBuilder allExtractedFieldsExistQuery() {
67+
BoolQueryBuilder query = QueryBuilders.boolQuery();
68+
for (ExtractedField field : extractedFields.getAllFields()) {
69+
query.filter(QueryBuilders.existsQuery(field.getName()));
70+
}
71+
return query;
72+
}
73+
6374
/**
6475
* Validate and create a new extractor factory
6576
*

0 commit comments

Comments
 (0)