diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RunDataFrameAnalyticsIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RunDataFrameAnalyticsIT.java index 1f3899939938e..7d29cbc345789 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RunDataFrameAnalyticsIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RunDataFrameAnalyticsIT.java @@ -6,6 +6,8 @@ package org.elasticsearch.xpack.ml.integration; import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest; +import org.elasticsearch.action.admin.indices.settings.get.GetSettingsRequest; +import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse; import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.get.GetResponse; @@ -13,6 +15,8 @@ import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.search.SearchHit; import org.elasticsearch.xpack.core.ml.action.GetDataFrameAnalyticsStatsAction; @@ -147,6 +151,71 @@ public void testOutlierDetectionWithEnoughDocumentsToScroll() throws Exception { assertThat(searchResponse.getHits().getTotalHits().value, equalTo((long) docCount)); } + public void testOutlierDetectionWithMoreFieldsThanDocValueFieldLimit() throws Exception { + String sourceIndex = "test-outlier-detection-with-more-fields-than-docvalue-limit"; + + client().admin().indices().prepareCreate(sourceIndex).get(); + + GetSettingsRequest getSettingsRequest = new GetSettingsRequest(); + getSettingsRequest.indices(sourceIndex); + getSettingsRequest.names(IndexSettings.MAX_DOCVALUE_FIELDS_SEARCH_SETTING.getKey()); + getSettingsRequest.includeDefaults(true); + + GetSettingsResponse docValueLimitSetting = client().admin().indices().getSettings(getSettingsRequest).actionGet(); + int docValueLimit = IndexSettings.MAX_DOCVALUE_FIELDS_SEARCH_SETTING.get( + docValueLimitSetting.getIndexToSettings().values().iterator().next().value); + + BulkRequestBuilder bulkRequestBuilder = client().prepareBulk(); + bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + + for (int i = 0; i < 100; i++) { + + StringBuilder source = new StringBuilder("{"); + for (int fieldCount = 0; fieldCount < docValueLimit + 1; fieldCount++) { + source.append("\"field_").append(fieldCount).append("\":").append(randomDouble()); + if (fieldCount < docValueLimit) { + source.append(","); + } + } + source.append("}"); + + IndexRequest indexRequest = new IndexRequest(sourceIndex); + indexRequest.source(source.toString(), XContentType.JSON); + bulkRequestBuilder.add(indexRequest); + } + BulkResponse bulkResponse = bulkRequestBuilder.get(); + if (bulkResponse.hasFailures()) { + fail("Failed to index data: " + bulkResponse.buildFailureMessage()); + } + + String id = "test_outlier_detection_with_more_fields_than_docvalue_limit"; + DataFrameAnalyticsConfig config = buildOutlierDetectionAnalytics(id, sourceIndex, null); + registerAnalytics(config); + putAnalytics(config); + + assertState(id, DataFrameAnalyticsState.STOPPED); + + startAnalytics(id); + waitUntilAnalyticsIsStopped(id); + + SearchResponse sourceData = client().prepareSearch(sourceIndex).get(); + for (SearchHit hit : sourceData.getHits()) { + GetResponse destDocGetResponse = client().prepareGet().setIndex(config.getDest().getIndex()).setId(hit.getId()).get(); + assertThat(destDocGetResponse.isExists(), is(true)); + Map sourceDoc = hit.getSourceAsMap(); + Map destDoc = destDocGetResponse.getSource(); + for (String field : sourceDoc.keySet()) { + assertThat(destDoc.containsKey(field), is(true)); + assertThat(destDoc.get(field), equalTo(sourceDoc.get(field))); + } + assertThat(destDoc.containsKey("ml"), is(true)); + Map resultsObject = (Map) destDoc.get("ml"); + assertThat(resultsObject.containsKey("outlier_score"), is(true)); + double outlierScore = (double) resultsObject.get("outlier_score"); + assertThat(outlierScore, allOf(greaterThanOrEqualTo(0.0), lessThanOrEqualTo(100.0))); + } + } + public void testStopOutlierDetectionWithEnoughDocumentsToScroll() { String sourceIndex = "test-outlier-detection-with-enough-docs-to-scroll"; diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/fields/ExtractedField.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/fields/ExtractedField.java index 49642aaeb23f7..afd53ed258426 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/fields/ExtractedField.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/fields/ExtractedField.java @@ -58,6 +58,8 @@ public ExtractionMethod getExtractionMethod() { public abstract Object[] value(SearchHit hit); + public abstract boolean supportsFromSource(); + public String getDocValueFormat() { return null; } @@ -93,6 +95,14 @@ public static ExtractedField newField(String alias, String name, ExtractionMetho } } + public ExtractedField newFromSource() { + if (supportsFromSource()) { + return new FromSource(alias, name); + } + throw new IllegalStateException("Field (alias [" + alias + "], name [" + name + "]) should be extracted via [" + + extractionMethod + "] and cannot be extracted from source"); + } + private static class FromFields extends ExtractedField { FromFields(String alias, String name, ExtractionMethod extractionMethod) { @@ -108,6 +118,11 @@ public Object[] value(SearchHit hit) { } return new Object[0]; } + + @Override + public boolean supportsFromSource() { + return getExtractionMethod() == ExtractionMethod.DOC_VALUE; + } } private static class GeoShapeField extends FromSource { @@ -195,6 +210,11 @@ private String handleString(String geoString) { throw new IllegalArgumentException("Unexpected value for a geo_point field: " + geoString); } } + + @Override + public boolean supportsFromSource() { + return false; + } } private static class TimeField extends FromFields { @@ -223,6 +243,11 @@ public Object[] value(SearchHit hit) { public String getDocValueFormat() { return EPOCH_MILLIS_FORMAT; } + + @Override + public boolean supportsFromSource() { + return false; + } } private static class FromSource extends ExtractedField { @@ -257,6 +282,11 @@ public Object[] value(SearchHit hit) { return new Object[0]; } + @Override + public boolean supportsFromSource() { + return true; + } + @SuppressWarnings("unchecked") private static Map getNextLevel(Map source, String key) { Object nextLevel = source.get(key); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/extractor/DataFrameDataExtractor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/extractor/DataFrameDataExtractor.java index 7b8452f635f9e..59cd78b4cc6fa 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/extractor/DataFrameDataExtractor.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/extractor/DataFrameDataExtractor.java @@ -19,6 +19,7 @@ import org.elasticsearch.common.Nullable; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.fetch.StoredFieldsContext; import org.elasticsearch.search.sort.SortOrder; import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.ml.datafeed.extractor.fields.ExtractedField; @@ -128,8 +129,8 @@ private SearchRequestBuilder buildSearchRequest() { .addSort(DataFrameAnalyticsFields.ID, SortOrder.ASC) .setIndices(context.indices) .setSize(context.scrollSize) - .setQuery(context.query) - .setFetchSource(context.includeSource); + .setQuery(context.query); + setFetchSource(searchRequestBuilder); for (ExtractedField docValueField : context.extractedFields.getDocValueFields()) { searchRequestBuilder.addDocValueField(docValueField.getName(), docValueField.getDocValueFormat()); @@ -138,6 +139,20 @@ private SearchRequestBuilder buildSearchRequest() { return searchRequestBuilder; } + private void setFetchSource(SearchRequestBuilder searchRequestBuilder) { + if (context.includeSource) { + searchRequestBuilder.setFetchSource(true); + } else { + String[] sourceFields = context.extractedFields.getSourceFields(); + if (sourceFields.length == 0) { + searchRequestBuilder.setFetchSource(false); + searchRequestBuilder.storedFields(StoredFieldsContext._NONE_); + } else { + searchRequestBuilder.setFetchSource(sourceFields, null); + } + } + } + private List processSearchResponse(SearchResponse searchResponse) throws IOException { scrollId = searchResponse.getScrollId(); if (searchResponse.getHits().getHits().length == 0) { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/extractor/DataFrameDataExtractorFactory.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/extractor/DataFrameDataExtractorFactory.java index f7fc0faf0b011..baf77c420c5cb 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/extractor/DataFrameDataExtractorFactory.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/extractor/DataFrameDataExtractorFactory.java @@ -5,21 +5,29 @@ */ package org.elasticsearch.xpack.ml.dataframe.extractor; +import com.carrotsearch.hppc.cursors.ObjectObjectCursor; import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.indices.settings.get.GetSettingsRequest; +import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse; import org.elasticsearch.action.fieldcaps.FieldCapabilitiesAction; import org.elasticsearch.action.fieldcaps.FieldCapabilitiesRequest; import org.elasticsearch.action.fieldcaps.FieldCapabilitiesResponse; import org.elasticsearch.client.Client; +import org.elasticsearch.common.collect.ImmutableOpenMap; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.IndexNotFoundException; +import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig; import org.elasticsearch.xpack.ml.datafeed.extractor.fields.ExtractedFields; import java.util.Arrays; +import java.util.Iterator; import java.util.Map; import java.util.Objects; +import java.util.concurrent.atomic.AtomicInteger; public class DataFrameDataExtractorFactory { @@ -96,29 +104,65 @@ private static void validateIndexAndExtractFields(Client client, DataFrameAnalyticsConfig config, boolean isTaskRestarting, ActionListener listener) { - // Step 2. Extract fields (if possible) and notify listener + AtomicInteger docValueFieldsLimitHolder = new AtomicInteger(); + + // Step 3. Extract fields (if possible) and notify listener ActionListener fieldCapabilitiesHandler = ActionListener.wrap( - fieldCapabilitiesResponse -> listener.onResponse( - new ExtractedFieldsDetector(index, config, isTaskRestarting, fieldCapabilitiesResponse).detect()), + fieldCapabilitiesResponse -> listener.onResponse(new ExtractedFieldsDetector(index, config, isTaskRestarting, + docValueFieldsLimitHolder.get(), fieldCapabilitiesResponse).detect()), + listener::onFailure + ); + + // Step 2. Get field capabilities necessary to build the information of how to extract fields + ActionListener docValueFieldsLimitListener = ActionListener.wrap( + docValueFieldsLimit -> { + docValueFieldsLimitHolder.set(docValueFieldsLimit); + + FieldCapabilitiesRequest fieldCapabilitiesRequest = new FieldCapabilitiesRequest(); + fieldCapabilitiesRequest.indices(index); + fieldCapabilitiesRequest.fields("*"); + ClientHelper.executeWithHeaders(config.getHeaders(), ClientHelper.ML_ORIGIN, client, () -> { + client.execute(FieldCapabilitiesAction.INSTANCE, fieldCapabilitiesRequest, fieldCapabilitiesHandler); + // This response gets discarded - the listener handles the real response + return null; + }); + }, + listener::onFailure + ); + + // Step 1. Get doc value fields limit + getDocValueFieldsLimit(client, index, docValueFieldsLimitListener); + } + + private static void getDocValueFieldsLimit(Client client, String index, ActionListener docValueFieldsLimitListener) { + ActionListener settingsListener = ActionListener.wrap(getSettingsResponse -> { + Integer minDocValueFieldsLimit = Integer.MAX_VALUE; + + ImmutableOpenMap indexToSettings = getSettingsResponse.getIndexToSettings(); + Iterator> iterator = indexToSettings.iterator(); + while (iterator.hasNext()) { + ObjectObjectCursor indexSettings = iterator.next(); + Integer indexMaxDocValueFields = IndexSettings.MAX_DOCVALUE_FIELDS_SEARCH_SETTING.get(indexSettings.value); + if (indexMaxDocValueFields < minDocValueFieldsLimit) { + minDocValueFieldsLimit = indexMaxDocValueFields; + } + } + docValueFieldsLimitListener.onResponse(minDocValueFieldsLimit); + }, e -> { if (e instanceof IndexNotFoundException) { - listener.onFailure(new ResourceNotFoundException("cannot retrieve data because index " + docValueFieldsLimitListener.onFailure(new ResourceNotFoundException("cannot retrieve data because index " + ((IndexNotFoundException) e).getIndex() + " does not exist")); } else { - listener.onFailure(e); + docValueFieldsLimitListener.onFailure(e); } } ); - // Step 1. Get field capabilities necessary to build the information of how to extract fields - FieldCapabilitiesRequest fieldCapabilitiesRequest = new FieldCapabilitiesRequest(); - fieldCapabilitiesRequest.indices(index); - fieldCapabilitiesRequest.fields("*"); - ClientHelper.executeWithHeaders(config.getHeaders(), ClientHelper.ML_ORIGIN, client, () -> { - client.execute(FieldCapabilitiesAction.INSTANCE, fieldCapabilitiesRequest, fieldCapabilitiesHandler); - // This response gets discarded - the listener handles the real response - return null; - }); + GetSettingsRequest getSettingsRequest = new GetSettingsRequest(); + getSettingsRequest.indices(index); + getSettingsRequest.includeDefaults(true); + getSettingsRequest.names(IndexSettings.MAX_DOCVALUE_FIELDS_SEARCH_SETTING.getKey()); + client.admin().indices().getSettings(getSettingsRequest, settingsListener); } - } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/extractor/ExtractedFieldsDetector.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/extractor/ExtractedFieldsDetector.java index 96f0181b1416c..b36fc6f182a06 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/extractor/ExtractedFieldsDetector.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/extractor/ExtractedFieldsDetector.java @@ -10,6 +10,7 @@ import org.elasticsearch.action.fieldcaps.FieldCapabilitiesResponse; import org.elasticsearch.common.Strings; import org.elasticsearch.common.regex.Regex; +import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.mapper.NumberFieldMapper; import org.elasticsearch.search.fetch.subphase.FetchSourceContext; import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig; @@ -57,13 +58,15 @@ public class ExtractedFieldsDetector { private final String index; private final DataFrameAnalyticsConfig config; private final boolean isTaskRestarting; + private final int docValueFieldsLimit; private final FieldCapabilitiesResponse fieldCapabilitiesResponse; - ExtractedFieldsDetector(String index, DataFrameAnalyticsConfig config, boolean isTaskRestarting, + ExtractedFieldsDetector(String index, DataFrameAnalyticsConfig config, boolean isTaskRestarting, int docValueFieldsLimit, FieldCapabilitiesResponse fieldCapabilitiesResponse) { this.index = Objects.requireNonNull(index); this.config = Objects.requireNonNull(config); this.isTaskRestarting = isTaskRestarting; + this.docValueFieldsLimit = docValueFieldsLimit; this.fieldCapabilitiesResponse = Objects.requireNonNull(fieldCapabilitiesResponse); } @@ -86,6 +89,14 @@ public ExtractedFields detect() { if (extractedFields.getAllFields().isEmpty()) { throw ExceptionsHelper.badRequestException("No compatible fields could be detected in index [{}]", index); } + if (extractedFields.getDocValueFields().size() > docValueFieldsLimit) { + extractedFields = fetchFromSourceIfSupported(extractedFields); + if (extractedFields.getDocValueFields().size() > docValueFieldsLimit) { + throw ExceptionsHelper.badRequestException("[{}] fields must be retrieved from doc_values but the limit is [{}]; " + + "please adjust the index level setting [{}]", extractedFields.getDocValueFields().size(), docValueFieldsLimit, + IndexSettings.MAX_DOCVALUE_FIELDS_SEARCH_SETTING.getKey()); + } + } return extractedFields; } @@ -141,4 +152,11 @@ private void includeAndExcludeFields(Set fields, String index) { } } + private ExtractedFields fetchFromSourceIfSupported(ExtractedFields extractedFields) { + List adjusted = new ArrayList<>(extractedFields.getAllFields().size()); + for (ExtractedField field : extractedFields.getDocValueFields()) { + adjusted.add(field.supportsFromSource() ? field.newFromSource() : field); + } + return new ExtractedFields(adjusted); + } } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/extractor/DataFrameDataExtractorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/extractor/DataFrameDataExtractorTests.java index f6547e1e6e583..6b0e88d759b81 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/extractor/DataFrameDataExtractorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/extractor/DataFrameDataExtractorTests.java @@ -14,7 +14,6 @@ import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.ShardSearchFailure; import org.elasticsearch.client.Client; -import org.elasticsearch.common.document.DocumentField; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.index.query.QueryBuilder; @@ -26,6 +25,7 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.ml.datafeed.extractor.fields.ExtractedField; import org.elasticsearch.xpack.ml.datafeed.extractor.fields.ExtractedFields; +import org.elasticsearch.xpack.ml.test.SearchHitBuilder; import org.junit.Before; import org.mockito.ArgumentCaptor; @@ -33,7 +33,6 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; -import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -262,6 +261,58 @@ public void testErrorOnContinueScrollTwiceLeadsToFailure() throws IOException { expectThrows(RuntimeException.class, () -> dataExtractor.next()); } + public void testIncludeSourceIsFalseAndNoSourceFields() throws IOException { + TestExtractor dataExtractor = createExtractor(false); + + SearchResponse response = createSearchResponse(Arrays.asList(1_1), Arrays.asList(2_1)); + dataExtractor.setNextResponse(response); + dataExtractor.setNextResponse(createEmptySearchResponse()); + + assertThat(dataExtractor.hasNext(), is(true)); + + Optional> rows = dataExtractor.next(); + assertThat(rows.isPresent(), is(true)); + assertThat(rows.get().size(), equalTo(1)); + assertThat(rows.get().get(0).getValues(), equalTo(new String[] {"11", "21"})); + assertThat(dataExtractor.hasNext(), is(true)); + + assertThat(dataExtractor.next().isEmpty(), is(true)); + assertThat(dataExtractor.hasNext(), is(false)); + + assertThat(dataExtractor.capturedSearchRequests.size(), equalTo(1)); + String searchRequest = dataExtractor.capturedSearchRequests.get(0).request().toString().replaceAll("\\s", ""); + assertThat(searchRequest, containsString("\"docvalue_fields\":[{\"field\":\"field_1\"},{\"field\":\"field_2\"}]")); + assertThat(searchRequest, containsString("\"_source\":false")); + } + + public void testIncludeSourceIsFalseAndAtLeastOneSourceField() throws IOException { + extractedFields = new ExtractedFields(Arrays.asList( + ExtractedField.newField("field_1", ExtractedField.ExtractionMethod.DOC_VALUE), + ExtractedField.newField("field_2", ExtractedField.ExtractionMethod.SOURCE))); + + TestExtractor dataExtractor = createExtractor(false); + + SearchResponse response = createSearchResponse(Arrays.asList(1_1), Arrays.asList(2_1)); + dataExtractor.setNextResponse(response); + dataExtractor.setNextResponse(createEmptySearchResponse()); + + assertThat(dataExtractor.hasNext(), is(true)); + + Optional> rows = dataExtractor.next(); + assertThat(rows.isPresent(), is(true)); + assertThat(rows.get().size(), equalTo(1)); + assertThat(rows.get().get(0).getValues(), equalTo(new String[] {"11", "21"})); + assertThat(dataExtractor.hasNext(), is(true)); + + assertThat(dataExtractor.next().isEmpty(), is(true)); + assertThat(dataExtractor.hasNext(), is(false)); + + assertThat(dataExtractor.capturedSearchRequests.size(), equalTo(1)); + String searchRequest = dataExtractor.capturedSearchRequests.get(0).request().toString().replaceAll("\\s", ""); + assertThat(searchRequest, containsString("\"docvalue_fields\":[{\"field\":\"field_1\"}]")); + assertThat(searchRequest, containsString("\"_source\":{\"includes\":[\"field_2\"],\"excludes\":[]}")); + } + private TestExtractor createExtractor(boolean includeSource) { DataFrameDataExtractorContext context = new DataFrameDataExtractorContext( JOB_ID, extractedFields, indices, query, scrollSize, headers, includeSource); @@ -275,11 +326,11 @@ private SearchResponse createSearchResponse(List field1Values, List hits = new ArrayList<>(); for (int i = 0; i < field1Values.size(); i++) { SearchHit hit = new SearchHit(randomInt()); - Map fields = new HashMap<>(); - fields.put("field_1", new DocumentField("field_1", Collections.singletonList(field1Values.get(i)))); - fields.put("field_2", new DocumentField("field_2", Collections.singletonList(field2Values.get(i)))); - hit.fields(fields); - hits.add(hit); + SearchHitBuilder searchHitBuilder = new SearchHitBuilder(randomInt()) + .addField("field_1", Collections.singletonList(field1Values.get(i))) + .addField("field_2", Collections.singletonList(field2Values.get(i))) + .setSource("{\"field_1\":" + field1Values.get(i) + ",\"field_2\":" + field2Values.get(i) + "}"); + hits.add(searchHitBuilder.build()); } SearchHits searchHits = new SearchHits(hits.toArray(new SearchHit[0]), new TotalHits(hits.size(), TotalHits.Relation.EQUAL_TO), 1); when(searchResponse.getHits()).thenReturn(searchHits); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/extractor/ExtractedFieldsDetectorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/extractor/ExtractedFieldsDetectorTests.java index 3aa6bfd6480d1..c035c44f117f4 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/extractor/ExtractedFieldsDetectorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/extractor/ExtractedFieldsDetectorTests.java @@ -25,6 +25,7 @@ import java.util.Map; import java.util.stream.Collectors; +import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.equalTo; import static org.mockito.Mockito.mock; @@ -41,12 +42,13 @@ public void testDetect_GivenFloatField() { .addAggregatableField("some_float", "float").build(); ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector( - SOURCE_INDEX, buildAnalyticsConfig(), false, fieldCapabilities); + SOURCE_INDEX, buildAnalyticsConfig(), false, 100, fieldCapabilities); ExtractedFields extractedFields = extractedFieldsDetector.detect(); List allFields = extractedFields.getAllFields(); assertThat(allFields.size(), equalTo(1)); assertThat(allFields.get(0).getName(), equalTo("some_float")); + assertThat(allFields.get(0).getExtractionMethod(), equalTo(ExtractedField.ExtractionMethod.DOC_VALUE)); } public void testDetect_GivenNumericFieldWithMultipleTypes() { @@ -55,12 +57,13 @@ public void testDetect_GivenNumericFieldWithMultipleTypes() { .build(); ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector( - SOURCE_INDEX, buildAnalyticsConfig(), false, fieldCapabilities); + SOURCE_INDEX, buildAnalyticsConfig(), false, 100, fieldCapabilities); ExtractedFields extractedFields = extractedFieldsDetector.detect(); List allFields = extractedFields.getAllFields(); assertThat(allFields.size(), equalTo(1)); assertThat(allFields.get(0).getName(), equalTo("some_number")); + assertThat(allFields.get(0).getExtractionMethod(), equalTo(ExtractedField.ExtractionMethod.DOC_VALUE)); } public void testDetect_GivenNonNumericField() { @@ -68,7 +71,7 @@ public void testDetect_GivenNonNumericField() { .addAggregatableField("some_keyword", "keyword").build(); ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector( - SOURCE_INDEX, buildAnalyticsConfig(), false, fieldCapabilities); + SOURCE_INDEX, buildAnalyticsConfig(), false, 100, fieldCapabilities); ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> extractedFieldsDetector.detect()); assertThat(e.getMessage(), equalTo("No compatible fields could be detected in index [source_index]")); @@ -79,7 +82,7 @@ public void testDetect_GivenFieldWithNumericAndNonNumericTypes() { .addAggregatableField("indecisive_field", "float", "keyword").build(); ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector( - SOURCE_INDEX, buildAnalyticsConfig(), false, fieldCapabilities); + SOURCE_INDEX, buildAnalyticsConfig(), false, 100, fieldCapabilities); ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> extractedFieldsDetector.detect()); assertThat(e.getMessage(), equalTo("No compatible fields could be detected in index [source_index]")); @@ -93,13 +96,15 @@ public void testDetect_GivenMultipleFields() { .build(); ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector( - SOURCE_INDEX, buildAnalyticsConfig(), false, fieldCapabilities); + SOURCE_INDEX, buildAnalyticsConfig(), false, 100, fieldCapabilities); ExtractedFields extractedFields = extractedFieldsDetector.detect(); List allFields = extractedFields.getAllFields(); assertThat(allFields.size(), equalTo(2)); assertThat(allFields.stream().map(ExtractedField::getName).collect(Collectors.toSet()), containsInAnyOrder("some_float", "some_long")); + assertThat(allFields.stream().map(ExtractedField::getExtractionMethod).collect(Collectors.toSet()), + contains(equalTo(ExtractedField.ExtractionMethod.DOC_VALUE))); } public void testDetect_GivenIgnoredField() { @@ -107,7 +112,7 @@ public void testDetect_GivenIgnoredField() { .addAggregatableField("_id", "float").build(); ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector( - SOURCE_INDEX, buildAnalyticsConfig(), false, fieldCapabilities); + SOURCE_INDEX, buildAnalyticsConfig(), false, 100, fieldCapabilities); ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> extractedFieldsDetector.detect()); assertThat(e.getMessage(), equalTo("No compatible fields could be detected in index [source_index]")); @@ -129,7 +134,7 @@ public void testDetect_ShouldSortFieldsAlphabetically() { FieldCapabilitiesResponse fieldCapabilities = mockFieldCapsResponseBuilder.build(); ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector( - SOURCE_INDEX, buildAnalyticsConfig(), false, fieldCapabilities); + SOURCE_INDEX, buildAnalyticsConfig(), false, 100, fieldCapabilities); ExtractedFields extractedFields = extractedFieldsDetector.detect(); List extractedFieldNames = extractedFields.getAllFields().stream().map(ExtractedField::getName) @@ -146,7 +151,7 @@ public void testDetectedExtractedFields_GivenIncludeWithMissingField() { FetchSourceContext desiredFields = new FetchSourceContext(true, new String[]{"your_field1", "my*"}, new String[0]); ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector( - SOURCE_INDEX, buildAnalyticsConfig(desiredFields), false, fieldCapabilities); + SOURCE_INDEX, buildAnalyticsConfig(desiredFields), false, 100, fieldCapabilities); ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> extractedFieldsDetector.detect()); assertThat(e.getMessage(), equalTo("No compatible fields could be detected in index [source_index] with name [your_field1]")); @@ -161,7 +166,7 @@ public void testDetectedExtractedFields_GivenExcludeAllValidFields() { FetchSourceContext desiredFields = new FetchSourceContext(true, new String[0], new String[]{"my_*"}); ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector( - SOURCE_INDEX, buildAnalyticsConfig(desiredFields), false, fieldCapabilities); + SOURCE_INDEX, buildAnalyticsConfig(desiredFields), false, 100, fieldCapabilities); ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> extractedFieldsDetector.detect()); assertThat(e.getMessage(), equalTo("No compatible fields could be detected in index [source_index]")); } @@ -177,7 +182,7 @@ public void testDetectedExtractedFields_GivenInclusionsAndExclusions() { FetchSourceContext desiredFields = new FetchSourceContext(true, new String[]{"your*", "my_*"}, new String[]{"*nope"}); ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector( - SOURCE_INDEX, buildAnalyticsConfig(desiredFields), false, fieldCapabilities); + SOURCE_INDEX, buildAnalyticsConfig(desiredFields), false, 100, fieldCapabilities); ExtractedFields extractedFields = extractedFieldsDetector.detect(); List extractedFieldNames = extractedFields.getAllFields().stream().map(ExtractedField::getName) @@ -194,7 +199,7 @@ public void testDetectedExtractedFields_GivenIndexContainsResultsField() { .build(); ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector( - SOURCE_INDEX, buildAnalyticsConfig(), false, fieldCapabilities); + SOURCE_INDEX, buildAnalyticsConfig(), false, 100, fieldCapabilities); ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> extractedFieldsDetector.detect()); assertThat(e.getMessage(), equalTo("Index [source_index] already has a field that matches the dest.results_field [ml]; " + @@ -210,7 +215,7 @@ public void testDetectedExtractedFields_GivenIndexContainsResultsFieldAndTaskIsR .build(); ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector( - SOURCE_INDEX, buildAnalyticsConfig(), true, fieldCapabilities); + SOURCE_INDEX, buildAnalyticsConfig(), true, 100, fieldCapabilities); ExtractedFields extractedFields = extractedFieldsDetector.detect(); List extractedFieldNames = extractedFields.getAllFields().stream().map(ExtractedField::getName) @@ -218,6 +223,63 @@ public void testDetectedExtractedFields_GivenIndexContainsResultsFieldAndTaskIsR assertThat(extractedFieldNames, equalTo(Arrays.asList("my_field1", "your_field2"))); } + public void testDetectedExtractedFields_GivenLessFieldsThanDocValuesLimit() { + FieldCapabilitiesResponse fieldCapabilities = new MockFieldCapsResponseBuilder() + .addAggregatableField("field_1", "float") + .addAggregatableField("field_2", "float") + .addAggregatableField("field_3", "float") + .addAggregatableField("a_keyword", "keyword") + .build(); + + ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector( + SOURCE_INDEX, buildAnalyticsConfig(), true, 4, fieldCapabilities); + ExtractedFields extractedFields = extractedFieldsDetector.detect(); + + List extractedFieldNames = extractedFields.getAllFields().stream().map(ExtractedField::getName) + .collect(Collectors.toList()); + assertThat(extractedFieldNames, equalTo(Arrays.asList("field_1", "field_2", "field_3"))); + assertThat(extractedFields.getAllFields().stream().map(ExtractedField::getExtractionMethod).collect(Collectors.toSet()), + contains(equalTo(ExtractedField.ExtractionMethod.DOC_VALUE))); + } + + public void testDetectedExtractedFields_GivenEqualFieldsToDocValuesLimit() { + FieldCapabilitiesResponse fieldCapabilities = new MockFieldCapsResponseBuilder() + .addAggregatableField("field_1", "float") + .addAggregatableField("field_2", "float") + .addAggregatableField("field_3", "float") + .addAggregatableField("a_keyword", "keyword") + .build(); + + ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector( + SOURCE_INDEX, buildAnalyticsConfig(), true, 3, fieldCapabilities); + ExtractedFields extractedFields = extractedFieldsDetector.detect(); + + List extractedFieldNames = extractedFields.getAllFields().stream().map(ExtractedField::getName) + .collect(Collectors.toList()); + assertThat(extractedFieldNames, equalTo(Arrays.asList("field_1", "field_2", "field_3"))); + assertThat(extractedFields.getAllFields().stream().map(ExtractedField::getExtractionMethod).collect(Collectors.toSet()), + contains(equalTo(ExtractedField.ExtractionMethod.DOC_VALUE))); + } + + public void testDetectedExtractedFields_GivenMoreFieldsThanDocValuesLimit() { + FieldCapabilitiesResponse fieldCapabilities = new MockFieldCapsResponseBuilder() + .addAggregatableField("field_1", "float") + .addAggregatableField("field_2", "float") + .addAggregatableField("field_3", "float") + .addAggregatableField("a_keyword", "keyword") + .build(); + + ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector( + SOURCE_INDEX, buildAnalyticsConfig(), true, 2, fieldCapabilities); + ExtractedFields extractedFields = extractedFieldsDetector.detect(); + + List extractedFieldNames = extractedFields.getAllFields().stream().map(ExtractedField::getName) + .collect(Collectors.toList()); + assertThat(extractedFieldNames, equalTo(Arrays.asList("field_1", "field_2", "field_3"))); + assertThat(extractedFields.getAllFields().stream().map(ExtractedField::getExtractionMethod).collect(Collectors.toSet()), + contains(equalTo(ExtractedField.ExtractionMethod.SOURCE))); + } + private static DataFrameAnalyticsConfig buildAnalyticsConfig() { return buildAnalyticsConfig(null); }