Skip to content

Commit 40e6a57

Browse files
committed
Add dynamic mapping type hint to bulk request
1 parent 22c63aa commit 40e6a57

File tree

22 files changed

+485
-93
lines changed

22 files changed

+485
-93
lines changed
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
---
2+
"Match mapping type":
3+
- skip:
4+
version: " - 7.99.99"
5+
reason: "introduced in 8.0"
6+
7+
- do:
8+
indices.create:
9+
index: test_index
10+
body:
11+
mappings:
12+
dynamic_templates:
13+
- locations:
14+
custom_mapping_type: location_type
15+
mapping:
16+
type: geo_point
17+
- do:
18+
bulk:
19+
refresh: true
20+
body:
21+
- index:
22+
_index: test_index
23+
_id: id_1
24+
match_mapping_types:
25+
location: location_type
26+
- location: [ -71.34, 41.12 ]
27+
- index:
28+
_index: test_index
29+
_id: id_2
30+
match_mapping_types:
31+
location: location_type
32+
- location: "41.12,-71.34"
33+
- do:
34+
search:
35+
index: test_index
36+
body:
37+
query:
38+
geo_bounding_box:
39+
location:
40+
top_left:
41+
lat: 42
42+
lon: -72
43+
bottom_right:
44+
lat: 40
45+
lon: -74
46+
- match: { hits.total.value: 2 }

server/src/internalClusterTest/java/org/elasticsearch/index/mapper/DynamicMappingIT.java

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,28 +9,46 @@
99

1010
import org.elasticsearch.action.DocWriteResponse;
1111
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
12+
import org.elasticsearch.action.bulk.BulkItemResponse;
13+
import org.elasticsearch.action.bulk.BulkRequest;
1214
import org.elasticsearch.action.bulk.BulkResponse;
15+
import org.elasticsearch.action.index.IndexRequest;
1316
import org.elasticsearch.action.index.IndexRequestBuilder;
17+
import org.elasticsearch.action.search.SearchResponse;
18+
import org.elasticsearch.action.support.WriteRequest;
1419
import org.elasticsearch.cluster.ClusterState;
1520
import org.elasticsearch.cluster.ClusterStateUpdateTask;
1621
import org.elasticsearch.cluster.metadata.MappingMetadata;
1722
import org.elasticsearch.cluster.service.ClusterService;
23+
import org.elasticsearch.common.Randomness;
24+
import org.elasticsearch.common.Strings;
25+
import org.elasticsearch.common.geo.GeoPoint;
1826
import org.elasticsearch.common.settings.Settings;
1927
import org.elasticsearch.common.unit.TimeValue;
28+
import org.elasticsearch.common.xcontent.ObjectPath;
29+
import org.elasticsearch.common.xcontent.XContentBuilder;
30+
import org.elasticsearch.common.xcontent.XContentFactory;
31+
import org.elasticsearch.index.query.GeoBoundingBoxQueryBuilder;
2032
import org.elasticsearch.plugins.Plugin;
2133
import org.elasticsearch.test.ESIntegTestCase;
2234
import org.elasticsearch.test.InternalSettingsPlugin;
2335
import org.hamcrest.Matchers;
2436

2537
import java.io.IOException;
38+
import java.util.ArrayList;
2639
import java.util.Collection;
2740
import java.util.Collections;
41+
import java.util.List;
2842
import java.util.Map;
2943
import java.util.concurrent.CountDownLatch;
3044
import java.util.concurrent.atomic.AtomicReference;
3145

3246
import static org.elasticsearch.index.mapper.MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING;
47+
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
48+
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchHits;
49+
import static org.hamcrest.Matchers.containsString;
3350
import static org.hamcrest.Matchers.equalTo;
51+
import static org.hamcrest.Matchers.instanceOf;
3452

3553
public class DynamicMappingIT extends ESIntegTestCase {
3654

@@ -77,6 +95,14 @@ private static void assertMappingsHaveField(GetMappingsResponse mappings, String
7795
assertTrue("Could not find [" + field + "] in " + typeMappingsMap.toString(), properties.containsKey(field));
7896
}
7997

98+
private static void assertFieldMappingType(String index, String field, String expectedType) {
99+
final GetMappingsResponse resp = client().admin().indices().prepareGetMappings(index).get();
100+
final Map<String, Object> mappings = resp.mappings().get(index).sourceAsMap();
101+
final String path = "properties." + String.join(".properties.", field.split("\\.")) + ".type";
102+
Object actualType = ObjectPath.eval(path, mappings);
103+
assertThat(Strings.toString(resp), actualType, equalTo(expectedType));
104+
}
105+
80106
public void testConcurrentDynamicUpdates() throws Throwable {
81107
createIndex("index");
82108
final Thread[] indexThreads = new Thread[32];
@@ -156,4 +182,76 @@ public void testMappingVersionAfterDynamicMappingUpdate() throws Exception {
156182
client().prepareIndex("test").setId("1").setSource("field", "text").get();
157183
assertBusy(() -> assertThat(clusterService.state().metadata().index("test").getMappingVersion(), equalTo(1 + previousVersion)));
158184
}
185+
186+
public void testSimpleTypeHint() throws Exception {
187+
final XContentBuilder mappings = XContentFactory.jsonBuilder();
188+
mappings.startObject();
189+
{
190+
mappings.startArray("dynamic_templates");
191+
{
192+
mappings.startObject();
193+
mappings.startObject("locations");
194+
{
195+
mappings.field("custom_mapping_type", "location");
196+
mappings.startObject("mapping");
197+
{
198+
mappings.field("type", "geo_point");
199+
}
200+
mappings.endObject();
201+
}
202+
mappings.endObject();
203+
mappings.endObject();
204+
}
205+
mappings.endArray();
206+
}
207+
mappings.endObject();
208+
assertAcked(client().admin().indices().prepareCreate("test").setMapping(mappings));
209+
List<IndexRequest> requests = new ArrayList<>();
210+
requests.add(new IndexRequest("test").id("1").source("location", "41.12,-71.34")
211+
.setDynamicMappingTypeHints(Map.of("location", "location")));
212+
requests.add(new IndexRequest("test").id("2").source(
213+
XContentFactory.jsonBuilder()
214+
.startObject()
215+
.startObject("location").field("lat", 41.12).field("lon", -71.34).endObject()
216+
.endObject())
217+
.setDynamicMappingTypeHints(Map.of("location", "location")));
218+
requests.add(new IndexRequest("test").id("3").source("address.location", "41.12,-71.34")
219+
.setDynamicMappingTypeHints(Map.of("address.location", "location")));
220+
requests.add(new IndexRequest("test").id("4").source("location", new double[]{-71.34, 41.12})
221+
.setDynamicMappingTypeHints(Map.of("location", "location")));
222+
requests.add(new IndexRequest("test").id("5").source("array_of_numbers", new double[]{-71.34, 41.12}));
223+
224+
Randomness.shuffle(requests);
225+
BulkRequest bulkRequest = new BulkRequest().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
226+
requests.forEach(bulkRequest::add);
227+
client().bulk(bulkRequest).actionGet();
228+
229+
SearchResponse searchResponse = client().prepareSearch("test")
230+
.setQuery(new GeoBoundingBoxQueryBuilder("location").setCorners(new GeoPoint(42, -72), new GeoPoint(40, -74)))
231+
.get();
232+
assertSearchHits(searchResponse, "1", "2", "4");
233+
searchResponse = client().prepareSearch("test")
234+
.setQuery(new GeoBoundingBoxQueryBuilder("address.location").setCorners(new GeoPoint(42, -72), new GeoPoint(40, -74)))
235+
.get();
236+
assertSearchHits(searchResponse, "3");
237+
}
238+
239+
public void testTypeHintNotFound() throws Exception {
240+
assertAcked(client().admin().indices().prepareCreate("test"));
241+
BulkRequest bulkRequest = new BulkRequest().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
242+
bulkRequest.add(
243+
new IndexRequest("test").id("1").source(
244+
XContentFactory.jsonBuilder()
245+
.startObject()
246+
.field("my_location", "41.12,-71.34")
247+
.endObject())
248+
.setDynamicMappingTypeHints(Map.of("my_location", "location"))
249+
);
250+
final BulkResponse bulkItemResponses = client().bulk(bulkRequest).actionGet();
251+
assertTrue(bulkItemResponses.hasFailures());
252+
for (BulkItemResponse resp : bulkItemResponses.getItems()) {
253+
assertThat(resp.getFailure().getCause(), instanceOf(MapperParsingException.class));
254+
assertThat(resp.getFailureMessage(), containsString("Can't find template for matching type [location] of field [my_location]"));
255+
}
256+
}
159257
}

server/src/main/java/org/elasticsearch/action/bulk/BulkRequestParser.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ public final class BulkRequestParser {
5353
private static final ParseField IF_SEQ_NO = new ParseField("if_seq_no");
5454
private static final ParseField IF_PRIMARY_TERM = new ParseField("if_primary_term");
5555
private static final ParseField REQUIRE_ALIAS = new ParseField(DocWriteRequest.REQUIRE_ALIAS);
56+
private static final ParseField MATCH_MAPPING_TYPES = new ParseField("match_mapping_types");
5657

5758
// TODO: Remove this parameter once the BulkMonitoring endpoint has been removed
5859
private final boolean errorOnType;
@@ -156,6 +157,7 @@ public void parse(
156157
int retryOnConflict = 0;
157158
String pipeline = defaultPipeline;
158159
boolean requireAlias = defaultRequireAlias != null && defaultRequireAlias;
160+
Map<String, String> matchMappingTypes = Map.of();
159161

160162
// at this stage, next token can either be END_OBJECT (and use default index and type, with auto generated id)
161163
// or START_OBJECT which will have another set of parameters
@@ -206,7 +208,10 @@ public void parse(
206208
}
207209
} else if (token == XContentParser.Token.START_ARRAY) {
208210
throw new IllegalArgumentException("Malformed action/metadata line [" + line +
209-
"], expected a simple value for field [" + currentFieldName + "] but found [" + token + "]");
211+
"], expected a simple value for field [" + currentFieldName + "] but found [" + token + "]");
212+
} else if (token == XContentParser.Token.START_OBJECT &&
213+
MATCH_MAPPING_TYPES.match(currentFieldName, parser.getDeprecationHandler())) {
214+
matchMappingTypes = parser.mapStrings();
210215
} else if (token == XContentParser.Token.START_OBJECT && SOURCE.match(currentFieldName,
211216
parser.getDeprecationHandler())) {
212217
fetchSourceContext = FetchSourceContext.fromXContent(parser);
@@ -238,6 +243,7 @@ public void parse(
238243
.version(version).versionType(versionType)
239244
.setPipeline(pipeline).setIfSeqNo(ifSeqNo).setIfPrimaryTerm(ifPrimaryTerm)
240245
.source(sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType), xContentType)
246+
.setDynamicMappingTypeHints(matchMappingTypes)
241247
.setRequireAlias(requireAlias), type);
242248
} else {
243249
indexRequestConsumer.accept(new IndexRequest(index).id(id).routing(routing)
@@ -252,6 +258,7 @@ public void parse(
252258
.version(version).versionType(versionType)
253259
.create(true).setPipeline(pipeline).setIfSeqNo(ifSeqNo).setIfPrimaryTerm(ifPrimaryTerm)
254260
.source(sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType), xContentType)
261+
.setDynamicMappingTypeHints(matchMappingTypes)
255262
.setRequireAlias(requireAlias), type);
256263
} else if ("update".equals(action)) {
257264
if (version != Versions.MATCH_ANY || versionType != VersionType.INTERNAL) {

server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -281,8 +281,9 @@ static boolean executeBulkItemRequest(BulkPrimaryExecutionContext context, Updat
281281
request.ifSeqNo(), request.ifPrimaryTerm());
282282
} else {
283283
final IndexRequest request = context.getRequestToExecute();
284-
result = primary.applyIndexOperationOnPrimary(version, request.versionType(), new SourceToParse(
285-
request.index(), request.id(), request.source(), request.getContentType(), request.routing()),
284+
final SourceToParse sourceToParse = new SourceToParse(request.index(), request.id(), request.source(),
285+
request.getContentType(), request.routing(), request.getDynamicMappingTypeHints());
286+
result = primary.applyIndexOperationOnPrimary(version, request.versionType(), sourceToParse,
286287
request.ifSeqNo(), request.ifPrimaryTerm(), request.getAutoGeneratedTimestamp(), request.isRetry());
287288
}
288289
if (result.getResultType() == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) {
@@ -479,8 +480,8 @@ private static Engine.Result performOpOnReplica(DocWriteResponse primaryResponse
479480
case INDEX:
480481
final IndexRequest indexRequest = (IndexRequest) docWriteRequest;
481482
final ShardId shardId = replica.shardId();
482-
final SourceToParse sourceToParse = new SourceToParse(shardId.getIndexName(), indexRequest.id(),
483-
indexRequest.source(), indexRequest.getContentType(), indexRequest.routing());
483+
final SourceToParse sourceToParse = new SourceToParse(shardId.getIndexName(), indexRequest.id(), indexRequest.source(),
484+
indexRequest.getContentType(), indexRequest.routing(), Map.of());
484485
result = replica.applyIndexOperationOnReplica(primaryResponse.getSeqNo(), primaryResponse.getPrimaryTerm(),
485486
primaryResponse.getVersion(), indexRequest.getAutoGeneratedTimestamp(), indexRequest.isRetry(), sourceToParse);
486487
break;

server/src/main/java/org/elasticsearch/action/index/IndexRequest.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,8 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement
109109
private long ifSeqNo = UNASSIGNED_SEQ_NO;
110110
private long ifPrimaryTerm = UNASSIGNED_PRIMARY_TERM;
111111

112+
private Map<String, String> dynamicMappingTypeHints = Map.of();
113+
112114
public IndexRequest(StreamInput in) throws IOException {
113115
this(null, in);
114116
}
@@ -146,6 +148,11 @@ public IndexRequest(@Nullable ShardId shardId, StreamInput in) throws IOExceptio
146148
} else {
147149
requireAlias = false;
148150
}
151+
if (in.getVersion().onOrAfter(Version.V_8_0_0)) {
152+
dynamicMappingTypeHints = in.readMap(StreamInput::readString, StreamInput::readString);
153+
} else {
154+
dynamicMappingTypeHints = Map.of();
155+
}
149156
}
150157

151158
public IndexRequest() {
@@ -655,6 +662,13 @@ private void writeBody(StreamOutput out) throws IOException {
655662
if (out.getVersion().onOrAfter(Version.V_7_10_0)) {
656663
out.writeBoolean(requireAlias);
657664
}
665+
if (out.getVersion().onOrAfter(Version.V_8_0_0)) {
666+
out.writeMap(dynamicMappingTypeHints, StreamOutput::writeString, StreamOutput::writeString);
667+
} else {
668+
if (dynamicMappingTypeHints.isEmpty() == false) {
669+
throw new IllegalStateException("Dynamic mapping type hints requires all nodes in the cluster on 8.0 or later");
670+
}
671+
}
658672
}
659673

660674
@Override
@@ -712,4 +726,13 @@ public IndexRequest setRequireAlias(boolean requireAlias) {
712726
this.requireAlias = requireAlias;
713727
return this;
714728
}
729+
730+
public IndexRequest setDynamicMappingTypeHints(Map<String, String> dynamicMappingTypeHints) {
731+
this.dynamicMappingTypeHints = Objects.requireNonNull(dynamicMappingTypeHints);
732+
return this;
733+
}
734+
735+
public Map<String, String> getDynamicMappingTypeHints() {
736+
return dynamicMappingTypeHints;
737+
}
715738
}

server/src/main/java/org/elasticsearch/index/engine/SingleDocDirectoryReader.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.elasticsearch.index.translog.Translog;
3838

3939
import java.io.IOException;
40+
import java.util.Map;
4041
import java.util.concurrent.atomic.AtomicReference;
4142

4243
/**
@@ -140,7 +141,8 @@ private LeafReader getDelegate() {
140141
private LeafReader createInMemoryLeafReader() {
141142
assert Thread.holdsLock(this);
142143
final ParsedDocument parsedDocs = mapper.parse(new SourceToParse(shardId.getIndexName(), operation.id(),
143-
operation.source(), XContentHelper.xContentType(operation.source()), operation.routing()));
144+
operation.source(), XContentHelper.xContentType(operation.source()), operation.routing(), Map.of()));
145+
144146
parsedDocs.updateSeqID(operation.seqNo(), operation.primaryTerm());
145147
parsedDocs.version().setLongValue(operation.version());
146148
final IndexWriterConfig writeConfig = new IndexWriterConfig(analyzer).setOpenMode(IndexWriterConfig.OpenMode.CREATE);

server/src/main/java/org/elasticsearch/index/get/ShardGetService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -224,7 +224,7 @@ private GetResult innerGetLoadFromStoredFields(String id, String[] storedFields,
224224
// Slow path: recreate stored fields from original source
225225
assert source != null : "original source in translog must exist";
226226
SourceToParse sourceToParse = new SourceToParse(shardId.getIndexName(), id, source, XContentHelper.xContentType(source),
227-
fieldVisitor.routing());
227+
fieldVisitor.routing(), Map.of());
228228
ParsedDocument doc = indexShard.mapperService().documentMapper().parse(sourceToParse);
229229
assert doc.dynamicMappingsUpdate() == null : "mapping updates should not be required on already-indexed doc";
230230
// update special fields

server/src/main/java/org/elasticsearch/index/mapper/DocumentParser.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -543,7 +543,8 @@ private static void parseArray(ParseContext context, ObjectMapper parentMapper,
543543
// TODO: shouldn't this skip, not parse?
544544
parseNonDynamicArray(context, parentMapper, lastFieldName, arrayFieldName);
545545
} else {
546-
Mapper objectMapperFromTemplate = dynamic.getDynamicFieldsBuilder().createObjectMapperFromTemplate(context, arrayFieldName);
546+
Mapper objectMapperFromTemplate =
547+
dynamic.getDynamicFieldsBuilder().createFieldOrObjectMapperFromTemplate(context, arrayFieldName);
547548
if (objectMapperFromTemplate == null) {
548549
parseNonDynamicArray(context, parentMapper, lastFieldName, arrayFieldName);
549550
} else {
@@ -711,7 +712,12 @@ private static Tuple<Integer, ObjectMapper> getDynamicParentMapper(ParseContext
711712
return new Tuple<>(pathsAdded, parent);
712713
} else {
713714
//objects are created under properties even with dynamic: runtime, as the runtime section only holds leaf fields
714-
mapper = (ObjectMapper) dynamic.getDynamicFieldsBuilder().createDynamicObjectMapper(context, paths[i]);
715+
final Mapper fieldMapper = dynamic.getDynamicFieldsBuilder().createDynamicObjectMapper(context, paths[i]);
716+
if (fieldMapper instanceof ObjectMapper == false) {
717+
throw new MapperParsingException("Field [" + context.path().pathAsText(paths[i]) + "] must be an object; " +
718+
"but it's configured as [" + fieldMapper.typeName() + "] in dynamic templates");
719+
}
720+
mapper = (ObjectMapper) fieldMapper;
715721
if (mapper.nested() != ObjectMapper.Nested.NO) {
716722
throw new MapperParsingException("It is forbidden to create dynamic nested objects (["
717723
+ context.path().pathAsText(paths[i]) + "]) through `copy_to` or dots in field names");

0 commit comments

Comments
 (0)