Skip to content

[FEATURE][ML] Fetch from source when fields are more then docvalue limit #43204

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,17 @@
package org.elasticsearch.xpack.ml.integration;

import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsRequest;
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.xpack.core.ml.action.GetDataFrameAnalyticsStatsAction;
Expand Down Expand Up @@ -147,6 +151,71 @@ public void testOutlierDetectionWithEnoughDocumentsToScroll() throws Exception {
assertThat(searchResponse.getHits().getTotalHits().value, equalTo((long) docCount));
}

public void testOutlierDetectionWithMoreFieldsThanDocValueFieldLimit() throws Exception {
String sourceIndex = "test-outlier-detection-with-more-fields-than-docvalue-limit";

client().admin().indices().prepareCreate(sourceIndex).get();

GetSettingsRequest getSettingsRequest = new GetSettingsRequest();
getSettingsRequest.indices(sourceIndex);
getSettingsRequest.names(IndexSettings.MAX_DOCVALUE_FIELDS_SEARCH_SETTING.getKey());
getSettingsRequest.includeDefaults(true);

GetSettingsResponse docValueLimitSetting = client().admin().indices().getSettings(getSettingsRequest).actionGet();
int docValueLimit = IndexSettings.MAX_DOCVALUE_FIELDS_SEARCH_SETTING.get(
docValueLimitSetting.getIndexToSettings().values().iterator().next().value);

BulkRequestBuilder bulkRequestBuilder = client().prepareBulk();
bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);

for (int i = 0; i < 100; i++) {

StringBuilder source = new StringBuilder("{");
for (int fieldCount = 0; fieldCount < docValueLimit + 1; fieldCount++) {
source.append("\"field_").append(fieldCount).append("\":").append(randomDouble());
if (fieldCount < docValueLimit) {
source.append(",");
}
}
source.append("}");

IndexRequest indexRequest = new IndexRequest(sourceIndex);
indexRequest.source(source.toString(), XContentType.JSON);
bulkRequestBuilder.add(indexRequest);
}
BulkResponse bulkResponse = bulkRequestBuilder.get();
if (bulkResponse.hasFailures()) {
fail("Failed to index data: " + bulkResponse.buildFailureMessage());
}

String id = "test_outlier_detection_with_more_fields_than_docvalue_limit";
DataFrameAnalyticsConfig config = buildOutlierDetectionAnalytics(id, sourceIndex, null);
registerAnalytics(config);
putAnalytics(config);

assertState(id, DataFrameAnalyticsState.STOPPED);

startAnalytics(id);
waitUntilAnalyticsIsStopped(id);

SearchResponse sourceData = client().prepareSearch(sourceIndex).get();
for (SearchHit hit : sourceData.getHits()) {
GetResponse destDocGetResponse = client().prepareGet().setIndex(config.getDest().getIndex()).setId(hit.getId()).get();
assertThat(destDocGetResponse.isExists(), is(true));
Map<String, Object> sourceDoc = hit.getSourceAsMap();
Map<String, Object> destDoc = destDocGetResponse.getSource();
for (String field : sourceDoc.keySet()) {
assertThat(destDoc.containsKey(field), is(true));
assertThat(destDoc.get(field), equalTo(sourceDoc.get(field)));
}
assertThat(destDoc.containsKey("ml"), is(true));
Map<String, Object> resultsObject = (Map<String, Object>) destDoc.get("ml");
assertThat(resultsObject.containsKey("outlier_score"), is(true));
double outlierScore = (double) resultsObject.get("outlier_score");
assertThat(outlierScore, allOf(greaterThanOrEqualTo(0.0), lessThanOrEqualTo(100.0)));
}
}

public void testStopOutlierDetectionWithEnoughDocumentsToScroll() {
String sourceIndex = "test-outlier-detection-with-enough-docs-to-scroll";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ public ExtractionMethod getExtractionMethod() {

public abstract Object[] value(SearchHit hit);

public abstract boolean supportsFromSource();

public String getDocValueFormat() {
return null;
}
Expand Down Expand Up @@ -93,6 +95,14 @@ public static ExtractedField newField(String alias, String name, ExtractionMetho
}
}

public ExtractedField newFromSource() {
if (supportsFromSource()) {
return new FromSource(alias, name);
}
throw new IllegalStateException("Field (alias [" + alias + "], name [" + name + "]) should be extracted via ["
+ extractionMethod + "] and cannot be extracted from source");
}

private static class FromFields extends ExtractedField {

FromFields(String alias, String name, ExtractionMethod extractionMethod) {
Expand All @@ -108,6 +118,11 @@ public Object[] value(SearchHit hit) {
}
return new Object[0];
}

@Override
public boolean supportsFromSource() {
return getExtractionMethod() == ExtractionMethod.DOC_VALUE;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, we are assuming that anything that is FromFields that has ExtractionMethod.DOC_VALUE also supports extraction via _source? This seems OK to me for the most part, except for GeoPointField, which may need to override supportsFromSource() to always return false.

I think we may want to do something similar with TimeField

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch! Will adjust.

}
}

private static class GeoShapeField extends FromSource {
Expand Down Expand Up @@ -195,6 +210,11 @@ private String handleString(String geoString) {
throw new IllegalArgumentException("Unexpected value for a geo_point field: " + geoString);
}
}

@Override
public boolean supportsFromSource() {
return false;
}
}

private static class TimeField extends FromFields {
Expand Down Expand Up @@ -223,6 +243,11 @@ public Object[] value(SearchHit hit) {
public String getDocValueFormat() {
return EPOCH_MILLIS_FORMAT;
}

@Override
public boolean supportsFromSource() {
return false;
}
}

private static class FromSource extends ExtractedField {
Expand Down Expand Up @@ -257,6 +282,11 @@ public Object[] value(SearchHit hit) {
return new Object[0];
}

@Override
public boolean supportsFromSource() {
return true;
}

@SuppressWarnings("unchecked")
private static Map<String, Object> getNextLevel(Map<String, Object> source, String key) {
Object nextLevel = source.get(key);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.fetch.StoredFieldsContext;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.ml.datafeed.extractor.fields.ExtractedField;
Expand Down Expand Up @@ -128,8 +129,8 @@ private SearchRequestBuilder buildSearchRequest() {
.addSort(DataFrameAnalyticsFields.ID, SortOrder.ASC)
.setIndices(context.indices)
.setSize(context.scrollSize)
.setQuery(context.query)
.setFetchSource(context.includeSource);
.setQuery(context.query);
setFetchSource(searchRequestBuilder);

for (ExtractedField docValueField : context.extractedFields.getDocValueFields()) {
searchRequestBuilder.addDocValueField(docValueField.getName(), docValueField.getDocValueFormat());
Expand All @@ -138,6 +139,20 @@ private SearchRequestBuilder buildSearchRequest() {
return searchRequestBuilder;
}

private void setFetchSource(SearchRequestBuilder searchRequestBuilder) {
if (context.includeSource) {
searchRequestBuilder.setFetchSource(true);
} else {
String[] sourceFields = context.extractedFields.getSourceFields();
if (sourceFields.length == 0) {
searchRequestBuilder.setFetchSource(false);
searchRequestBuilder.storedFields(StoredFieldsContext._NONE_);
} else {
searchRequestBuilder.setFetchSource(sourceFields, null);
}
}
}

private List<Row> processSearchResponse(SearchResponse searchResponse) throws IOException {
scrollId = searchResponse.getScrollId();
if (searchResponse.getHits().getHits().length == 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,29 @@
*/
package org.elasticsearch.xpack.ml.dataframe.extractor;

import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsRequest;
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse;
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesAction;
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesRequest;
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
import org.elasticsearch.xpack.ml.datafeed.extractor.fields.ExtractedFields;

import java.util.Arrays;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;

public class DataFrameDataExtractorFactory {

Expand Down Expand Up @@ -96,29 +104,65 @@ private static void validateIndexAndExtractFields(Client client,
DataFrameAnalyticsConfig config,
boolean isTaskRestarting,
ActionListener<ExtractedFields> listener) {
// Step 2. Extract fields (if possible) and notify listener
AtomicInteger docValueFieldsLimitHolder = new AtomicInteger();

// Step 3. Extract fields (if possible) and notify listener
ActionListener<FieldCapabilitiesResponse> fieldCapabilitiesHandler = ActionListener.wrap(
fieldCapabilitiesResponse -> listener.onResponse(
new ExtractedFieldsDetector(index, config, isTaskRestarting, fieldCapabilitiesResponse).detect()),
fieldCapabilitiesResponse -> listener.onResponse(new ExtractedFieldsDetector(index, config, isTaskRestarting,
docValueFieldsLimitHolder.get(), fieldCapabilitiesResponse).detect()),
listener::onFailure
);

// Step 2. Get field capabilities necessary to build the information of how to extract fields
ActionListener<Integer> docValueFieldsLimitListener = ActionListener.wrap(
docValueFieldsLimit -> {
docValueFieldsLimitHolder.set(docValueFieldsLimit);

FieldCapabilitiesRequest fieldCapabilitiesRequest = new FieldCapabilitiesRequest();
fieldCapabilitiesRequest.indices(index);
fieldCapabilitiesRequest.fields("*");
ClientHelper.executeWithHeaders(config.getHeaders(), ClientHelper.ML_ORIGIN, client, () -> {
client.execute(FieldCapabilitiesAction.INSTANCE, fieldCapabilitiesRequest, fieldCapabilitiesHandler);
// This response gets discarded - the listener handles the real response
return null;
});
},
listener::onFailure
);

// Step 1. Get doc value fields limit
getDocValueFieldsLimit(client, index, docValueFieldsLimitListener);
}

private static void getDocValueFieldsLimit(Client client, String index, ActionListener<Integer> docValueFieldsLimitListener) {
ActionListener<GetSettingsResponse> settingsListener = ActionListener.wrap(getSettingsResponse -> {
Integer minDocValueFieldsLimit = Integer.MAX_VALUE;

ImmutableOpenMap<String, Settings> indexToSettings = getSettingsResponse.getIndexToSettings();
Iterator<ObjectObjectCursor<String, Settings>> iterator = indexToSettings.iterator();
while (iterator.hasNext()) {
ObjectObjectCursor<String, Settings> indexSettings = iterator.next();
Integer indexMaxDocValueFields = IndexSettings.MAX_DOCVALUE_FIELDS_SEARCH_SETTING.get(indexSettings.value);
if (indexMaxDocValueFields < minDocValueFieldsLimit) {
minDocValueFieldsLimit = indexMaxDocValueFields;
}
}
docValueFieldsLimitListener.onResponse(minDocValueFieldsLimit);
},
e -> {
if (e instanceof IndexNotFoundException) {
listener.onFailure(new ResourceNotFoundException("cannot retrieve data because index "
docValueFieldsLimitListener.onFailure(new ResourceNotFoundException("cannot retrieve data because index "
+ ((IndexNotFoundException) e).getIndex() + " does not exist"));
} else {
listener.onFailure(e);
docValueFieldsLimitListener.onFailure(e);
}
}
);

// Step 1. Get field capabilities necessary to build the information of how to extract fields
FieldCapabilitiesRequest fieldCapabilitiesRequest = new FieldCapabilitiesRequest();
fieldCapabilitiesRequest.indices(index);
fieldCapabilitiesRequest.fields("*");
ClientHelper.executeWithHeaders(config.getHeaders(), ClientHelper.ML_ORIGIN, client, () -> {
client.execute(FieldCapabilitiesAction.INSTANCE, fieldCapabilitiesRequest, fieldCapabilitiesHandler);
// This response gets discarded - the listener handles the real response
return null;
});
GetSettingsRequest getSettingsRequest = new GetSettingsRequest();
getSettingsRequest.indices(index);
getSettingsRequest.includeDefaults(true);
getSettingsRequest.names(IndexSettings.MAX_DOCVALUE_FIELDS_SEARCH_SETTING.getKey());
client.admin().indices().getSettings(getSettingsRequest, settingsListener);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesResponse;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.mapper.NumberFieldMapper;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
Expand Down Expand Up @@ -57,13 +58,15 @@ public class ExtractedFieldsDetector {
private final String index;
private final DataFrameAnalyticsConfig config;
private final boolean isTaskRestarting;
private final int docValueFieldsLimit;
private final FieldCapabilitiesResponse fieldCapabilitiesResponse;

ExtractedFieldsDetector(String index, DataFrameAnalyticsConfig config, boolean isTaskRestarting,
ExtractedFieldsDetector(String index, DataFrameAnalyticsConfig config, boolean isTaskRestarting, int docValueFieldsLimit,
FieldCapabilitiesResponse fieldCapabilitiesResponse) {
this.index = Objects.requireNonNull(index);
this.config = Objects.requireNonNull(config);
this.isTaskRestarting = isTaskRestarting;
this.docValueFieldsLimit = docValueFieldsLimit;
this.fieldCapabilitiesResponse = Objects.requireNonNull(fieldCapabilitiesResponse);
}

Expand All @@ -86,6 +89,14 @@ public ExtractedFields detect() {
if (extractedFields.getAllFields().isEmpty()) {
throw ExceptionsHelper.badRequestException("No compatible fields could be detected in index [{}]", index);
}
if (extractedFields.getDocValueFields().size() > docValueFieldsLimit) {
extractedFields = fetchFromSourceIfSupported(extractedFields);
if (extractedFields.getDocValueFields().size() > docValueFieldsLimit) {
throw ExceptionsHelper.badRequestException("[{}] fields must be retrieved from doc_values but the limit is [{}]; " +
"please adjust the index level setting [{}]", extractedFields.getDocValueFields().size(), docValueFieldsLimit,
IndexSettings.MAX_DOCVALUE_FIELDS_SEARCH_SETTING.getKey());
}
}
return extractedFields;
}

Expand Down Expand Up @@ -141,4 +152,11 @@ private void includeAndExcludeFields(Set<String> fields, String index) {
}
}

private ExtractedFields fetchFromSourceIfSupported(ExtractedFields extractedFields) {
List<ExtractedField> adjusted = new ArrayList<>(extractedFields.getAllFields().size());
for (ExtractedField field : extractedFields.getDocValueFields()) {
adjusted.add(field.supportsFromSource() ? field.newFromSource() : field);
}
return new ExtractedFields(adjusted);
}
}
Loading