Skip to content

Commit c06d9ab

Browse files
committed
Apply review comments
1 parent 9234619 commit c06d9ab

File tree

11 files changed

+77
-78
lines changed

11 files changed

+77
-78
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/analyses/Classification.java

+5-4
Original file line numberDiff line numberDiff line change
@@ -247,10 +247,11 @@ public Map<String, Long> getFieldCardinalityLimits() {
247247
}
248248

249249
@Override
250-
public Map<String, String> getFieldMappingsToCopy(String resultsFieldName) {
251-
return Map.of(
252-
resultsFieldName + "." + predictionFieldName, dependentVariable,
253-
resultsFieldName + ".top_classes.class_name", dependentVariable);
250+
public Map<String, String> getExplicitlyMappedFields(String resultsFieldName) {
251+
return new HashMap<>() {{
252+
put(resultsFieldName + "." + predictionFieldName, dependentVariable);
253+
put(resultsFieldName + ".top_classes.class_name", dependentVariable);
254+
}};
254255
}
255256

256257
@Override

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/analyses/DataFrameAnalysis.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -42,15 +42,15 @@ public interface DataFrameAnalysis extends ToXContentObject, NamedWriteable {
4242
Map<String, Long> getFieldCardinalityLimits();
4343

4444
/**
45-
* Returns field mappings to be copied from source index to destination index.
45+
* Returns fields for which the mappings should be copied from source index to destination index.
4646
* Each entry of the returned {@link Map} is of the form:
4747
* key - field path in the destination index
4848
* value - field path in the source index from which the mapping should be taken
4949
*
5050
* @param resultsFieldName name of the results field under which all the results are stored
51-
* @return {@link Map} containing field mappings to be copied from source index to destination index
51+
* @return {@link Map} containing fields for which the mappings should be copied from source index to destination index
5252
*/
53-
Map<String, String> getFieldMappingsToCopy(String resultsFieldName);
53+
Map<String, String> getExplicitlyMappedFields(String resultsFieldName);
5454

5555
/**
5656
* @return {@code true} if this analysis supports data frame rows with missing values

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/analyses/OutlierDetection.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -230,7 +230,7 @@ public Map<String, Long> getFieldCardinalityLimits() {
230230
}
231231

232232
@Override
233-
public Map<String, String> getFieldMappingsToCopy(String resultsFieldName) {
233+
public Map<String, String> getExplicitlyMappedFields(String resultsFieldName) {
234234
return Collections.emptyMap();
235235
}
236236

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/analyses/Regression.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,7 @@ public Map<String, Long> getFieldCardinalityLimits() {
187187
}
188188

189189
@Override
190-
public Map<String, String> getFieldMappingsToCopy(String resultsFieldName) {
190+
public Map<String, String> getExplicitlyMappedFields(String resultsFieldName) {
191191
return Collections.singletonMap(resultsFieldName + "." + predictionFieldName, dependentVariable);
192192
}
193193

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/dataframe/analyses/ClassificationTests.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import static org.hamcrest.Matchers.is;
3131
import static org.hamcrest.Matchers.not;
3232
import static org.hamcrest.Matchers.notNullValue;
33+
import static org.hamcrest.Matchers.nullValue;
3334

3435
public class ClassificationTests extends AbstractSerializingTestCase<Classification> {
3536

@@ -172,7 +173,7 @@ public void testFieldCardinalityLimitsIsNonEmpty() {
172173
}
173174

174175
public void testFieldMappingsToCopyIsNonEmpty() {
175-
assertThat(createTestInstance().getFieldMappingsToCopy(""), is(not(anEmptyMap())));
176+
assertThat(createTestInstance().getExplicitlyMappedFields(""), is(not(anEmptyMap())));
176177
}
177178

178179
public void testToXContent_GivenVersionBeforeRandomizeSeedWasIntroduced() throws IOException {

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/dataframe/analyses/OutlierDetectionTests.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ public void testFieldCardinalityLimitsIsEmpty() {
9393
}
9494

9595
public void testFieldMappingsToCopyIsEmpty() {
96-
assertThat(createTestInstance().getFieldMappingsToCopy(""), is(anEmptyMap()));
96+
assertThat(createTestInstance().getExplicitlyMappedFields(""), is(anEmptyMap()));
9797
}
9898

9999
public void testGetStateDocId() {

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/dataframe/analyses/RegressionTests.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import static org.hamcrest.Matchers.is;
2727
import static org.hamcrest.Matchers.not;
2828
import static org.hamcrest.Matchers.notNullValue;
29+
import static org.hamcrest.Matchers.nullValue;
2930

3031
public class RegressionTests extends AbstractSerializingTestCase<Regression> {
3132

@@ -109,7 +110,7 @@ public void testFieldCardinalityLimitsIsEmpty() {
109110
}
110111

111112
public void testFieldMappingsToCopyIsNonEmpty() {
112-
assertThat(createTestInstance().getFieldMappingsToCopy(""), is(not(anEmptyMap())));
113+
assertThat(createTestInstance().getExplicitlyMappedFields(""), is(not(anEmptyMap())));
113114
}
114115

115116
public void testGetStateDocId() {

x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/ClassificationIT.java

+7-3
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ public void testSingleNumericFeatureAndMixedTrainingAndNonTrainingRows() throws
109109
"Estimated memory usage for this analytics to be",
110110
"Starting analytics on node",
111111
"Started analytics",
112-
(analysisUsesExistingDestIndex ? "Using existing" : "Creating") + " destination index [" + destIndex + "]",
112+
expectedDestIndexAuditMessage(),
113113
"Finished reindexing to destination index [" + destIndex + "]",
114114
"Finished analysis");
115115
assertEvaluation(KEYWORD_FIELD, KEYWORD_FIELD_VALUES, "ml." + predictedClassField);
@@ -150,7 +150,7 @@ public void testWithOnlyTrainingRowsAndTrainingPercentIsHundred() throws Excepti
150150
"Estimated memory usage for this analytics to be",
151151
"Starting analytics on node",
152152
"Started analytics",
153-
(analysisUsesExistingDestIndex ? "Using existing" : "Creating") + " destination index [" + destIndex + "]",
153+
expectedDestIndexAuditMessage(),
154154
"Finished reindexing to destination index [" + destIndex + "]",
155155
"Finished analysis");
156156
assertEvaluation(KEYWORD_FIELD, KEYWORD_FIELD_VALUES, "ml." + predictedClassField);
@@ -211,7 +211,7 @@ public <T> void testWithOnlyTrainingRowsAndTrainingPercentIsFifty(String jobId,
211211
"Estimated memory usage for this analytics to be",
212212
"Starting analytics on node",
213213
"Started analytics",
214-
(analysisUsesExistingDestIndex ? "Using existing" : "Creating") + " destination index [" + destIndex + "]",
214+
expectedDestIndexAuditMessage(),
215215
"Finished reindexing to destination index [" + destIndex + "]",
216216
"Finished analysis");
217217
assertEvaluation(dependentVariable, dependentVariableValues, "ml." + predictedClassField);
@@ -529,4 +529,8 @@ private void assertMlResultsFieldMappings(String predictedClassField, String exp
529529
private String stateDocId() {
530530
return jobId + "_classification_state#1";
531531
}
532+
533+
private String expectedDestIndexAuditMessage() {
534+
return (analysisUsesExistingDestIndex ? "Using existing" : "Creating") + " destination index [" + destIndex + "]";
535+
}
532536
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsIndex.java

+28-33
Original file line numberDiff line numberDiff line change
@@ -124,9 +124,9 @@ private static CreateIndexRequest createIndexRequest(Clock clock, DataFrameAnaly
124124
Map<String, Object> mappingsAsMap = mappings.valuesIt().next().sourceAsMap();
125125
Map<String, Object> properties = getOrPutDefault(mappingsAsMap, PROPERTIES, HashMap::new);
126126
checkResultsFieldIsNotPresentInSourceIndex(config, properties);
127-
addProperties(properties, config, Collections.unmodifiableMap(properties));
127+
properties.putAll(createAdditionalMappings(config, Collections.unmodifiableMap(properties)));
128128
Map<String, Object> metadata = getOrPutDefault(mappingsAsMap, META, HashMap::new);
129-
addMetaData(metadata, config.getId(), clock);
129+
metadata.putAll(createMetaData(config.getId(), clock));
130130
return new CreateIndexRequest(destinationIndex, settings).mapping(type, mappingsAsMap);
131131
}
132132

@@ -160,25 +160,28 @@ private static Integer findMaxSettingValue(GetSettingsResponse settingsResponse,
160160
return maxValue;
161161
}
162162

163-
private static void addProperties(Map<String, Object> properties,
164-
DataFrameAnalyticsConfig config,
165-
Map<String, Object> sourceIndexMappingsProperties) {
163+
private static Map<String, Object> createAdditionalMappings(DataFrameAnalyticsConfig config, Map<String, Object> mappingsProperties) {
164+
HashMap<String, Object> properties = new HashMap<>();
166165
properties.put(ID_COPY, Map.of("type", "keyword"));
167-
for (Map.Entry<String, String> entry : config.getAnalysis().getFieldMappingsToCopy(config.getDest().getResultsField()).entrySet()) {
166+
for (Map.Entry<String, String> entry
167+
: config.getAnalysis().getExplicitlyMappedFields(config.getDest().getResultsField()).entrySet()) {
168168
String destFieldPath = entry.getKey();
169169
String sourceFieldPath = entry.getValue();
170-
Object sourceFieldMapping = sourceIndexMappingsProperties.get(sourceFieldPath);
170+
Object sourceFieldMapping = mappingsProperties.get(sourceFieldPath);
171171
if (sourceFieldMapping != null) {
172172
properties.put(destFieldPath, sourceFieldMapping);
173173
}
174174
}
175+
return properties;
175176
}
176177

177-
private static void addMetaData(Map<String, Object> metadata, String analyticsId, Clock clock) {
178+
private static Map<String, Object> createMetaData(String analyticsId, Clock clock) {
179+
HashMap<String, Object> metadata = new HashMap<>();
178180
metadata.put(CREATION_DATE_MILLIS, clock.millis());
179181
metadata.put(CREATED_BY, "data-frame-analytics");
180182
metadata.put(VERSION, Map.of(CREATED, Version.CURRENT));
181183
metadata.put(ANALYTICS, analyticsId);
184+
return metadata;
182185
}
183186

184187
@SuppressWarnings("unchecked")
@@ -197,32 +200,24 @@ public static void updateMappingsToDestIndex(Client client, DataFrameAnalyticsCo
197200
// We have validated the destination index should match a single index
198201
assert getIndexResponse.indices().length == 1;
199202

200-
ActionListener<ImmutableOpenMap<String, MappingMetaData>> mappingsListener = ActionListener.wrap(
201-
mappings -> {
202-
// Fetch merged mappings from source indices
203-
Map<String, Object> sourceMappingsAsMap = mappings.valuesIt().next().sourceAsMap();
204-
Map<String, Object> sourcePropertiesAsMap =
205-
(Map<String, Object>)sourceMappingsAsMap.getOrDefault(PROPERTIES, Collections.emptyMap());
206-
207-
// Verify that the results field does not exist in the source indices
208-
checkResultsFieldIsNotPresentInSourceIndex(config, sourcePropertiesAsMap);
209-
210-
// Determine mappings to be added to the destination index
211-
Map<String, Object> properties = new HashMap<>();
212-
addProperties(properties, config, Collections.unmodifiableMap(sourcePropertiesAsMap));
213-
Map<String, Object> addedMappings = Map.of(PROPERTIES, properties);
214-
215-
// Add the mappings to the destination index
216-
PutMappingRequest putMappingRequest =
217-
new PutMappingRequest(getIndexResponse.indices())
218-
.source(addedMappings);
219-
ClientHelper.executeWithHeadersAsync(
220-
config.getHeaders(), ML_ORIGIN, client, PutMappingAction.INSTANCE, putMappingRequest, listener);
221-
},
222-
listener::onFailure
223-
);
203+
// Fetch mappings from destination index
204+
Map<String, Object> destMappingsAsMap = getIndexResponse.mappings().valuesIt().next().sourceAsMap();
205+
Map<String, Object> destPropertiesAsMap =
206+
(Map<String, Object>)destMappingsAsMap.getOrDefault(PROPERTIES, Collections.emptyMap());
224207

225-
MappingsMerger.mergeMappings(client, config.getHeaders(), config.getSource(), mappingsListener);
208+
// Verify that the results field does not exist in the source indices
209+
checkResultsFieldIsNotPresentInSourceIndex(config, destPropertiesAsMap);
210+
211+
// Determine mappings to be added to the destination index
212+
Map<String, Object> addedMappings =
213+
Map.of(PROPERTIES, createAdditionalMappings(config, Collections.unmodifiableMap(destPropertiesAsMap)));
214+
215+
// Add the mappings to the destination index
216+
PutMappingRequest putMappingRequest =
217+
new PutMappingRequest(getIndexResponse.indices())
218+
.source(addedMappings);
219+
ClientHelper.executeWithHeadersAsync(
220+
config.getHeaders(), ML_ORIGIN, client, PutMappingAction.INSTANCE, putMappingRequest, listener);
226221
}
227222

228223
private static void checkResultsFieldIsNotPresentInSourceIndex(DataFrameAnalyticsConfig config, Map<String, Object> properties) {

x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsIndexTests.java

+26-29
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
*/
66
package org.elasticsearch.xpack.ml.dataframe;
77

8+
import org.elasticsearch.ElasticsearchStatusException;
89
import org.elasticsearch.Version;
910
import org.elasticsearch.action.ActionListener;
1011
import org.elasticsearch.action.admin.indices.create.CreateIndexAction;
@@ -37,9 +38,9 @@
3738
import org.elasticsearch.xpack.core.ml.dataframe.analyses.DataFrameAnalysis;
3839
import org.elasticsearch.xpack.core.ml.dataframe.analyses.OutlierDetection;
3940
import org.elasticsearch.xpack.core.ml.dataframe.analyses.Regression;
41+
import org.junit.Assert;
4042
import org.junit.Before;
4143
import org.mockito.ArgumentCaptor;
42-
import org.mockito.InOrder;
4344
import org.mockito.stubbing.Answer;
4445

4546
import java.io.IOException;
@@ -57,9 +58,12 @@
5758
import static org.hamcrest.Matchers.is;
5859
import static org.mockito.Matchers.any;
5960
import static org.mockito.Matchers.eq;
61+
import static org.mockito.Mockito.atLeastOnce;
6062
import static org.mockito.Mockito.doAnswer;
61-
import static org.mockito.Mockito.inOrder;
6263
import static org.mockito.Mockito.mock;
64+
import static org.mockito.Mockito.verify;
65+
import static org.mockito.Mockito.verifyNoMoreInteractions;
66+
import static org.mockito.Mockito.verifyZeroInteractions;
6367
import static org.mockito.Mockito.when;
6468

6569
public class DataFrameAnalyticsIndexTests extends ESTestCase {
@@ -215,34 +219,29 @@ private Map<String, Object> testUpdateMappingsToDestIndex(DataFrameAnalysis anal
215219

216220
ImmutableOpenMap.Builder<String, MappingMetaData> mappings = ImmutableOpenMap.builder();
217221
mappings.put("", new MappingMetaData("_doc", Map.of("properties", properties)));
218-
GetMappingsResponse getMappingsResponse = new GetMappingsResponse(mappings.build());
222+
GetIndexResponse getIndexResponse =
223+
new GetIndexResponse(
224+
new String[] { DEST_INDEX }, mappings.build(), ImmutableOpenMap.of(), ImmutableOpenMap.of(), ImmutableOpenMap.of());
219225

220226
ArgumentCaptor<GetMappingsRequest> getMappingsRequestCaptor = ArgumentCaptor.forClass(GetMappingsRequest.class);
221227
ArgumentCaptor<PutMappingRequest> putMappingRequestCaptor = ArgumentCaptor.forClass(PutMappingRequest.class);
222228

223-
doAnswer(callListenerOnResponse(getMappingsResponse))
224-
.when(client).execute(eq(GetMappingsAction.INSTANCE), getMappingsRequestCaptor.capture(), any());
225229
doAnswer(callListenerOnResponse(new AcknowledgedResponse(true)))
226230
.when(client).execute(eq(PutMappingAction.INSTANCE), putMappingRequestCaptor.capture(), any());
227231

228232
DataFrameAnalyticsIndex.updateMappingsToDestIndex(
229233
client,
230234
config,
231-
new GetIndexResponse(
232-
new String[] { DEST_INDEX }, ImmutableOpenMap.of(), ImmutableOpenMap.of(), ImmutableOpenMap.of(), ImmutableOpenMap.of()),
235+
getIndexResponse,
233236
ActionListener.wrap(
234237
response -> assertThat(response.isAcknowledged(), is(true)),
235238
e -> fail(e.getMessage())
236239
)
237240
);
238241

239-
InOrder inOrder = inOrder(client);
240-
inOrder.verify(client).execute(eq(GetMappingsAction.INSTANCE), any(), any());
241-
inOrder.verify(client).execute(eq(PutMappingAction.INSTANCE), any(), any());
242-
inOrder.verifyNoMoreInteractions();
243-
244-
GetMappingsRequest getMappingsRequest = getMappingsRequestCaptor.getValue();
245-
assertThat(getMappingsRequest.indices(), arrayContaining(SOURCE_INDEX));
242+
verify(client, atLeastOnce()).threadPool();
243+
verify(client).execute(eq(PutMappingAction.INSTANCE), any(), any());
244+
verifyNoMoreInteractions(client);
246245

247246
PutMappingRequest putMappingRequest = putMappingRequestCaptor.getValue();
248247
assertThat(putMappingRequest.indices(), arrayContaining(DEST_INDEX));
@@ -275,22 +274,20 @@ public void testUpdateMappingsToDestIndex_ResultsFieldsExistsInSourceIndex() {
275274

276275
ImmutableOpenMap.Builder<String, MappingMetaData> mappings = ImmutableOpenMap.builder();
277276
mappings.put("", new MappingMetaData("_doc", Map.of("properties", Map.of("ml", "some-mapping"))));
278-
GetMappingsResponse getMappingsResponse = new GetMappingsResponse(mappings.build());
279-
280-
doAnswer(callListenerOnResponse(getMappingsResponse)).when(client).execute(eq(GetMappingsAction.INSTANCE), any(), any());
281-
282-
DataFrameAnalyticsIndex.updateMappingsToDestIndex(
283-
client,
284-
config,
277+
GetIndexResponse getIndexResponse =
285278
new GetIndexResponse(
286-
new String[] { DEST_INDEX }, ImmutableOpenMap.of(), ImmutableOpenMap.of(), ImmutableOpenMap.of(), ImmutableOpenMap.of()),
287-
ActionListener.wrap(
288-
response -> fail("should not succeed"),
289-
e -> assertThat(
290-
e.getMessage(),
291-
equalTo("A field that matches the dest.results_field [ml] already exists; please set a different results_field"))
292-
)
293-
);
279+
new String[] { DEST_INDEX }, mappings.build(), ImmutableOpenMap.of(), ImmutableOpenMap.of(), ImmutableOpenMap.of());
280+
281+
ElasticsearchStatusException e =
282+
expectThrows(
283+
ElasticsearchStatusException.class,
284+
() -> DataFrameAnalyticsIndex.updateMappingsToDestIndex(
285+
client, config, getIndexResponse, ActionListener.wrap(Assert::fail)));
286+
assertThat(
287+
e.getMessage(),
288+
equalTo("A field that matches the dest.results_field [ml] already exists; please set a different results_field"));
289+
290+
verifyZeroInteractions(client);
294291
}
295292

296293
private static <Response> Answer<Response> callListenerOnResponse(Response response) {

x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/extractor/ExtractedFieldsDetectorTests.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -241,7 +241,7 @@ public void testDetect_GivenFieldIsNotIncludedAndIsExcluded() {
241241
analyzedFields = new FetchSourceContext(true, new String[] {"foo"}, new String[] {"bar"});
242242

243243
ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector(
244-
SOURCE_INDEX, buildOutlierDetectionConfig(), false, 100, fieldCapabilities, Collections.emptyMap());
244+
SOURCE_INDEX, buildOutlierDetectionConfig(), 100, fieldCapabilities, Collections.emptyMap());
245245
Tuple<ExtractedFields, List<FieldSelection>> fieldExtraction = extractedFieldsDetector.detect();
246246

247247
List<ExtractedField> allFields = fieldExtraction.v1().getAllFields();

0 commit comments

Comments
 (0)