Skip to content

Commit 7a622f0

Browse files
authored
Remove types from BulkRequest (#46983)
This commit removes types entirely from BulkRequest, both as a global parameter and as individual entries on update/index/delete lines. Relates to #41059
1 parent e99435a commit 7a622f0

File tree

83 files changed

+227
-1015
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

83 files changed

+227
-1015
lines changed

client/rest-high-level/src/test/java/org/elasticsearch/client/BulkProcessorIT.java

Lines changed: 6 additions & 90 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,6 @@
3636
import org.elasticsearch.common.unit.TimeValue;
3737
import org.elasticsearch.common.xcontent.XContentBuilder;
3838
import org.elasticsearch.common.xcontent.XContentType;
39-
import org.elasticsearch.index.mapper.MapperService;
40-
import org.elasticsearch.rest.action.document.RestBulkAction;
4139
import org.elasticsearch.search.SearchHit;
4240
import org.hamcrest.Matcher;
4341

@@ -74,12 +72,6 @@ private static BulkProcessor.Builder initBulkProcessorBuilder(BulkProcessor.List
7472
bulkListener), listener);
7573
}
7674

77-
private static BulkProcessor.Builder initBulkProcessorBuilderUsingTypes(BulkProcessor.Listener listener) {
78-
return BulkProcessor.builder(
79-
(request, bulkListener) -> highLevelClient().bulkAsync(request, expectWarnings(RestBulkAction.TYPES_DEPRECATION_MESSAGE),
80-
bulkListener), listener);
81-
}
82-
8375
public void testThatBulkProcessorCountIsCorrect() throws Exception {
8476
final CountDownLatch latch = new CountDownLatch(1);
8577
BulkProcessorTestListener listener = new BulkProcessorTestListener(latch);
@@ -170,7 +162,6 @@ public void testBulkProcessorConcurrentRequests() throws Exception {
170162
for (BulkItemResponse bulkItemResponse : listener.bulkItems) {
171163
assertThat(bulkItemResponse.getFailureMessage(), bulkItemResponse.isFailed(), equalTo(false));
172164
assertThat(bulkItemResponse.getIndex(), equalTo("test"));
173-
assertThat(bulkItemResponse.getType(), equalTo("_doc"));
174165
//with concurrent requests > 1 we can't rely on the order of the bulk requests
175166
assertThat(Integer.valueOf(bulkItemResponse.getId()), both(greaterThan(0)).and(lessThanOrEqualTo(numDocs)));
176167
//we do want to check that we don't get duplicate ids back
@@ -269,7 +260,6 @@ public void testBulkProcessorConcurrentRequestsReadOnlyIndex() throws Exception
269260
Set<String> readOnlyIds = new HashSet<>();
270261
for (BulkItemResponse bulkItemResponse : listener.bulkItems) {
271262
assertThat(bulkItemResponse.getIndex(), either(equalTo("test")).or(equalTo("test-ro")));
272-
assertThat(bulkItemResponse.getType(), equalTo("_doc"));
273263
if (bulkItemResponse.getIndex().equals("test")) {
274264
assertThat(bulkItemResponse.isFailed(), equalTo(false));
275265
//with concurrent requests > 1 we can't rely on the order of the bulk requests
@@ -298,7 +288,6 @@ public void testGlobalParametersAndSingleRequest() throws Exception {
298288
// tag::bulk-processor-mix-parameters
299289
try (BulkProcessor processor = initBulkProcessorBuilder(listener)
300290
.setGlobalIndex("tweets")
301-
.setGlobalType("_doc")
302291
.setGlobalRouting("routing")
303292
.setGlobalPipeline("pipeline_id")
304293
.build()) {
@@ -326,33 +315,29 @@ public void testGlobalParametersAndBulkProcessor() throws Exception {
326315
createIndexWithMultipleShards("test");
327316

328317
createFieldAddingPipleine("pipeline_id", "fieldNameXYZ", "valueXYZ");
329-
final String customType = "testType";
330-
final String ignoredType = "ignoredType";
331318

332319
int numDocs = randomIntBetween(10, 10);
333320
{
334321
final CountDownLatch latch = new CountDownLatch(1);
335322
BulkProcessorTestListener listener = new BulkProcessorTestListener(latch);
336323
//Check that untyped document additions inherit the global type
337-
String globalType = customType;
338324
String localType = null;
339-
try (BulkProcessor processor = initBulkProcessorBuilderUsingTypes(listener)
325+
try (BulkProcessor processor = initBulkProcessorBuilder(listener)
340326
//let's make sure that the bulk action limit trips, one single execution will index all the documents
341327
.setConcurrentRequests(randomIntBetween(0, 1)).setBulkActions(numDocs)
342328
.setFlushInterval(TimeValue.timeValueHours(24)).setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB))
343329
.setGlobalIndex("test")
344-
.setGlobalType(globalType)
345330
.setGlobalRouting("routing")
346331
.setGlobalPipeline("pipeline_id")
347332
.build()) {
348333

349-
indexDocs(processor, numDocs, null, localType, "test", globalType, "pipeline_id");
334+
indexDocs(processor, numDocs, null, localType, "test", "pipeline_id");
350335
latch.await();
351336

352337
assertThat(listener.beforeCounts.get(), equalTo(1));
353338
assertThat(listener.afterCounts.get(), equalTo(1));
354339
assertThat(listener.bulkFailures.size(), equalTo(0));
355-
assertResponseItems(listener.bulkItems, numDocs, globalType);
340+
assertResponseItems(listener.bulkItems, numDocs);
356341

357342
Iterable<SearchHit> hits = searchAll(new SearchRequest("test").routing("routing"));
358343

@@ -361,65 +346,6 @@ public void testGlobalParametersAndBulkProcessor() throws Exception {
361346
}
362347

363348
}
364-
{
365-
//Check that typed document additions don't inherit the global type
366-
String globalType = ignoredType;
367-
String localType = customType;
368-
final CountDownLatch latch = new CountDownLatch(1);
369-
BulkProcessorTestListener listener = new BulkProcessorTestListener(latch);
370-
try (BulkProcessor processor = initBulkProcessorBuilderUsingTypes(listener)
371-
//let's make sure that the bulk action limit trips, one single execution will index all the documents
372-
.setConcurrentRequests(randomIntBetween(0, 1)).setBulkActions(numDocs)
373-
.setFlushInterval(TimeValue.timeValueHours(24)).setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB))
374-
.setGlobalIndex("test")
375-
.setGlobalType(globalType)
376-
.setGlobalRouting("routing")
377-
.setGlobalPipeline("pipeline_id")
378-
.build()) {
379-
indexDocs(processor, numDocs, null, localType, "test", globalType, "pipeline_id");
380-
latch.await();
381-
382-
assertThat(listener.beforeCounts.get(), equalTo(1));
383-
assertThat(listener.afterCounts.get(), equalTo(1));
384-
assertThat(listener.bulkFailures.size(), equalTo(0));
385-
assertResponseItems(listener.bulkItems, numDocs, localType);
386-
387-
Iterable<SearchHit> hits = searchAll(new SearchRequest("test").routing("routing"));
388-
389-
assertThat(hits, everyItem(hasProperty(fieldFromSource("fieldNameXYZ"), equalTo("valueXYZ"))));
390-
assertThat(hits, containsInAnyOrder(expectedIds(numDocs)));
391-
}
392-
}
393-
{
394-
//Check that untyped document additions and untyped global inherit the established custom type
395-
// (the custom document type introduced to the mapping by the earlier code in this test)
396-
String globalType = null;
397-
String localType = null;
398-
final CountDownLatch latch = new CountDownLatch(1);
399-
BulkProcessorTestListener listener = new BulkProcessorTestListener(latch);
400-
try (BulkProcessor processor = initBulkProcessorBuilder(listener)
401-
//let's make sure that the bulk action limit trips, one single execution will index all the documents
402-
.setConcurrentRequests(randomIntBetween(0, 1)).setBulkActions(numDocs)
403-
.setFlushInterval(TimeValue.timeValueHours(24)).setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB))
404-
.setGlobalIndex("test")
405-
.setGlobalType(globalType)
406-
.setGlobalRouting("routing")
407-
.setGlobalPipeline("pipeline_id")
408-
.build()) {
409-
indexDocs(processor, numDocs, null, localType, "test", globalType, "pipeline_id");
410-
latch.await();
411-
412-
assertThat(listener.beforeCounts.get(), equalTo(1));
413-
assertThat(listener.afterCounts.get(), equalTo(1));
414-
assertThat(listener.bulkFailures.size(), equalTo(0));
415-
assertResponseItems(listener.bulkItems, numDocs, MapperService.SINGLE_MAPPING_NAME);
416-
417-
Iterable<SearchHit> hits = searchAll(new SearchRequest("test").routing("routing"));
418-
419-
assertThat(hits, everyItem(hasProperty(fieldFromSource("fieldNameXYZ"), equalTo("valueXYZ"))));
420-
assertThat(hits, containsInAnyOrder(expectedIds(numDocs)));
421-
}
422-
}
423349
}
424350

425351
@SuppressWarnings("unchecked")
@@ -431,20 +357,15 @@ private Matcher<SearchHit>[] expectedIds(int numDocs) {
431357
}
432358

433359
private MultiGetRequest indexDocs(BulkProcessor processor, int numDocs, String localIndex, String localType,
434-
String globalIndex, String globalType, String globalPipeline) throws Exception {
360+
String globalIndex, String globalPipeline) throws Exception {
435361
MultiGetRequest multiGetRequest = new MultiGetRequest();
436362
for (int i = 1; i <= numDocs; i++) {
437363
if (randomBoolean()) {
438364
processor.add(new IndexRequest(localIndex, localType, Integer.toString(i))
439365
.source(XContentType.JSON, "field", randomRealisticUnicodeOfLengthBetween(1, 30)));
440366
} else {
441367
BytesArray data = bytesBulkRequest(localIndex, localType, i);
442-
processor.add(data, globalIndex, globalType, globalPipeline, XContentType.JSON);
443-
444-
if (localType != null) {
445-
// If the payload contains types, parsing it into a bulk request results in a warning.
446-
assertWarnings(RestBulkAction.TYPES_DEPRECATION_MESSAGE);
447-
}
368+
processor.add(data, globalIndex, globalPipeline, XContentType.JSON);
448369
}
449370
multiGetRequest.add(localIndex, Integer.toString(i));
450371
}
@@ -475,19 +396,14 @@ private static BytesArray bytesBulkRequest(String localIndex, String localType,
475396
}
476397

477398
private MultiGetRequest indexDocs(BulkProcessor processor, int numDocs) throws Exception {
478-
return indexDocs(processor, numDocs, "test", null, null, null, null);
399+
return indexDocs(processor, numDocs, "test", null, null, null);
479400
}
480401

481402
private static void assertResponseItems(List<BulkItemResponse> bulkItemResponses, int numDocs) {
482-
assertResponseItems(bulkItemResponses, numDocs, MapperService.SINGLE_MAPPING_NAME);
483-
}
484-
485-
private static void assertResponseItems(List<BulkItemResponse> bulkItemResponses, int numDocs, String expectedType) {
486403
assertThat(bulkItemResponses.size(), is(numDocs));
487404
int i = 1;
488405
for (BulkItemResponse bulkItemResponse : bulkItemResponses) {
489406
assertThat(bulkItemResponse.getIndex(), equalTo("test"));
490-
assertThat(bulkItemResponse.getType(), equalTo(expectedType));
491407
assertThat(bulkItemResponse.getId(), equalTo(Integer.toString(i++)));
492408
assertThat("item " + i + " failed with cause: " + bulkItemResponse.getFailureMessage(),
493409
bulkItemResponse.isFailed(), equalTo(false));

client/rest-high-level/src/test/java/org/elasticsearch/client/BulkRequestWithGlobalParametersIT.java

Lines changed: 2 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ public void testMixPipelineOnRequestAndGlobal() throws IOException {
106106
}
107107

108108
public void testGlobalIndex() throws IOException {
109-
BulkRequest request = new BulkRequest("global_index", null);
109+
BulkRequest request = new BulkRequest("global_index");
110110
request.add(new IndexRequest().id("1")
111111
.source(XContentType.JSON, "field", "bulk1"));
112112
request.add(new IndexRequest().id("2")
@@ -120,7 +120,7 @@ public void testGlobalIndex() throws IOException {
120120

121121
@SuppressWarnings("unchecked")
122122
public void testIndexGlobalAndPerRequest() throws IOException {
123-
BulkRequest request = new BulkRequest("global_index", null);
123+
BulkRequest request = new BulkRequest("global_index");
124124
request.add(new IndexRequest("local_index").id("1")
125125
.source(XContentType.JSON, "field", "bulk1"));
126126
request.add(new IndexRequest().id("2") // will take global index
@@ -168,19 +168,6 @@ public void testMixLocalAndGlobalRouting() throws IOException {
168168
assertThat(hits, containsInAnyOrder(hasId("1"), hasId("2")));
169169
}
170170

171-
public void testGlobalIndexNoTypes() throws IOException {
172-
BulkRequest request = new BulkRequest("global_index");
173-
request.add(new IndexRequest().id("1")
174-
.source(XContentType.JSON, "field", "bulk1"));
175-
request.add(new IndexRequest().id("2")
176-
.source(XContentType.JSON, "field", "bulk2"));
177-
178-
bulk(request);
179-
180-
Iterable<SearchHit> hits = searchAll("global_index");
181-
assertThat(hits, everyItem(hasIndex("global_index")));
182-
}
183-
184171
private BulkResponse bulk(BulkRequest request) throws IOException {
185172
BulkResponse bulkResponse = execute(request, highLevelClient()::bulk, highLevelClient()::bulkAsync, RequestOptions.DEFAULT);
186173
assertFalse(bulkResponse.hasFailures());

client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,6 @@
5454
import org.elasticsearch.index.VersionType;
5555
import org.elasticsearch.index.get.GetResult;
5656
import org.elasticsearch.rest.RestStatus;
57-
import org.elasticsearch.rest.action.document.RestBulkAction;
5857
import org.elasticsearch.rest.action.document.RestDeleteAction;
5958
import org.elasticsearch.rest.action.document.RestIndexAction;
6059
import org.elasticsearch.rest.action.document.RestUpdateAction;
@@ -401,20 +400,6 @@ public void testMultiGet() throws IOException {
401400
}
402401
}
403402

404-
public void testMultiGetWithTypes() throws IOException {
405-
BulkRequest bulk = new BulkRequest();
406-
bulk.setRefreshPolicy(RefreshPolicy.IMMEDIATE);
407-
bulk.add(new IndexRequest("index", "type", "id1")
408-
.source("{\"field\":\"value1\"}", XContentType.JSON));
409-
bulk.add(new IndexRequest("index", "type", "id2")
410-
.source("{\"field\":\"value2\"}", XContentType.JSON));
411-
412-
highLevelClient().bulk(bulk, expectWarnings(RestBulkAction.TYPES_DEPRECATION_MESSAGE));
413-
MultiGetRequest multiGetRequest = new MultiGetRequest();
414-
multiGetRequest.add("index", "id1");
415-
multiGetRequest.add("index", "id2");
416-
}
417-
418403
public void testIndex() throws IOException {
419404
final XContentType xContentType = randomFrom(XContentType.values());
420405
{
@@ -897,7 +882,6 @@ private void validateBulkResponses(int nbItems, boolean[] errors, BulkResponse b
897882

898883
assertEquals(i, bulkItemResponse.getItemId());
899884
assertEquals("index", bulkItemResponse.getIndex());
900-
assertEquals("_doc", bulkItemResponse.getType());
901885
assertEquals(String.valueOf(i), bulkItemResponse.getId());
902886

903887
DocWriteRequest.OpType requestOpType = bulkRequest.requests().get(i).opType();

docs/build.gradle

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -574,7 +574,6 @@ buildRestTests.setups['library'] = '''
574574
- do:
575575
bulk:
576576
index: library
577-
type: book
578577
refresh: true
579578
body: |
580579
{"index":{"_id": "Leviathan Wakes"}}
@@ -923,7 +922,6 @@ buildRestTests.setups['farequote_data'] = buildRestTests.setups['farequote_index
923922
- do:
924923
bulk:
925924
index: farequote
926-
type: metric
927925
refresh: true
928926
body: |
929927
{"index": {"_id":"1"}}
@@ -983,7 +981,6 @@ buildRestTests.setups['server_metrics_data'] = buildRestTests.setups['server_met
983981
- do:
984982
bulk:
985983
index: server-metrics
986-
type: metric
987984
refresh: true
988985
body: |
989986
{"index": {"_id":"1177"}}

docs/reference/aggregations/bucket/rare-terms-aggregation.asciidoc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ PUT /products
2525
}
2626
}
2727
28-
POST /products/_doc/_bulk?refresh
28+
POST /products/_bulk?refresh
2929
{"index":{"_id":0}}
3030
{"genre": "rock", "product": "Product A"}
3131
{"index":{"_id":1}}

docs/reference/sql/getting-started.asciidoc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ an index with some data to experiment with:
88

99
[source,console]
1010
--------------------------------------------------
11-
PUT /library/book/_bulk?refresh
11+
PUT /library/_bulk?refresh
1212
{"index":{"_id": "Leviathan Wakes"}}
1313
{"name": "Leviathan Wakes", "author": "James S.A. Corey", "release_date": "2011-06-02", "page_count": 561}
1414
{"index":{"_id": "Hyperion"}}

modules/lang-mustache/src/test/resources/rest-api-spec/test/lang_mustache/60_typed_keys.yml

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,15 +21,15 @@ setup:
2121
bulk:
2222
refresh: true
2323
body:
24-
- '{"index": {"_index": "test-0", "_type": "_doc"}}'
24+
- '{"index": {"_index": "test-0"}}'
2525
- '{"ip": "10.0.0.1", "integer": 38, "float": 12.5713, "name": "Ruth", "bool": true}'
26-
- '{"index": {"_index": "test-0", "_type": "_doc"}}'
26+
- '{"index": {"_index": "test-0"}}'
2727
- '{"ip": "10.0.0.2", "integer": 42, "float": 15.3393, "name": "Jackie", "surname": "Bowling", "bool": false}'
28-
- '{"index": {"_index": "test-1", "_type": "_doc"}}'
28+
- '{"index": {"_index": "test-1"}}'
2929
- '{"ip": "10.0.0.3", "integer": 29, "float": 19.0517, "name": "Stephanie", "bool": true}'
30-
- '{"index": {"_index": "test-1", "_type": "_doc"}}'
30+
- '{"index": {"_index": "test-1"}}'
3131
- '{"ip": "10.0.0.4", "integer": 19, "float": 19.3717, "surname": "Hamilton", "bool": true}'
32-
- '{"index": {"_index": "test-2", "_type": "_doc"}}'
32+
- '{"index": {"_index": "test-2"}}'
3333
- '{"ip": "10.0.0.5", "integer": 0, "float": 17.3349, "name": "Natalie", "bool": false}'
3434

3535
---

modules/reindex/src/test/java/org/elasticsearch/index/reindex/AsyncBulkByScrollActionTests.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -290,7 +290,7 @@ public void testBulkResponseSetsLotsOfStatus() throws Exception {
290290
if (rarely()) {
291291
versionConflicts++;
292292
responses[i] = new BulkItemResponse(i, randomFrom(DocWriteRequest.OpType.values()),
293-
new Failure(shardId.getIndexName(), "type", "id" + i,
293+
new Failure(shardId.getIndexName(), "id" + i,
294294
new VersionConflictEngineException(shardId, "id", "test")));
295295
continue;
296296
}
@@ -399,7 +399,7 @@ public void testSearchTimeoutsAbortRequest() throws Exception {
399399
* Mimicks bulk indexing failures.
400400
*/
401401
public void testBulkFailuresAbortRequest() throws Exception {
402-
Failure failure = new Failure("index", "type", "id", new RuntimeException("test"));
402+
Failure failure = new Failure("index", "id", new RuntimeException("test"));
403403
DummyAsyncBulkByScrollAction action = new DummyAsyncBulkByScrollAction();
404404
BulkResponse bulkResponse = new BulkResponse(new BulkItemResponse[]
405405
{new BulkItemResponse(0, DocWriteRequest.OpType.CREATE, failure)}, randomLong());
@@ -902,7 +902,7 @@ void doExecute(ActionType<Response> action, Request request, ActionListener<Resp
902902
}
903903
if (i == toReject) {
904904
responses[i] = new BulkItemResponse(i, item.opType(),
905-
new Failure(response.getIndex(), response.getType(), response.getId(), new EsRejectedExecutionException()));
905+
new Failure(response.getIndex(), response.getId(), new EsRejectedExecutionException()));
906906
} else {
907907
responses[i] = new BulkItemResponse(i, item.opType(), response);
908908
}

modules/reindex/src/test/java/org/elasticsearch/index/reindex/BulkIndexByScrollResponseTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ public void testMergeConstructor() {
5151
BulkByScrollTask.Status status = new BulkByScrollTask.Status(i, 0, 0, 0, 0, 0, 0, 0, 0, 0, timeValueMillis(0), 0f,
5252
thisReasonCancelled, timeValueMillis(0));
5353
List<BulkItemResponse.Failure> bulkFailures = frequently() ? emptyList()
54-
: IntStream.range(0, between(1, 3)).mapToObj(j -> new BulkItemResponse.Failure("idx", "type", "id", new Exception()))
54+
: IntStream.range(0, between(1, 3)).mapToObj(j -> new BulkItemResponse.Failure("idx", "id", new Exception()))
5555
.collect(Collectors.toList());
5656
allBulkFailures.addAll(bulkFailures);
5757
List<SearchFailure> searchFailures = frequently() ? emptyList()

0 commit comments

Comments
 (0)