Skip to content

Commit 54ecb1f

Browse files
author
Hendrik Muhs
committed
insert explicit mappings for objects in nested output to avoid clashes with
index templates; fixes elastic#51321
1 parent 23be11c commit 54ecb1f

File tree

3 files changed

+203
-47
lines changed

3 files changed

+203
-47
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
7+
package org.elasticsearch.xpack.transform.integration;
8+
9+
import org.elasticsearch.client.Request;
10+
import org.elasticsearch.common.xcontent.support.XContentMapValues;
11+
import org.junit.Before;
12+
13+
import java.io.IOException;
14+
import java.util.List;
15+
import java.util.Map;
16+
17+
import static org.hamcrest.Matchers.equalTo;
18+
19+
public class TransformPivotRestSpecialCasesIT extends TransformRestTestCase {
20+
private static boolean indicesCreated = false;
21+
22+
// preserve indices in order to reuse source indices in several test cases
23+
@Override
24+
protected boolean preserveIndicesUponCompletion() {
25+
return true;
26+
}
27+
28+
@Before
29+
public void createIndexes() throws IOException {
30+
31+
// it's not possible to run it as @BeforeClass as clients aren't initialized then, so we need this little hack
32+
if (indicesCreated) {
33+
return;
34+
}
35+
36+
createReviewsIndex();
37+
indicesCreated = true;
38+
}
39+
40+
public void testIndexTemplateMappingClash() throws Exception {
41+
String transformId = "special_pivot_template_mappings_clash";
42+
String transformIndex = "special_pivot_template_mappings_clash";
43+
44+
// create a template that defines a field "rating" with a type "float" which will clash later with
45+
// output field "rating.avg" in the pivot config
46+
final Request createIndexTemplateRequest = new Request("PUT", "_template/special_pivot_template");
47+
48+
String template = "{"
49+
+ "\"index_patterns\" : [\"special_pivot_template*\"],"
50+
+ " \"mappings\" : {"
51+
+ " \"properties\": {"
52+
+ " \"rating\":{"
53+
+ " \"type\": \"float\"\n"
54+
+ " }"
55+
+ " }"
56+
+ " }"
57+
+ "}";
58+
59+
60+
createIndexTemplateRequest.setJsonEntity(template);
61+
Map<String, Object> createIndexTemplateResponse = entityAsMap(client().performRequest(createIndexTemplateRequest));
62+
assertThat(createIndexTemplateResponse.get("acknowledged"), equalTo(Boolean.TRUE));
63+
64+
final Request createTransformRequest = new Request("PUT", getTransformEndpoint() + transformId);
65+
66+
String config = "{"
67+
+ " \"source\": {\"index\":\""
68+
+ REVIEWS_INDEX_NAME
69+
+ "\"},"
70+
+ " \"dest\": {\"index\":\""
71+
+ transformIndex
72+
+ "\"},";
73+
74+
config += " \"pivot\": {"
75+
+ " \"group_by\": {"
76+
+ " \"reviewer\": {"
77+
+ " \"terms\": {"
78+
+ " \"field\": \"user_id\""
79+
+ " } } },"
80+
+ " \"aggregations\": {"
81+
+ " \"rating.avg\": {"
82+
+ " \"avg\": {"
83+
+ " \"field\": \"stars\""
84+
+ " } }"
85+
+ " } }"
86+
+ "}";
87+
88+
createTransformRequest.setJsonEntity(config);
89+
Map<String, Object> createTransformResponse = entityAsMap(client().performRequest(createTransformRequest));
90+
assertThat(createTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE));
91+
92+
startAndWaitForTransform(transformId, transformIndex);
93+
assertTrue(indexExists(transformIndex));
94+
95+
// we expect 27 documents as there shall be 27 user_id's
96+
Map<String, Object> indexStats = getAsMap(transformIndex + "/_stats");
97+
assertEquals(27, XContentMapValues.extractValue("_all.total.docs.count", indexStats));
98+
99+
// get and check some users
100+
Map<String, Object> searchResult = getAsMap(transformIndex + "/_search?q=reviewer:user_4");
101+
102+
assertEquals(1, XContentMapValues.extractValue("hits.total.value", searchResult));
103+
Number actual = (Number) ((List<?>) XContentMapValues.extractValue("hits.hits._source.rating.avg", searchResult)).get(0);
104+
assertEquals(3.878048780, actual.doubleValue(), 0.000001);
105+
}
106+
}

x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformRestTestCase.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -424,6 +424,9 @@ static int getDataFrameCheckpoint(String transformId) throws IOException {
424424
Response statsResponse = client().performRequest(new Request("GET", getTransformEndpoint() + transformId + "/_stats"));
425425

426426
Map<?, ?> transformStatsAsMap = (Map<?, ?>) ((List<?>) entityAsMap(statsResponse).get("transforms")).get(0);
427+
428+
// assert that the transform did not fail
429+
assertNotEquals("failed", XContentMapValues.extractValue("state", transformStatsAsMap));
427430
return (int) XContentMapValues.extractValue("checkpointing.last.checkpoint", transformStatsAsMap);
428431
}
429432

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/SchemaUtil.java

Lines changed: 94 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,7 @@ public final class SchemaUtil {
4242
NUMERIC_FIELD_MAPPER_TYPES = types;
4343
}
4444

45-
private SchemaUtil() {
46-
}
45+
private SchemaUtil() {}
4746

4847
public static boolean isNumericType(String type) {
4948
return type != null && NUMERIC_FIELD_MAPPER_TYPES.contains(type);
@@ -59,27 +58,29 @@ public static boolean isNumericType(String type) {
5958
* @param source Source index that contains the data to pivot
6059
* @param listener Listener to alert on success or failure.
6160
*/
62-
public static void deduceMappings(final Client client,
63-
final PivotConfig config,
64-
final String[] source,
65-
final ActionListener<Map<String, String>> listener) {
61+
public static void deduceMappings(
62+
final Client client,
63+
final PivotConfig config,
64+
final String[] source,
65+
final ActionListener<Map<String, String>> listener
66+
) {
6667
// collects the fieldnames used as source for aggregations
6768
Map<String, String> aggregationSourceFieldNames = new HashMap<>();
6869
// collects the aggregation types by source name
6970
Map<String, String> aggregationTypes = new HashMap<>();
7071
// collects the fieldnames and target fieldnames used for grouping
7172
Map<String, String> fieldNamesForGrouping = new HashMap<>();
7273

73-
config.getGroupConfig().getGroups().forEach((destinationFieldName, group) -> {
74-
fieldNamesForGrouping.put(destinationFieldName, group.getField());
75-
});
74+
config.getGroupConfig()
75+
.getGroups()
76+
.forEach((destinationFieldName, group) -> { fieldNamesForGrouping.put(destinationFieldName, group.getField()); });
7677

7778
for (AggregationBuilder agg : config.getAggregationConfig().getAggregatorFactories()) {
7879
if (agg instanceof ValuesSourceAggregationBuilder) {
7980
ValuesSourceAggregationBuilder<?, ?> valueSourceAggregation = (ValuesSourceAggregationBuilder<?, ?>) agg;
8081
aggregationSourceFieldNames.put(valueSourceAggregation.getName(), valueSourceAggregation.field());
8182
aggregationTypes.put(valueSourceAggregation.getName(), valueSourceAggregation.getType());
82-
} else if(agg instanceof ScriptedMetricAggregationBuilder || agg instanceof MultiValuesSourceAggregationBuilder) {
83+
} else if (agg instanceof ScriptedMetricAggregationBuilder || agg instanceof MultiValuesSourceAggregationBuilder) {
8384
aggregationTypes.put(agg.getName(), agg.getType());
8485
} else {
8586
// execution should not reach this point
@@ -98,13 +99,17 @@ public static void deduceMappings(final Client client,
9899
allFieldNames.putAll(aggregationSourceFieldNames);
99100
allFieldNames.putAll(fieldNamesForGrouping);
100101

101-
getSourceFieldMappings(client, source, allFieldNames.values().toArray(new String[0]),
102+
getSourceFieldMappings(
103+
client,
104+
source,
105+
allFieldNames.values().toArray(new String[0]),
102106
ActionListener.wrap(
103-
sourceMappings -> listener.onResponse(resolveMappings(aggregationSourceFieldNames,
104-
aggregationTypes,
105-
fieldNamesForGrouping,
106-
sourceMappings)),
107-
listener::onFailure));
107+
sourceMappings -> listener.onResponse(
108+
resolveMappings(aggregationSourceFieldNames, aggregationTypes, fieldNamesForGrouping, sourceMappings)
109+
),
110+
listener::onFailure
111+
)
112+
);
108113
}
109114

110115
/**
@@ -115,36 +120,37 @@ public static void deduceMappings(final Client client,
115120
* @param index The index, or index pattern, from which to gather all the field mappings
116121
* @param listener The listener to be alerted on success or failure.
117122
*/
118-
public static void getDestinationFieldMappings(final Client client,
119-
final String index,
120-
final ActionListener<Map<String, String>> listener) {
121-
FieldCapabilitiesRequest fieldCapabilitiesRequest = new FieldCapabilitiesRequest()
122-
.indices(index)
123+
public static void getDestinationFieldMappings(
124+
final Client client,
125+
final String index,
126+
final ActionListener<Map<String, String>> listener
127+
) {
128+
FieldCapabilitiesRequest fieldCapabilitiesRequest = new FieldCapabilitiesRequest().indices(index)
123129
.fields("*")
124130
.indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN);
125-
ClientHelper.executeAsyncWithOrigin(client,
131+
ClientHelper.executeAsyncWithOrigin(
132+
client,
126133
ClientHelper.TRANSFORM_ORIGIN,
127134
FieldCapabilitiesAction.INSTANCE,
128135
fieldCapabilitiesRequest,
129-
ActionListener.wrap(
130-
r -> listener.onResponse(extractFieldMappings(r)),
131-
listener::onFailure
132-
));
136+
ActionListener.wrap(r -> listener.onResponse(extractFieldMappings(r)), listener::onFailure)
137+
);
133138
}
134139

135-
private static Map<String, String> resolveMappings(Map<String, String> aggregationSourceFieldNames,
136-
Map<String, String> aggregationTypes,
137-
Map<String, String> fieldNamesForGrouping,
138-
Map<String, String> sourceMappings) {
140+
private static Map<String, String> resolveMappings(
141+
Map<String, String> aggregationSourceFieldNames,
142+
Map<String, String> aggregationTypes,
143+
Map<String, String> fieldNamesForGrouping,
144+
Map<String, String> sourceMappings
145+
) {
139146
Map<String, String> targetMapping = new HashMap<>();
140147

141148
aggregationTypes.forEach((targetFieldName, aggregationName) -> {
142149
String sourceFieldName = aggregationSourceFieldNames.get(targetFieldName);
143150
String sourceMapping = sourceFieldName == null ? null : sourceMappings.get(sourceFieldName);
144151
String destinationMapping = Aggregations.resolveTargetMapping(aggregationName, sourceMapping);
145152

146-
logger.debug("Deduced mapping for: [{}], agg type [{}] to [{}]",
147-
targetFieldName, aggregationName, destinationMapping);
153+
logger.debug("Deduced mapping for: [{}], agg type [{}] to [{}]", targetFieldName, aggregationName, destinationMapping);
148154

149155
if (Aggregations.isDynamicMapping(destinationMapping)) {
150156
logger.debug("Dynamic target mapping set for field [{}] and aggregation [{}]", targetFieldName, aggregationName);
@@ -165,34 +171,75 @@ private static Map<String, String> resolveMappings(Map<String, String> aggregati
165171
targetMapping.put(targetFieldName, "keyword");
166172
}
167173
});
174+
175+
// insert object mappings for nested fields
176+
insertNestedObjectMappings(targetMapping);
177+
168178
return targetMapping;
169179
}
170180

171181
/*
172182
* Very "magic" helper method to extract the source mappings
173183
*/
174-
private static void getSourceFieldMappings(Client client, String[] index, String[] fields,
175-
ActionListener<Map<String, String>> listener) {
176-
FieldCapabilitiesRequest fieldCapabilitiesRequest = new FieldCapabilitiesRequest()
177-
.indices(index)
184+
private static void getSourceFieldMappings(
185+
Client client,
186+
String[] index,
187+
String[] fields,
188+
ActionListener<Map<String, String>> listener
189+
) {
190+
FieldCapabilitiesRequest fieldCapabilitiesRequest = new FieldCapabilitiesRequest().indices(index)
178191
.fields(fields)
179192
.indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN);
180-
client.execute(FieldCapabilitiesAction.INSTANCE, fieldCapabilitiesRequest, ActionListener.wrap(
181-
response -> listener.onResponse(extractFieldMappings(response)),
182-
listener::onFailure));
193+
client.execute(
194+
FieldCapabilitiesAction.INSTANCE,
195+
fieldCapabilitiesRequest,
196+
ActionListener.wrap(response -> listener.onResponse(extractFieldMappings(response)), listener::onFailure)
197+
);
183198
}
184199

185200
private static Map<String, String> extractFieldMappings(FieldCapabilitiesResponse response) {
186201
Map<String, String> extractedTypes = new HashMap<>();
187202

188-
response.get().forEach((fieldName, capabilitiesMap) -> {
189-
// TODO: overwrites types, requires resolve if
190-
// types are mixed
191-
capabilitiesMap.forEach((name, capability) -> {
192-
logger.trace("Extracted type for [{}] : [{}]", fieldName, capability.getType());
193-
extractedTypes.put(fieldName, capability.getType());
194-
});
195-
});
203+
response.get()
204+
.forEach(
205+
(fieldName, capabilitiesMap) -> {
206+
// TODO: overwrites types, requires resolve if
207+
// types are mixed
208+
capabilitiesMap.forEach((name, capability) -> {
209+
logger.trace("Extracted type for [{}] : [{}]", fieldName, capability.getType());
210+
extractedTypes.put(fieldName, capability.getType());
211+
});
212+
}
213+
);
196214
return extractedTypes;
197215
}
216+
217+
/**
218+
* Insert object mappings for fields like:
219+
*
220+
* a.b.c : some_type
221+
*
222+
* in which case it creates additional mappings:
223+
*
224+
* a.b : object
225+
* a : object
226+
*
227+
* avoids snafu with index templates injecting incompatible mappings
228+
*
229+
* @param fieldMappings field mappings to inject to
230+
*/
231+
static void insertNestedObjectMappings(Map<String, String> fieldMappings) {
232+
Map<String, String> additionalMappings = new HashMap<>();
233+
fieldMappings.keySet().stream().filter(key -> key.contains(".")).forEach(key -> {
234+
int pos;
235+
String objectKey = key;
236+
// lastIndexOf returns -1 on mismatch, but to disallow empty strings check for > 0
237+
while ((pos = objectKey.lastIndexOf(".")) > 0) {
238+
objectKey = objectKey.substring(0, pos);
239+
additionalMappings.putIfAbsent(objectKey, "object");
240+
}
241+
});
242+
243+
additionalMappings.forEach(fieldMappings::putIfAbsent);
244+
}
198245
}

0 commit comments

Comments
 (0)