Skip to content

Commit 871e333

Browse files
committed
Bulk api: fail deletes when routing is required but not specified
As part of elastic#10136 we removed the transport action for broadcast deletes in case routing is required but not specified. Bulk api worked differently though and kept on doing the broadcast delete internally in that case. This commit makes sure that delete items are marked as failed in such cases. Also the check has been moved up in the code together with the existing check for the update api, and we now make sure that the exception is the same as the one thrown for single document apis (delete/update). Note that the failure for the update api contained the wrong optype (the type of the document rather than "update"), that's been fixed too and tested. Closes elastic#16645
1 parent 4b20845 commit 871e333

File tree

5 files changed

+165
-88
lines changed

5 files changed

+165
-88
lines changed

core/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java

Lines changed: 53 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,12 @@
3131
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
3232
import org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction;
3333
import org.elasticsearch.action.delete.DeleteRequest;
34+
import org.elasticsearch.action.delete.TransportDeleteAction;
3435
import org.elasticsearch.action.index.IndexRequest;
3536
import org.elasticsearch.action.support.ActionFilters;
3637
import org.elasticsearch.action.support.AutoCreateIndex;
3738
import org.elasticsearch.action.support.HandledTransportAction;
39+
import org.elasticsearch.action.update.TransportUpdateAction;
3840
import org.elasticsearch.action.update.UpdateRequest;
3941
import org.elasticsearch.cluster.ClusterService;
4042
import org.elasticsearch.cluster.ClusterState;
@@ -43,8 +45,6 @@
4345
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
4446
import org.elasticsearch.cluster.metadata.MappingMetaData;
4547
import org.elasticsearch.cluster.metadata.MetaData;
46-
import org.elasticsearch.cluster.routing.GroupShardsIterator;
47-
import org.elasticsearch.cluster.routing.ShardIterator;
4848
import org.elasticsearch.common.inject.Inject;
4949
import org.elasticsearch.common.settings.Settings;
5050
import org.elasticsearch.common.util.concurrent.AtomicArray;
@@ -201,7 +201,7 @@ public void executeBulk(final BulkRequest bulkRequest, final ActionListener<Bulk
201201
executeBulk(bulkRequest, startTime, listener, new AtomicArray<BulkItemResponse>(bulkRequest.requests.size()));
202202
}
203203

204-
private final long buildTookInMillis(long startTime) {
204+
private long buildTookInMillis(long startTime) {
205205
// protect ourselves against time going backwards
206206
return Math.max(1, System.currentTimeMillis() - startTime);
207207
}
@@ -215,33 +215,52 @@ private void executeBulk(final BulkRequest bulkRequest, final long startTime, fi
215215
MetaData metaData = clusterState.metaData();
216216
for (int i = 0; i < bulkRequest.requests.size(); i++) {
217217
ActionRequest request = bulkRequest.requests.get(i);
218-
if (request instanceof DocumentRequest) {
219-
DocumentRequest req = (DocumentRequest) request;
220-
221-
if (addFailureIfIndexIsUnavailable(req, bulkRequest, responses, i, concreteIndices, metaData)) {
222-
continue;
218+
//the request can only be null because we set it to null in the previous step, so it gets ignored
219+
if (request == null) {
220+
continue;
221+
}
222+
DocumentRequest documentRequest = (DocumentRequest) request;
223+
if (addFailureIfIndexIsUnavailable(documentRequest, bulkRequest, responses, i, concreteIndices, metaData)) {
224+
continue;
225+
}
226+
String concreteIndex = concreteIndices.resolveIfAbsent(documentRequest);
227+
if (request instanceof IndexRequest) {
228+
IndexRequest indexRequest = (IndexRequest) request;
229+
MappingMetaData mappingMd = null;
230+
if (metaData.hasIndex(concreteIndex)) {
231+
mappingMd = metaData.index(concreteIndex).mappingOrDefault(indexRequest.type());
223232
}
224-
225-
String concreteIndex = concreteIndices.resolveIfAbsent(req);
226-
if (request instanceof IndexRequest) {
227-
IndexRequest indexRequest = (IndexRequest) request;
228-
MappingMetaData mappingMd = null;
229-
if (metaData.hasIndex(concreteIndex)) {
230-
mappingMd = metaData.index(concreteIndex).mappingOrDefault(indexRequest.type());
231-
}
232-
try {
233-
indexRequest.process(metaData, mappingMd, allowIdGeneration, concreteIndex);
234-
} catch (ElasticsearchParseException | RoutingMissingException e) {
235-
BulkItemResponse.Failure failure = new BulkItemResponse.Failure(concreteIndex, indexRequest.type(), indexRequest.id(), e);
236-
BulkItemResponse bulkItemResponse = new BulkItemResponse(i, "index", failure);
237-
responses.set(i, bulkItemResponse);
238-
// make sure the request gets never processed again
239-
bulkRequest.requests.set(i, null);
240-
}
241-
} else {
242-
concreteIndices.resolveIfAbsent(req);
243-
req.routing(clusterState.metaData().resolveIndexRouting(req.routing(), req.index()));
233+
try {
234+
indexRequest.process(metaData, mappingMd, allowIdGeneration, concreteIndex);
235+
} catch (ElasticsearchParseException | RoutingMissingException e) {
236+
BulkItemResponse.Failure failure = new BulkItemResponse.Failure(concreteIndex, indexRequest.type(), indexRequest.id(), e);
237+
BulkItemResponse bulkItemResponse = new BulkItemResponse(i, "index", failure);
238+
responses.set(i, bulkItemResponse);
239+
// make sure the request gets never processed again
240+
bulkRequest.requests.set(i, null);
244241
}
242+
} else if (request instanceof DeleteRequest) {
243+
try {
244+
TransportDeleteAction.resolveAndValidateRouting(metaData, concreteIndex, (DeleteRequest)request);
245+
} catch(RoutingMissingException e) {
246+
BulkItemResponse.Failure failure = new BulkItemResponse.Failure(concreteIndex, documentRequest.type(), documentRequest.id(), e);
247+
BulkItemResponse bulkItemResponse = new BulkItemResponse(i, "delete", failure);
248+
responses.set(i, bulkItemResponse);
249+
// make sure the request gets never processed again
250+
bulkRequest.requests.set(i, null);
251+
}
252+
} else if (request instanceof UpdateRequest) {
253+
try {
254+
TransportUpdateAction.resolveAndValidateRouting(metaData, concreteIndex, (UpdateRequest)request);
255+
} catch(RoutingMissingException e) {
256+
BulkItemResponse.Failure failure = new BulkItemResponse.Failure(concreteIndex, documentRequest.type(), documentRequest.id(), e);
257+
BulkItemResponse bulkItemResponse = new BulkItemResponse(i, "update", failure);
258+
responses.set(i, bulkItemResponse);
259+
// make sure the request gets never processed again
260+
bulkRequest.requests.set(i, null);
261+
}
262+
} else {
263+
throw new AssertionError("request type not supported: [" + request.getClass().getName() + "]");
245264
}
246265
}
247266

@@ -263,37 +282,16 @@ private void executeBulk(final BulkRequest bulkRequest, final long startTime, fi
263282
} else if (request instanceof DeleteRequest) {
264283
DeleteRequest deleteRequest = (DeleteRequest) request;
265284
String concreteIndex = concreteIndices.getConcreteIndex(deleteRequest.index());
266-
MappingMetaData mappingMd = clusterState.metaData().index(concreteIndex).mappingOrDefault(deleteRequest.type());
267-
if (mappingMd != null && mappingMd.routing().required() && deleteRequest.routing() == null) {
268-
// if routing is required, and no routing on the delete request, we need to broadcast it....
269-
GroupShardsIterator groupShards = clusterService.operationRouting().broadcastDeleteShards(clusterState, concreteIndex);
270-
for (ShardIterator shardIt : groupShards) {
271-
List<BulkItemRequest> list = requestsByShard.get(shardIt.shardId());
272-
if (list == null) {
273-
list = new ArrayList<>();
274-
requestsByShard.put(shardIt.shardId(), list);
275-
}
276-
list.add(new BulkItemRequest(i, new DeleteRequest(deleteRequest)));
277-
}
278-
} else {
279-
ShardId shardId = clusterService.operationRouting().indexShards(clusterState, concreteIndex, deleteRequest.type(), deleteRequest.id(), deleteRequest.routing()).shardId();
280-
List<BulkItemRequest> list = requestsByShard.get(shardId);
281-
if (list == null) {
282-
list = new ArrayList<>();
283-
requestsByShard.put(shardId, list);
284-
}
285-
list.add(new BulkItemRequest(i, request));
285+
ShardId shardId = clusterService.operationRouting().indexShards(clusterState, concreteIndex, deleteRequest.type(), deleteRequest.id(), deleteRequest.routing()).shardId();
286+
List<BulkItemRequest> list = requestsByShard.get(shardId);
287+
if (list == null) {
288+
list = new ArrayList<>();
289+
requestsByShard.put(shardId, list);
286290
}
291+
list.add(new BulkItemRequest(i, request));
287292
} else if (request instanceof UpdateRequest) {
288293
UpdateRequest updateRequest = (UpdateRequest) request;
289294
String concreteIndex = concreteIndices.getConcreteIndex(updateRequest.index());
290-
MappingMetaData mappingMd = clusterState.metaData().index(concreteIndex).mappingOrDefault(updateRequest.type());
291-
if (mappingMd != null && mappingMd.routing().required() && updateRequest.routing() == null) {
292-
BulkItemResponse.Failure failure = new BulkItemResponse.Failure(updateRequest.index(), updateRequest.type(),
293-
updateRequest.id(), new IllegalArgumentException("routing is required for this item"));
294-
responses.set(i, new BulkItemResponse(i, updateRequest.type(), failure));
295-
continue;
296-
}
297295
ShardId shardId = clusterService.operationRouting().indexShards(clusterState, concreteIndex, updateRequest.type(), updateRequest.id(), updateRequest.routing()).shardId();
298296
List<BulkItemRequest> list = requestsByShard.get(shardId);
299297
if (list == null) {

core/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -95,23 +95,28 @@ public void onFailure(Throwable e) {
9595

9696
@Override
9797
protected void resolveRequest(final MetaData metaData, String concreteIndex, DeleteRequest request) {
98+
resolveAndValidateRouting(metaData, concreteIndex, request);
99+
ShardId shardId = clusterService.operationRouting().shardId(clusterService.state(), concreteIndex, request.type(),
100+
request.id(), request.routing());
101+
request.setShardId(shardId);
102+
}
103+
104+
public static void resolveAndValidateRouting(final MetaData metaData, String concreteIndex, DeleteRequest request) {
98105
request.routing(metaData.resolveIndexRouting(request.routing(), request.index()));
99106
if (metaData.hasIndex(concreteIndex)) {
100-
// check if routing is required, if so, do a broadcast delete
107+
// check if routing is required, if so, throw error if routing wasn't specified
101108
MappingMetaData mappingMd = metaData.index(concreteIndex).mappingOrDefault(request.type());
102109
if (mappingMd != null && mappingMd.routing().required()) {
103110
if (request.routing() == null) {
104111
if (request.versionType() != VersionType.INTERNAL) {
105112
// TODO: implement this feature
106113
throw new IllegalArgumentException("routing value is required for deleting documents of type [" + request.type()
107-
+ "] while using version_type [" + request.versionType() + "]");
114+
+ "] while using version_type [" + request.versionType() + "]");
108115
}
109116
throw new RoutingMissingException(concreteIndex, request.type(), request.id());
110117
}
111118
}
112119
}
113-
ShardId shardId = clusterService.operationRouting().shardId(clusterService.state(), concreteIndex, request.type(), request.id(), request.routing());
114-
request.setShardId(shardId);
115120
}
116121

117122
private void innerExecute(final DeleteRequest request, final ActionListener<DeleteResponse> listener) {

core/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.elasticsearch.cluster.ClusterService;
4040
import org.elasticsearch.cluster.ClusterState;
4141
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
42+
import org.elasticsearch.cluster.metadata.MetaData;
4243
import org.elasticsearch.cluster.routing.PlainShardIterator;
4344
import org.elasticsearch.cluster.routing.ShardIterator;
4445
import org.elasticsearch.cluster.routing.ShardRouting;
@@ -102,12 +103,16 @@ protected boolean retryOnFailure(Throwable e) {
102103

103104
@Override
104105
protected boolean resolveRequest(ClusterState state, UpdateRequest request, ActionListener<UpdateResponse> listener) {
105-
request.routing((state.metaData().resolveIndexRouting(request.routing(), request.index())));
106+
resolveAndValidateRouting(state.metaData(), request.concreteIndex(), request);
107+
return true;
108+
}
109+
110+
public static void resolveAndValidateRouting(MetaData metaData, String concreteIndex, UpdateRequest request) {
111+
request.routing((metaData.resolveIndexRouting(request.routing(), request.index())));
106112
// Fail fast on the node that received the request, rather than failing when translating on the index or delete request.
107-
if (request.routing() == null && state.getMetaData().routingRequired(request.concreteIndex(), request.type())) {
108-
throw new RoutingMissingException(request.concreteIndex(), request.type(), request.id());
113+
if (request.routing() == null && metaData.routingRequired(concreteIndex, request.type())) {
114+
throw new RoutingMissingException(concreteIndex, request.type(), request.id());
109115
}
110-
return true;
111116
}
112117

113118
@Override

core/src/main/java/org/elasticsearch/cluster/routing/OperationRouting.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -69,10 +69,6 @@ public ShardIterator getShards(ClusterState clusterState, String index, int shar
6969
return preferenceActiveShardIterator(indexShard, clusterState.nodes().localNodeId(), clusterState.nodes(), preference);
7070
}
7171

72-
public GroupShardsIterator broadcastDeleteShards(ClusterState clusterState, String index) {
73-
return indexRoutingTable(clusterState, index).groupByShardsIt();
74-
}
75-
7672
public int searchShardsCount(ClusterState clusterState, String[] concreteIndices, @Nullable Map<String, Set<String>> routing) {
7773
final Set<IndexShardRoutingTable> shards = computeTargetedShards(clusterState, concreteIndices, routing);
7874
return shards.size();

0 commit comments

Comments
 (0)