Skip to content

Commit 4f791b0

Browse files
committed
Revert "Bulk API: Do not fail whole request on closed index"
This reverts commit 405e581.
1 parent c7c61bf commit 4f791b0

File tree

5 files changed

+41
-91
lines changed

5 files changed

+41
-91
lines changed

src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java

Lines changed: 38 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
import org.elasticsearch.ExceptionsHelper;
2828
import org.elasticsearch.action.ActionListener;
2929
import org.elasticsearch.action.ActionRequest;
30-
import org.elasticsearch.action.DocumentRequest;
3130
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
3231
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
3332
import org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction;
@@ -41,18 +40,15 @@
4140
import org.elasticsearch.cluster.ClusterService;
4241
import org.elasticsearch.cluster.ClusterState;
4342
import org.elasticsearch.cluster.block.ClusterBlockLevel;
44-
import org.elasticsearch.cluster.metadata.IndexMetaData;
4543
import org.elasticsearch.cluster.metadata.MappingMetaData;
4644
import org.elasticsearch.cluster.metadata.MetaData;
4745
import org.elasticsearch.cluster.routing.GroupShardsIterator;
4846
import org.elasticsearch.cluster.routing.ShardIterator;
4947
import org.elasticsearch.common.inject.Inject;
5048
import org.elasticsearch.common.settings.Settings;
5149
import org.elasticsearch.common.util.concurrent.AtomicArray;
52-
import org.elasticsearch.index.Index;
5350
import org.elasticsearch.index.shard.ShardId;
5451
import org.elasticsearch.indices.IndexAlreadyExistsException;
55-
import org.elasticsearch.indices.IndexClosedException;
5652
import org.elasticsearch.rest.RestStatus;
5753
import org.elasticsearch.threadpool.ThreadPool;
5854
import org.elasticsearch.transport.TransportService;
@@ -100,15 +96,26 @@ protected void doExecute(final BulkRequest bulkRequest, final ActionListener<Bul
10096
if (autoCreateIndex.needToCheck()) {
10197
final Set<String> indices = Sets.newHashSet();
10298
for (ActionRequest request : bulkRequest.requests) {
103-
if (request instanceof DocumentRequest) {
104-
DocumentRequest req = (DocumentRequest) request;
105-
if (!indices.contains(req.index())) {
106-
indices.add(req.index());
99+
if (request instanceof IndexRequest) {
100+
IndexRequest indexRequest = (IndexRequest) request;
101+
if (!indices.contains(indexRequest.index())) {
102+
indices.add(indexRequest.index());
103+
}
104+
} else if (request instanceof DeleteRequest) {
105+
DeleteRequest deleteRequest = (DeleteRequest) request;
106+
if (!indices.contains(deleteRequest.index())) {
107+
indices.add(deleteRequest.index());
108+
}
109+
} else if (request instanceof UpdateRequest) {
110+
UpdateRequest updateRequest = (UpdateRequest) request;
111+
if (!indices.contains(updateRequest.index())) {
112+
indices.add(updateRequest.index());
107113
}
108114
} else {
109115
throw new ElasticsearchException("Parsed unknown request in bulk actions: " + request.getClass().getSimpleName());
110116
}
111117
}
118+
112119
final AtomicInteger counter = new AtomicInteger(indices.size());
113120
ClusterState state = clusterService.state();
114121
for (final String index : indices) {
@@ -197,33 +204,30 @@ private void executeBulk(final BulkRequest bulkRequest, final long startTime, fi
197204
MetaData metaData = clusterState.metaData();
198205
for (int i = 0; i < bulkRequest.requests.size(); i++) {
199206
ActionRequest request = bulkRequest.requests.get(i);
200-
if (request instanceof DocumentRequest) {
201-
DocumentRequest req = (DocumentRequest) request;
202-
203-
if (addFailureIfIndexIsClosed(req, bulkRequest, responses, i, concreteIndices, metaData)) {
204-
continue;
207+
if (request instanceof IndexRequest) {
208+
IndexRequest indexRequest = (IndexRequest) request;
209+
String concreteIndex = concreteIndices.resolveIfAbsent(indexRequest.index(), indexRequest.indicesOptions());
210+
MappingMetaData mappingMd = null;
211+
if (metaData.hasIndex(concreteIndex)) {
212+
mappingMd = metaData.index(concreteIndex).mappingOrDefault(indexRequest.type());
205213
}
206-
207-
String concreteIndex = concreteIndices.resolveIfAbsent(req.index(), req.indicesOptions());
208-
if (request instanceof IndexRequest) {
209-
IndexRequest indexRequest = (IndexRequest) request;
210-
MappingMetaData mappingMd = null;
211-
if (metaData.hasIndex(concreteIndex)) {
212-
mappingMd = metaData.index(concreteIndex).mappingOrDefault(indexRequest.type());
213-
}
214-
try {
215-
indexRequest.process(metaData, mappingMd, allowIdGeneration, concreteIndex);
216-
} catch (ElasticsearchParseException e) {
217-
BulkItemResponse.Failure failure = new BulkItemResponse.Failure(concreteIndex, indexRequest.type(), indexRequest.id(), e);
218-
BulkItemResponse bulkItemResponse = new BulkItemResponse(i, "index", failure);
219-
responses.set(i, bulkItemResponse);
220-
// make sure the request gets never processed again
221-
bulkRequest.requests.set(i, null);
222-
}
223-
} else {
224-
concreteIndices.resolveIfAbsent(req.index(), req.indicesOptions());
225-
req.routing(clusterState.metaData().resolveIndexRouting(req.routing(), req.index()));
214+
try {
215+
indexRequest.process(metaData, mappingMd, allowIdGeneration, concreteIndex);
216+
} catch (ElasticsearchParseException e) {
217+
BulkItemResponse.Failure failure = new BulkItemResponse.Failure(concreteIndex, indexRequest.type(), indexRequest.id(), e);
218+
BulkItemResponse bulkItemResponse = new BulkItemResponse(i, "index", failure);
219+
responses.set(i, bulkItemResponse);
220+
// make sure the request gets never processed again
221+
bulkRequest.requests.set(i, null);
226222
}
223+
} else if (request instanceof DeleteRequest) {
224+
DeleteRequest deleteRequest = (DeleteRequest) request;
225+
concreteIndices.resolveIfAbsent(deleteRequest.index(), deleteRequest.indicesOptions());
226+
deleteRequest.routing(clusterState.metaData().resolveIndexRouting(deleteRequest.routing(), deleteRequest.index()));
227+
} else if (request instanceof UpdateRequest) {
228+
UpdateRequest updateRequest = (UpdateRequest) request;
229+
concreteIndices.resolveIfAbsent(updateRequest.index(), updateRequest.indicesOptions());
230+
updateRequest.routing(clusterState.metaData().resolveIndexRouting(updateRequest.routing(), updateRequest.index()));
227231
}
228232
}
229233

@@ -339,35 +343,8 @@ private void finishHim() {
339343
}
340344
}
341345

342-
private boolean addFailureIfIndexIsClosed(DocumentRequest request, BulkRequest bulkRequest, AtomicArray<BulkItemResponse> responses, int idx,
343-
final ConcreteIndices concreteIndices,
344-
final MetaData metaData) {
345-
String concreteIndex = concreteIndices.getConcreteIndex(request.index());
346-
boolean isClosed = false;
347-
if (concreteIndex == null) {
348-
try {
349-
concreteIndex = concreteIndices.resolveIfAbsent(request.index(), request.indicesOptions());
350-
} catch (IndexClosedException ice) {
351-
isClosed = true;
352-
}
353-
}
354-
if (!isClosed) {
355-
IndexMetaData indexMetaData = metaData.index(concreteIndex);
356-
isClosed = indexMetaData.getState() == IndexMetaData.State.CLOSE;
357-
}
358-
if (isClosed) {
359-
BulkItemResponse.Failure failure = new BulkItemResponse.Failure(request.index(), request.type(), request.id(),
360-
new IndexClosedException(new Index(metaData.index(request.index()).getIndex())));
361-
BulkItemResponse bulkItemResponse = new BulkItemResponse(idx, "index", failure);
362-
responses.set(idx, bulkItemResponse);
363-
// make sure the request gets never processed again
364-
bulkRequest.requests.set(idx, null);
365-
}
366-
return isClosed;
367-
}
368-
369-
370346
private static class ConcreteIndices {
347+
371348
private final Map<String, String> indices = new HashMap<>();
372349
private final MetaData metaData;
373350

src/main/java/org/elasticsearch/action/delete/DeleteRequest.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121

2222
import org.elasticsearch.action.ActionRequest;
2323
import org.elasticsearch.action.ActionRequestValidationException;
24-
import org.elasticsearch.action.DocumentRequest;
2524
import org.elasticsearch.action.support.replication.ShardReplicationOperationRequest;
2625
import org.elasticsearch.common.Nullable;
2726
import org.elasticsearch.common.io.stream.StreamInput;
@@ -44,7 +43,7 @@
4443
* @see org.elasticsearch.client.Client#delete(DeleteRequest)
4544
* @see org.elasticsearch.client.Requests#deleteRequest(String)
4645
*/
47-
public class DeleteRequest extends ShardReplicationOperationRequest<DeleteRequest> implements DocumentRequest<DeleteRequest> {
46+
public class DeleteRequest extends ShardReplicationOperationRequest<DeleteRequest> {
4847

4948
private String type;
5049
private String id;

src/main/java/org/elasticsearch/action/index/IndexRequest.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import org.elasticsearch.action.ActionRequestValidationException;
2626
import org.elasticsearch.action.RoutingMissingException;
2727
import org.elasticsearch.action.TimestampParsingException;
28-
import org.elasticsearch.action.DocumentRequest;
2928
import org.elasticsearch.action.support.replication.ShardReplicationOperationRequest;
3029
import org.elasticsearch.client.Requests;
3130
import org.elasticsearch.cluster.metadata.MappingMetaData;
@@ -65,7 +64,7 @@
6564
* @see org.elasticsearch.client.Requests#indexRequest(String)
6665
* @see org.elasticsearch.client.Client#index(IndexRequest)
6766
*/
68-
public class IndexRequest extends ShardReplicationOperationRequest<IndexRequest> implements DocumentRequest<IndexRequest> {
67+
public class IndexRequest extends ShardReplicationOperationRequest<IndexRequest> {
6968

7069
/**
7170
* Operation type controls if the type of the index operation.

src/main/java/org/elasticsearch/action/update/UpdateRequest.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import com.google.common.collect.Maps;
2323
import org.elasticsearch.Version;
2424
import org.elasticsearch.action.ActionRequestValidationException;
25-
import org.elasticsearch.action.DocumentRequest;
2625
import org.elasticsearch.action.WriteConsistencyLevel;
2726
import org.elasticsearch.action.index.IndexRequest;
2827
import org.elasticsearch.action.support.replication.ReplicationType;
@@ -48,7 +47,7 @@
4847

4948
/**
5049
*/
51-
public class UpdateRequest extends InstanceShardOperationRequest<UpdateRequest> implements DocumentRequest<UpdateRequest> {
50+
public class UpdateRequest extends InstanceShardOperationRequest<UpdateRequest> {
5251

5352
private String type;
5453
private String id;

src/test/java/org/elasticsearch/document/BulkTests.java

Lines changed: 0 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import com.google.common.base.Charsets;
2323
import org.elasticsearch.action.admin.indices.alias.Alias;
2424
import org.elasticsearch.action.bulk.BulkItemResponse;
25-
import org.elasticsearch.action.bulk.BulkRequest;
2625
import org.elasticsearch.action.bulk.BulkRequestBuilder;
2726
import org.elasticsearch.action.bulk.BulkResponse;
2827
import org.elasticsearch.action.count.CountResponse;
@@ -652,31 +651,8 @@ public void testThatFailedUpdateRequestReturnsCorrectType() throws Exception {
652651
assertThat(bulkItemResponse.getItems()[5].getOpType(), is("delete"));
653652
}
654653

655-
656654
private static String indexOrAlias() {
657655
return randomBoolean() ? "test" : "alias";
658656
}
659-
660-
@Test // issue 6410
661-
public void testThatMissingIndexDoesNotAbortFullBulkRequest() throws Exception{
662-
createIndex("bulkindex1", "bulkindex2");
663-
BulkRequest bulkRequest = new BulkRequest();
664-
bulkRequest.add(new IndexRequest("bulkindex1", "index1_type", "1").source("text", "hallo1"))
665-
.add(new IndexRequest("bulkindex2", "index2_type", "1").source("text", "hallo2"))
666-
.add(new IndexRequest("bulkindex2", "index2_type").source("text", "hallo2"))
667-
.add(new UpdateRequest("bulkindex2", "index2_type", "2").doc("foo", "bar"))
668-
.add(new DeleteRequest("bulkindex2", "index2_type", "3"))
669-
.refresh(true);
670-
671-
client().bulk(bulkRequest).get();
672-
SearchResponse searchResponse = client().prepareSearch("bulkindex*").get();
673-
assertHitCount(searchResponse, 3);
674-
675-
assertAcked(client().admin().indices().prepareClose("bulkindex2"));
676-
677-
BulkResponse bulkResponse = client().bulk(bulkRequest).get();
678-
assertThat(bulkResponse.hasFailures(), is(true));
679-
assertThat(bulkResponse.getItems().length, is(5));
680-
}
681657
}
682658

0 commit comments

Comments
 (0)