Skip to content

Commit 61c21f9

Browse files
committed
Bulk API: Do not fail whole request on closed index
The bulk API request was marked as completely failed, in case a request with a closed index was referred in any of the requests inside of a bulk one. Implementation Note: Currently the implementation is a bit more verbose in order to prevent an instanceof check and another cast - if that is fast enough, we could execute that logic only once at the beginning of the loop (thinking this might be a bit overoptimization here). Closes #6410
1 parent 4f791b0 commit 61c21f9

File tree

6 files changed

+157
-41
lines changed

6 files changed

+157
-41
lines changed
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.elasticsearch.action;
20+
21+
import org.elasticsearch.action.support.IndicesOptions;
22+
23+
/**
24+
* Generic interface to group ActionRequest, which work on single document level
25+
*
26+
* Forces this class return index/type/id getters
27+
*/
28+
public interface DocumentRequest<T> {
29+
30+
/**
31+
* Get the index that this request operates on
32+
* @return the index
33+
*/
34+
String index();
35+
36+
/**
37+
* Get the type that this request operates on
38+
* @return the type
39+
*/
40+
String type();
41+
42+
/**
43+
* Get the id of the document for this request
44+
* @return the id
45+
*/
46+
String id();
47+
48+
/**
49+
* Get the options for this request
50+
* @return the indices options
51+
*/
52+
IndicesOptions indicesOptions();
53+
54+
/**
55+
* Set the routing for this request
56+
* @param routing
57+
* @return the Request
58+
*/
59+
T routing(String routing);
60+
61+
/**
62+
* Get the routing for this request
63+
* @return the Routing
64+
*/
65+
String routing();
66+
}

src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java

Lines changed: 61 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.elasticsearch.ExceptionsHelper;
2828
import org.elasticsearch.action.ActionListener;
2929
import org.elasticsearch.action.ActionRequest;
30+
import org.elasticsearch.action.DocumentRequest;
3031
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
3132
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
3233
import org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction;
@@ -40,15 +41,18 @@
4041
import org.elasticsearch.cluster.ClusterService;
4142
import org.elasticsearch.cluster.ClusterState;
4243
import org.elasticsearch.cluster.block.ClusterBlockLevel;
44+
import org.elasticsearch.cluster.metadata.IndexMetaData;
4345
import org.elasticsearch.cluster.metadata.MappingMetaData;
4446
import org.elasticsearch.cluster.metadata.MetaData;
4547
import org.elasticsearch.cluster.routing.GroupShardsIterator;
4648
import org.elasticsearch.cluster.routing.ShardIterator;
4749
import org.elasticsearch.common.inject.Inject;
4850
import org.elasticsearch.common.settings.Settings;
4951
import org.elasticsearch.common.util.concurrent.AtomicArray;
52+
import org.elasticsearch.index.Index;
5053
import org.elasticsearch.index.shard.ShardId;
5154
import org.elasticsearch.indices.IndexAlreadyExistsException;
55+
import org.elasticsearch.indices.IndexClosedException;
5256
import org.elasticsearch.rest.RestStatus;
5357
import org.elasticsearch.threadpool.ThreadPool;
5458
import org.elasticsearch.transport.TransportService;
@@ -96,26 +100,15 @@ protected void doExecute(final BulkRequest bulkRequest, final ActionListener<Bul
96100
if (autoCreateIndex.needToCheck()) {
97101
final Set<String> indices = Sets.newHashSet();
98102
for (ActionRequest request : bulkRequest.requests) {
99-
if (request instanceof IndexRequest) {
100-
IndexRequest indexRequest = (IndexRequest) request;
101-
if (!indices.contains(indexRequest.index())) {
102-
indices.add(indexRequest.index());
103-
}
104-
} else if (request instanceof DeleteRequest) {
105-
DeleteRequest deleteRequest = (DeleteRequest) request;
106-
if (!indices.contains(deleteRequest.index())) {
107-
indices.add(deleteRequest.index());
108-
}
109-
} else if (request instanceof UpdateRequest) {
110-
UpdateRequest updateRequest = (UpdateRequest) request;
111-
if (!indices.contains(updateRequest.index())) {
112-
indices.add(updateRequest.index());
103+
if (request instanceof DocumentRequest) {
104+
DocumentRequest req = (DocumentRequest) request;
105+
if (!indices.contains(req.index())) {
106+
indices.add(req.index());
113107
}
114108
} else {
115109
throw new ElasticsearchException("Parsed unknown request in bulk actions: " + request.getClass().getSimpleName());
116110
}
117111
}
118-
119112
final AtomicInteger counter = new AtomicInteger(indices.size());
120113
ClusterState state = clusterService.state();
121114
for (final String index : indices) {
@@ -204,30 +197,33 @@ private void executeBulk(final BulkRequest bulkRequest, final long startTime, fi
204197
MetaData metaData = clusterState.metaData();
205198
for (int i = 0; i < bulkRequest.requests.size(); i++) {
206199
ActionRequest request = bulkRequest.requests.get(i);
207-
if (request instanceof IndexRequest) {
208-
IndexRequest indexRequest = (IndexRequest) request;
209-
String concreteIndex = concreteIndices.resolveIfAbsent(indexRequest.index(), indexRequest.indicesOptions());
210-
MappingMetaData mappingMd = null;
211-
if (metaData.hasIndex(concreteIndex)) {
212-
mappingMd = metaData.index(concreteIndex).mappingOrDefault(indexRequest.type());
200+
if (request instanceof DocumentRequest) {
201+
DocumentRequest req = (DocumentRequest) request;
202+
203+
if (addFailureIfIndexIsClosed(req, bulkRequest, responses, i, concreteIndices, metaData)) {
204+
continue;
213205
}
214-
try {
215-
indexRequest.process(metaData, mappingMd, allowIdGeneration, concreteIndex);
216-
} catch (ElasticsearchParseException e) {
217-
BulkItemResponse.Failure failure = new BulkItemResponse.Failure(concreteIndex, indexRequest.type(), indexRequest.id(), e);
218-
BulkItemResponse bulkItemResponse = new BulkItemResponse(i, "index", failure);
219-
responses.set(i, bulkItemResponse);
220-
// make sure the request gets never processed again
221-
bulkRequest.requests.set(i, null);
206+
207+
String concreteIndex = concreteIndices.resolveIfAbsent(req.index(), req.indicesOptions());
208+
if (request instanceof IndexRequest) {
209+
IndexRequest indexRequest = (IndexRequest) request;
210+
MappingMetaData mappingMd = null;
211+
if (metaData.hasIndex(concreteIndex)) {
212+
mappingMd = metaData.index(concreteIndex).mappingOrDefault(indexRequest.type());
213+
}
214+
try {
215+
indexRequest.process(metaData, mappingMd, allowIdGeneration, concreteIndex);
216+
} catch (ElasticsearchParseException e) {
217+
BulkItemResponse.Failure failure = new BulkItemResponse.Failure(concreteIndex, indexRequest.type(), indexRequest.id(), e);
218+
BulkItemResponse bulkItemResponse = new BulkItemResponse(i, "index", failure);
219+
responses.set(i, bulkItemResponse);
220+
// make sure the request gets never processed again
221+
bulkRequest.requests.set(i, null);
222+
}
223+
} else {
224+
concreteIndices.resolveIfAbsent(req.index(), req.indicesOptions());
225+
req.routing(clusterState.metaData().resolveIndexRouting(req.routing(), req.index()));
222226
}
223-
} else if (request instanceof DeleteRequest) {
224-
DeleteRequest deleteRequest = (DeleteRequest) request;
225-
concreteIndices.resolveIfAbsent(deleteRequest.index(), deleteRequest.indicesOptions());
226-
deleteRequest.routing(clusterState.metaData().resolveIndexRouting(deleteRequest.routing(), deleteRequest.index()));
227-
} else if (request instanceof UpdateRequest) {
228-
UpdateRequest updateRequest = (UpdateRequest) request;
229-
concreteIndices.resolveIfAbsent(updateRequest.index(), updateRequest.indicesOptions());
230-
updateRequest.routing(clusterState.metaData().resolveIndexRouting(updateRequest.routing(), updateRequest.index()));
231227
}
232228
}
233229

@@ -343,8 +339,35 @@ private void finishHim() {
343339
}
344340
}
345341

346-
private static class ConcreteIndices {
342+
private boolean addFailureIfIndexIsClosed(DocumentRequest request, BulkRequest bulkRequest, AtomicArray<BulkItemResponse> responses, int idx,
343+
final ConcreteIndices concreteIndices,
344+
final MetaData metaData) {
345+
String concreteIndex = concreteIndices.getConcreteIndex(request.index());
346+
boolean isClosed = false;
347+
if (concreteIndex == null) {
348+
try {
349+
concreteIndex = concreteIndices.resolveIfAbsent(request.index(), request.indicesOptions());
350+
} catch (IndexClosedException ice) {
351+
isClosed = true;
352+
}
353+
}
354+
if (!isClosed) {
355+
IndexMetaData indexMetaData = metaData.index(concreteIndex);
356+
isClosed = indexMetaData.getState() == IndexMetaData.State.CLOSE;
357+
}
358+
if (isClosed) {
359+
BulkItemResponse.Failure failure = new BulkItemResponse.Failure(request.index(), request.type(), request.id(),
360+
new IndexClosedException(new Index(metaData.index(request.index()).getIndex())));
361+
BulkItemResponse bulkItemResponse = new BulkItemResponse(idx, "index", failure);
362+
responses.set(idx, bulkItemResponse);
363+
// make sure the request gets never processed again
364+
bulkRequest.requests.set(idx, null);
365+
}
366+
return isClosed;
367+
}
368+
347369

370+
private static class ConcreteIndices {
348371
private final Map<String, String> indices = new HashMap<>();
349372
private final MetaData metaData;
350373

src/main/java/org/elasticsearch/action/delete/DeleteRequest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import org.elasticsearch.action.ActionRequest;
2323
import org.elasticsearch.action.ActionRequestValidationException;
24+
import org.elasticsearch.action.DocumentRequest;
2425
import org.elasticsearch.action.support.replication.ShardReplicationOperationRequest;
2526
import org.elasticsearch.common.Nullable;
2627
import org.elasticsearch.common.io.stream.StreamInput;
@@ -43,7 +44,7 @@
4344
* @see org.elasticsearch.client.Client#delete(DeleteRequest)
4445
* @see org.elasticsearch.client.Requests#deleteRequest(String)
4546
*/
46-
public class DeleteRequest extends ShardReplicationOperationRequest<DeleteRequest> {
47+
public class DeleteRequest extends ShardReplicationOperationRequest<DeleteRequest> implements DocumentRequest<DeleteRequest> {
4748

4849
private String type;
4950
private String id;

src/main/java/org/elasticsearch/action/index/IndexRequest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.elasticsearch.action.ActionRequestValidationException;
2626
import org.elasticsearch.action.RoutingMissingException;
2727
import org.elasticsearch.action.TimestampParsingException;
28+
import org.elasticsearch.action.DocumentRequest;
2829
import org.elasticsearch.action.support.replication.ShardReplicationOperationRequest;
2930
import org.elasticsearch.client.Requests;
3031
import org.elasticsearch.cluster.metadata.MappingMetaData;
@@ -64,7 +65,7 @@
6465
* @see org.elasticsearch.client.Requests#indexRequest(String)
6566
* @see org.elasticsearch.client.Client#index(IndexRequest)
6667
*/
67-
public class IndexRequest extends ShardReplicationOperationRequest<IndexRequest> {
68+
public class IndexRequest extends ShardReplicationOperationRequest<IndexRequest> implements DocumentRequest<IndexRequest> {
6869

6970
/**
7071
* Operation type controls if the type of the index operation.

src/main/java/org/elasticsearch/action/update/UpdateRequest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import com.google.common.collect.Maps;
2323
import org.elasticsearch.Version;
2424
import org.elasticsearch.action.ActionRequestValidationException;
25+
import org.elasticsearch.action.DocumentRequest;
2526
import org.elasticsearch.action.WriteConsistencyLevel;
2627
import org.elasticsearch.action.index.IndexRequest;
2728
import org.elasticsearch.action.support.replication.ReplicationType;
@@ -47,7 +48,7 @@
4748

4849
/**
4950
*/
50-
public class UpdateRequest extends InstanceShardOperationRequest<UpdateRequest> {
51+
public class UpdateRequest extends InstanceShardOperationRequest<UpdateRequest> implements DocumentRequest<UpdateRequest> {
5152

5253
private String type;
5354
private String id;

src/test/java/org/elasticsearch/document/BulkTests.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import com.google.common.base.Charsets;
2323
import org.elasticsearch.action.admin.indices.alias.Alias;
2424
import org.elasticsearch.action.bulk.BulkItemResponse;
25+
import org.elasticsearch.action.bulk.BulkRequest;
2526
import org.elasticsearch.action.bulk.BulkRequestBuilder;
2627
import org.elasticsearch.action.bulk.BulkResponse;
2728
import org.elasticsearch.action.count.CountResponse;
@@ -651,8 +652,31 @@ public void testThatFailedUpdateRequestReturnsCorrectType() throws Exception {
651652
assertThat(bulkItemResponse.getItems()[5].getOpType(), is("delete"));
652653
}
653654

655+
654656
private static String indexOrAlias() {
655657
return randomBoolean() ? "test" : "alias";
656658
}
659+
660+
@Test // issue 6410
661+
public void testThatMissingIndexDoesNotAbortFullBulkRequest() throws Exception{
662+
createIndex("bulkindex1", "bulkindex2");
663+
BulkRequest bulkRequest = new BulkRequest();
664+
bulkRequest.add(new IndexRequest("bulkindex1", "index1_type", "1").source("text", "hallo1"))
665+
.add(new IndexRequest("bulkindex2", "index2_type", "1").source("text", "hallo2"))
666+
.add(new IndexRequest("bulkindex2", "index2_type").source("text", "hallo2"))
667+
.add(new UpdateRequest("bulkindex2", "index2_type", "2").doc("foo", "bar"))
668+
.add(new DeleteRequest("bulkindex2", "index2_type", "3"))
669+
.refresh(true);
670+
671+
client().bulk(bulkRequest).get();
672+
SearchResponse searchResponse = client().prepareSearch("bulkindex*").get();
673+
assertHitCount(searchResponse, 3);
674+
675+
assertAcked(client().admin().indices().prepareClose("bulkindex2"));
676+
677+
BulkResponse bulkResponse = client().bulk(bulkRequest).get();
678+
assertThat(bulkResponse.hasFailures(), is(true));
679+
assertThat(bulkResponse.getItems().length, is(5));
680+
}
657681
}
658682

0 commit comments

Comments
 (0)