Skip to content

[ML] Outlier detection should only fetch docs that have the analyzed … #44944

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.elasticsearch.xpack.ml.integration;

import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.json.JsonXContent;
Expand All @@ -16,15 +17,16 @@
import org.elasticsearch.xpack.core.ml.action.StartDataFrameAnalyticsAction;
import org.elasticsearch.xpack.core.ml.action.StopDataFrameAnalyticsAction;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsDest;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsSource;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState;
import org.elasticsearch.xpack.core.ml.dataframe.analyses.OutlierDetection;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

import static org.hamcrest.Matchers.equalTo;

Expand Down Expand Up @@ -97,22 +99,23 @@ protected List<GetDataFrameAnalyticsStatsAction.Response.Stats> getAnalyticsStat
return response.getResponse().results();
}

/**
 * Generates newline-terminated JSON records, each holding a single {@code "time"} field,
 * spread over consecutive buckets.
 *
 * <p>The number of records for a bucket is obtained by calling {@code timeToCountFunction}
 * exactly once per bucket. Evaluating it once (rather than in the inner loop condition)
 * guarantees a consistent record count even when the function is non-deterministic
 * (for example, backed by a random generator) or expensive.
 *
 * @param timestamp           value written to {@code "time"} for the records of the first bucket
 * @param bucketSpan          span added to the time value after each bucket
 * @param bucketCount         number of buckets to generate
 * @param timeToCountFunction maps a bucket index to the number of records in that bucket
 * @return the generated records, in bucket order
 * @throws IOException if a record cannot be serialized to JSON
 */
protected List<String> generateData(long timestamp, TimeValue bucketSpan, int bucketCount,
                                    Function<Integer, Integer> timeToCountFunction) throws IOException {
    List<String> data = new ArrayList<>();
    long now = timestamp;
    for (int bucketIndex = 0; bucketIndex < bucketCount; bucketIndex++) {
        // Ask for the bucket's record count once; the loop condition would otherwise
        // re-invoke the function on every iteration.
        final int recordsInBucket = timeToCountFunction.apply(bucketIndex);
        for (int count = 0; count < recordsInBucket; count++) {
            Map<String, Object> record = new HashMap<>();
            record.put("time", now);
            data.add(createJsonRecord(record));
        }
        now += bucketSpan.getMillis();
    }
    return data;
}

/**
 * Serializes the given map as a JSON object and terminates it with a newline.
 *
 * @param keyValueMap the fields of the record
 * @return the JSON text of the record followed by {@code "\n"}
 * @throws IOException if the JSON content cannot be built
 */
protected static String createJsonRecord(Map<String, Object> keyValueMap) throws IOException {
    final String json = Strings.toString(JsonXContent.contentBuilder().map(keyValueMap));
    return json + "\n";
}

/**
 * Builds an outlier detection analytics config that reads from {@code sourceIndex}
 * and writes to {@code destIndex}.
 *
 * @param id           the id given to the config builder
 * @param sourceIndex  the source indices
 * @param destIndex    the destination index
 * @param resultsField the results field passed to the destination, may be {@code null}
 * @return the built config
 */
protected static DataFrameAnalyticsConfig buildOutlierDetectionAnalytics(String id, String[] sourceIndex, String destIndex,
                                                                         @Nullable String resultsField) {
    final DataFrameAnalyticsConfig.Builder builder = new DataFrameAnalyticsConfig.Builder(id);

    final DataFrameAnalyticsSource source = new DataFrameAnalyticsSource(sourceIndex, null);
    builder.setSource(source);

    final DataFrameAnalyticsDest dest = new DataFrameAnalyticsDest(destIndex, resultsField);
    builder.setDest(dest);

    final OutlierDetection analysis = new OutlierDetection();
    builder.setAnalysis(analysis);

    return builder.build();
}

/**
 * Asserts that the stats fetched for {@code id} contain exactly one entry,
 * and that this entry carries the given id and state.
 *
 * @param id    the analytics id to look up
 * @param state the expected state
 */
protected void assertState(String id, DataFrameAnalyticsState state) {
    final List<GetDataFrameAnalyticsStatsAction.Response.Stats> allStats = getAnalyticsStats(id);
    assertThat(allStats.size(), equalTo(1));

    final GetDataFrameAnalyticsStatsAction.Response.Stats onlyStats = allStats.get(0);
    assertThat(onlyStats.getId(), equalTo(id));
    assertThat(onlyStats.getState(), equalTo(state));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.integration;

import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState;
import org.junit.After;

import java.util.Map;

import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThanOrEqualTo;

/**
 * Integration test checking how outlier detection treats documents whose
 * {@code numeric} field is absent or holds an array: such documents must still
 * be present in the destination index, but without an {@code ml} results object.
 */
public class OutlierDetectionWithMissingFieldsIT extends MlNativeDataFrameAnalyticsIntegTestCase {

// Runs after each test and delegates to the cleanUp() helper.
@After
public void cleanup() {
cleanUp();
}

/**
 * Indexes five documents with a scalar {@code numeric} value, one document without
 * {@code numeric} and one where {@code numeric} is an array, runs outlier detection,
 * and then verifies for every source document that its fields appear unchanged in the
 * destination index and that an {@code ml.outlier_score} in [0.0, 1.0] exists only
 * when the destination {@code numeric} value is a single {@code Double}.
 */
public void testMissingFields() throws Exception {
String sourceIndex = "test-outlier-detection-with-missing-fields";

// Explicit mapping: "numeric" is a double, "categorical" is a keyword.
client().admin().indices().prepareCreate(sourceIndex)
.addMapping("_doc", "numeric", "type=double", "categorical", "type=keyword")
.get();

// All docs go in one bulk request that refreshes immediately.
BulkRequestBuilder bulkRequestBuilder = client().prepareBulk();
bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);

// 5 docs with valid value
for (int i = 0; i < 5; i++) {
IndexRequest indexRequest = new IndexRequest(sourceIndex);
indexRequest.source("numeric", 42.0, "categorical", "foo");
bulkRequestBuilder.add(indexRequest);
}

// Add a doc with missing field
{
IndexRequest missingIndexRequest = new IndexRequest(sourceIndex);
missingIndexRequest.source("categorical", "foo");
bulkRequestBuilder.add(missingIndexRequest);
}

// Add a doc with numeric being array which is also treated as missing
{
IndexRequest arrayIndexRequest = new IndexRequest(sourceIndex);
arrayIndexRequest.source("numeric", new double[]{1.0, 2.0}, "categorical", "foo");
bulkRequestBuilder.add(arrayIndexRequest);
}

// Fail fast if any of the bulk items could not be indexed.
BulkResponse bulkResponse = bulkRequestBuilder.get();
if (bulkResponse.hasFailures()) {
fail("Failed to index data: " + bulkResponse.buildFailureMessage());
}

// Results go to "<sourceIndex>-results"; the results field argument is left null.
String id = "test_outlier_detection_with_missing_fields";
DataFrameAnalyticsConfig config = buildOutlierDetectionAnalytics(id, new String[] {sourceIndex}, sourceIndex + "-results", null);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just for certainty, you don't need to explicitly set analyzed_fields, because it defaults to all numeric fields?
You could index some docs that have "numeric" but are missing "categorical" to show that missing categorical field doesn't matter and "ml" object is still generated for such docs.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the categorical field in this case is not included in the analyzed fields. Good idea, I'll do so.

registerAnalytics(config);
putAnalytics(config);

// The analytics must report STOPPED before it is started.
assertState(id, DataFrameAnalyticsState.STOPPED);

startAnalytics(id);
waitUntilAnalyticsIsStopped(id);

// Compare each source hit with the destination doc that has the same id.
SearchResponse sourceData = client().prepareSearch(sourceIndex).get();
for (SearchHit hit : sourceData.getHits()) {
GetResponse destDocGetResponse = client().prepareGet().setIndex(config.getDest().getIndex()).setId(hit.getId()).get();
assertThat(destDocGetResponse.isExists(), is(true));
Map<String, Object> sourceDoc = hit.getSourceAsMap();
Map<String, Object> destDoc = destDocGetResponse.getSource();
// Every source field must be present in the destination doc with an equal value.
for (String field : sourceDoc.keySet()) {
assertThat(destDoc.containsKey(field), is(true));
assertThat(destDoc.get(field), equalTo(sourceDoc.get(field)));
}
// Only docs whose "numeric" is a single Double are expected to carry results.
if (destDoc.containsKey("numeric") && destDoc.get("numeric") instanceof Double) {
assertThat(destDoc.containsKey("ml"), is(true));
@SuppressWarnings("unchecked")
Map<String, Object> resultsObject = (Map<String, Object>) destDoc.get("ml");

assertThat(resultsObject.containsKey("outlier_score"), is(true));
double outlierScore = (double) resultsObject.get("outlier_score");
assertThat(outlierScore, allOf(greaterThanOrEqualTo(0.0), lessThanOrEqualTo(1.0)));
} else {
// Missing or array-valued "numeric": the doc must have no "ml" object.
assertThat(destDoc.containsKey("ml"), is(false));
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,14 @@
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.xpack.core.ml.action.GetDataFrameAnalyticsStatsAction;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsDest;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsSource;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState;
import org.elasticsearch.xpack.core.ml.dataframe.analyses.OutlierDetection;
import org.junit.After;

import java.util.List;
import java.util.Map;

import static org.hamcrest.Matchers.allOf;
Expand Down Expand Up @@ -97,7 +91,7 @@ public void testOutlierDetectionWithFewDocuments() throws Exception {

assertThat(resultsObject.containsKey("outlier_score"), is(true));
double outlierScore = (double) resultsObject.get("outlier_score");
assertThat(outlierScore, allOf(greaterThanOrEqualTo(0.0), lessThanOrEqualTo(100.0)));
assertThat(outlierScore, allOf(greaterThanOrEqualTo(0.0), lessThanOrEqualTo(1.0)));
if (hit.getId().equals("outlier")) {
scoreOfOutlier = outlierScore;
} else {
Expand Down Expand Up @@ -218,7 +212,7 @@ public void testOutlierDetectionWithMoreFieldsThanDocValueFieldLimit() throws Ex

assertThat(resultsObject.containsKey("outlier_score"), is(true));
double outlierScore = (double) resultsObject.get("outlier_score");
assertThat(outlierScore, allOf(greaterThanOrEqualTo(0.0), lessThanOrEqualTo(100.0)));
assertThat(outlierScore, allOf(greaterThanOrEqualTo(0.0), lessThanOrEqualTo(1.0)));
}
}

Expand Down Expand Up @@ -368,20 +362,4 @@ public void testOutlierDetectionWithPreExistingDestIndex() throws Exception {
.setQuery(QueryBuilders.existsQuery("ml.outlier_score")).get();
assertThat(searchResponse.getHits().getTotalHits().value, equalTo((long) bulkRequestBuilder.numberOfActions()));
}

/**
 * Creates the config for an outlier detection job over the given source indices,
 * writing its output to the given destination index.
 *
 * @param id           id passed to the config builder
 * @param sourceIndex  indices used as the analytics source
 * @param destIndex    index used as the analytics destination
 * @param resultsField results field for the destination, or {@code null}
 * @return the config produced by the builder
 */
private static DataFrameAnalyticsConfig buildOutlierDetectionAnalytics(String id, String[] sourceIndex, String destIndex,
                                                                       @Nullable String resultsField) {
    final DataFrameAnalyticsConfig.Builder cfg = new DataFrameAnalyticsConfig.Builder(id);
    final DataFrameAnalyticsSource from = new DataFrameAnalyticsSource(sourceIndex, null);
    cfg.setSource(from);
    final DataFrameAnalyticsDest to = new DataFrameAnalyticsDest(destIndex, resultsField);
    cfg.setDest(to);
    cfg.setAnalysis(new OutlierDetection());
    final DataFrameAnalyticsConfig built = cfg.build();
    return built;
}

/**
 * Verifies that exactly one stats entry is returned for {@code id} and that it
 * reports the same id together with the expected state.
 *
 * @param id    analytics id whose stats are fetched
 * @param state state the single stats entry must report
 */
private void assertState(String id, DataFrameAnalyticsState state) {
    final List<GetDataFrameAnalyticsStatsAction.Response.Stats> fetched = getAnalyticsStats(id);
    assertThat(fetched.size(), equalTo(1));

    final GetDataFrameAnalyticsStatsAction.Response.Stats first = fetched.get(0);
    assertThat(first.getId(), equalTo(id));
    assertThat(first.getState(), equalTo(state));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,12 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
import org.elasticsearch.xpack.ml.datafeed.extractor.fields.ExtractedField;
import org.elasticsearch.xpack.ml.datafeed.extractor.fields.ExtractedFields;

import java.util.Arrays;
Expand Down Expand Up @@ -52,14 +55,22 @@ public DataFrameDataExtractor newExtractor(boolean includeSource) {
analyticsId,
extractedFields,
Arrays.asList(index),
QueryBuilders.matchAllQuery(),
allExtractedFieldsExistQuery(),
1000,
headers,
includeSource
);
return new DataFrameDataExtractor(client, context);
}

/**
 * Builds a bool query with one {@code exists} filter clause per extracted field,
 * using each field's name.
 *
 * @return the bool query holding all the filter clauses
 */
private QueryBuilder allExtractedFieldsExistQuery() {
    final BoolQueryBuilder allFieldsExist = QueryBuilders.boolQuery();
    extractedFields.getAllFields().stream()
        .map(ExtractedField::getName)
        .map(QueryBuilders::existsQuery)
        .forEach(allFieldsExist::filter);
    return allFieldsExist;
}

/**
* Validate and create a new extractor factory
*
Expand Down