Skip to content

Commit 9b3e32f

Browse files
[FEATURE][ML] Only write numeric fields to data frame
1 parent 3f49eef commit 9b3e32f

File tree

3 files changed

+147
-2
lines changed

3 files changed

+147
-2
lines changed

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportRunAnalyticsAction.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,9 @@ private void runPipelineAnalytics(String index, ActionListener<AcknowledgedRespo
185185
listener::onFailure
186186
);
187187

188+
// TODO Creating the data extractor factory can fail, in which case the copied index is left behind.
189+
// We could delete the index in case of failure or we could try building the factory before reindexing
190+
// to catch the error early on.
188191
DataFrameDataExtractorFactory.create(client, Collections.emptyMap(), index, dataExtractorFactoryListener);
189192
}
190193
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/analytics/DataFrameDataExtractorFactory.java

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,19 +7,23 @@
77

88
import org.elasticsearch.ResourceNotFoundException;
99
import org.elasticsearch.action.ActionListener;
10+
import org.elasticsearch.action.fieldcaps.FieldCapabilities;
1011
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesAction;
1112
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesRequest;
1213
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesResponse;
1314
import org.elasticsearch.client.Client;
1415
import org.elasticsearch.index.IndexNotFoundException;
1516
import org.elasticsearch.index.query.QueryBuilders;
1617
import org.elasticsearch.xpack.core.ClientHelper;
18+
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
1719
import org.elasticsearch.xpack.ml.datafeed.extractor.fields.ExtractedField;
1820
import org.elasticsearch.xpack.ml.datafeed.extractor.fields.ExtractedFields;
1921

2022
import java.util.ArrayList;
2123
import java.util.Arrays;
2224
import java.util.Collections;
25+
import java.util.HashSet;
26+
import java.util.Iterator;
2327
import java.util.List;
2428
import java.util.Map;
2529
import java.util.Objects;
@@ -33,6 +37,12 @@ public class DataFrameDataExtractorFactory {
3337
private static final List<String> IGNORE_FIELDS = Arrays.asList("_id", "_field_names", "_index", "_parent", "_routing", "_seq_no",
3438
"_source", "_type", "_uid", "_version", "_feature", "_ignored");
3539

40+
/**
41+
* The field types that may be written to a data frame; only these numeric types are supported
42+
*/
43+
private static final Set<String> COMPATIBLE_FIELD_TYPES = new HashSet<>(Arrays.asList(
44+
"long", "integer", "short", "byte", "double", "float", "half_float", "scaled_float"));
45+
3646
private final Client client;
3747
private final String index;
3848
private final ExtractedFields extractedFields;
@@ -82,10 +92,27 @@ public static void create(Client client, Map<String, String> headers, String ind
8292
});
8393
}
8494

85-
private static ExtractedFields detectExtractedFields(FieldCapabilitiesResponse fieldCapabilitiesResponse) {
95+
// Visible for testing
96+
static ExtractedFields detectExtractedFields(FieldCapabilitiesResponse fieldCapabilitiesResponse) {
8697
Set<String> fields = fieldCapabilitiesResponse.get().keySet();
8798
fields.removeAll(IGNORE_FIELDS);
88-
return ExtractedFields.build(new ArrayList<>(fields), Collections.emptySet(), fieldCapabilitiesResponse)
99+
removeFieldsWithIncompatibleTypes(fields, fieldCapabilitiesResponse);
100+
ExtractedFields extractedFields = ExtractedFields.build(new ArrayList<>(fields), Collections.emptySet(), fieldCapabilitiesResponse)
89101
.filterFields(ExtractedField.ExtractionMethod.DOC_VALUE);
102+
if (extractedFields.getAllFields().isEmpty()) {
103+
throw ExceptionsHelper.badRequestException("No compatible fields could be detected");
104+
}
105+
return extractedFields;
106+
}
107+
108+
private static void removeFieldsWithIncompatibleTypes(Set<String> fields, FieldCapabilitiesResponse fieldCapabilitiesResponse) {
109+
Iterator<String> fieldsIterator = fields.iterator();
110+
while (fieldsIterator.hasNext()) {
111+
String field = fieldsIterator.next();
112+
Map<String, FieldCapabilities> fieldCaps = fieldCapabilitiesResponse.getField(field);
113+
if (fieldCaps == null || COMPATIBLE_FIELD_TYPES.containsAll(fieldCaps.keySet()) == false) {
114+
fieldsIterator.remove();
115+
}
116+
}
90117
}
91118
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
package org.elasticsearch.xpack.ml.analytics;
7+
8+
import org.elasticsearch.ElasticsearchStatusException;
9+
import org.elasticsearch.action.fieldcaps.FieldCapabilities;
10+
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesResponse;
11+
import org.elasticsearch.test.ESTestCase;
12+
import org.elasticsearch.xpack.ml.datafeed.extractor.fields.ExtractedField;
13+
import org.elasticsearch.xpack.ml.datafeed.extractor.fields.ExtractedFields;
14+
15+
import java.util.HashMap;
16+
import java.util.List;
17+
import java.util.Map;
18+
import java.util.stream.Collectors;
19+
20+
import static org.hamcrest.Matchers.containsInAnyOrder;
21+
import static org.hamcrest.Matchers.equalTo;
22+
import static org.mockito.Mockito.mock;
23+
import static org.mockito.Mockito.when;
24+
25+
public class DataFrameDataExtractorFactoryTests extends ESTestCase {
26+
27+
public void testDetectExtractedFields_GivenFloatField() {
28+
FieldCapabilitiesResponse fieldCapabilities= new MockFieldCapsResponseBuilder()
29+
.addAggregatableField("some_float", "float").build();
30+
31+
ExtractedFields extractedFields = DataFrameDataExtractorFactory.detectExtractedFields(fieldCapabilities);
32+
33+
List<ExtractedField> allFields = extractedFields.getAllFields();
34+
assertThat(allFields.size(), equalTo(1));
35+
assertThat(allFields.get(0).getName(), equalTo("some_float"));
36+
}
37+
38+
public void testDetectExtractedFields_GivenNumericFieldWithMultipleTypes() {
39+
FieldCapabilitiesResponse fieldCapabilities= new MockFieldCapsResponseBuilder()
40+
.addAggregatableField("some_number", "long", "integer", "short", "byte", "double", "float", "half_float", "scaled_float")
41+
.build();
42+
43+
ExtractedFields extractedFields = DataFrameDataExtractorFactory.detectExtractedFields(fieldCapabilities);
44+
45+
List<ExtractedField> allFields = extractedFields.getAllFields();
46+
assertThat(allFields.size(), equalTo(1));
47+
assertThat(allFields.get(0).getName(), equalTo("some_number"));
48+
}
49+
50+
public void testDetectExtractedFields_GivenNonNumericField() {
51+
FieldCapabilitiesResponse fieldCapabilities= new MockFieldCapsResponseBuilder()
52+
.addAggregatableField("some_keyword", "keyword").build();
53+
54+
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class,
55+
() -> DataFrameDataExtractorFactory.detectExtractedFields(fieldCapabilities));
56+
assertThat(e.getMessage(), equalTo("No compatible fields could be detected"));
57+
}
58+
59+
public void testDetectExtractedFields_GivenFieldWithNumericAndNonNumericTypes() {
60+
FieldCapabilitiesResponse fieldCapabilities= new MockFieldCapsResponseBuilder()
61+
.addAggregatableField("indecisive_field", "float", "keyword").build();
62+
63+
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class,
64+
() -> DataFrameDataExtractorFactory.detectExtractedFields(fieldCapabilities));
65+
assertThat(e.getMessage(), equalTo("No compatible fields could be detected"));
66+
}
67+
68+
public void testDetectExtractedFields_GivenMultipleFields() {
69+
FieldCapabilitiesResponse fieldCapabilities= new MockFieldCapsResponseBuilder()
70+
.addAggregatableField("some_float", "float")
71+
.addAggregatableField("some_long", "long")
72+
.addAggregatableField("some_keyword", "keyword")
73+
.build();
74+
75+
ExtractedFields extractedFields = DataFrameDataExtractorFactory.detectExtractedFields(fieldCapabilities);
76+
77+
List<ExtractedField> allFields = extractedFields.getAllFields();
78+
assertThat(allFields.size(), equalTo(2));
79+
assertThat(allFields.stream().map(ExtractedField::getName).collect(Collectors.toSet()),
80+
containsInAnyOrder("some_float", "some_long"));
81+
}
82+
83+
public void testDetectExtractedFields_GivenIgnoredField() {
84+
FieldCapabilitiesResponse fieldCapabilities= new MockFieldCapsResponseBuilder()
85+
.addAggregatableField("_id", "float").build();
86+
87+
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class,
88+
() -> DataFrameDataExtractorFactory.detectExtractedFields(fieldCapabilities));
89+
assertThat(e.getMessage(), equalTo("No compatible fields could be detected"));
90+
}
91+
92+
private static class MockFieldCapsResponseBuilder {
93+
94+
private final Map<String, Map<String, FieldCapabilities>> fieldCaps = new HashMap<>();
95+
96+
private MockFieldCapsResponseBuilder addAggregatableField(String field, String... types) {
97+
Map<String, FieldCapabilities> caps = new HashMap<>();
98+
for (String type : types) {
99+
caps.put(type, new FieldCapabilities(field, type, true, true));
100+
}
101+
fieldCaps.put(field, caps);
102+
return this;
103+
}
104+
105+
private FieldCapabilitiesResponse build() {
106+
FieldCapabilitiesResponse response = mock(FieldCapabilitiesResponse.class);
107+
when(response.get()).thenReturn(fieldCaps);
108+
109+
for (String field : fieldCaps.keySet()) {
110+
when(response.getField(field)).thenReturn(fieldCaps.get(field));
111+
}
112+
return response;
113+
}
114+
}
115+
}

0 commit comments

Comments
 (0)