Skip to content

Commit 2b2bd8f

Browse files
ignore 409 conflict in reindex responses (#39543)
The reindex family of APIs (reindex, update-by-query, delete-by-query) can sometimes return responses that have an error status code (409 Conflict in this case) but still have a body in the usual BulkByScrollResponse format. When the HLRC tries to handle such responses, it blows up because it tris to parse it expecting the error format that errors in general use. This change prompts the HLRC to parse the response using the expected BulkByScrollResponse format.
1 parent 8c2beca commit 2b2bd8f

File tree

3 files changed

+365
-249
lines changed

3 files changed

+365
-249
lines changed

client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -511,7 +511,7 @@ public final void bulkAsync(BulkRequest bulkRequest, RequestOptions options, Act
511511
*/
512512
public final BulkByScrollResponse reindex(ReindexRequest reindexRequest, RequestOptions options) throws IOException {
513513
return performRequestAndParseEntity(
514-
reindexRequest, RequestConverters::reindex, options, BulkByScrollResponse::fromXContent, emptySet()
514+
reindexRequest, RequestConverters::reindex, options, BulkByScrollResponse::fromXContent, singleton(409)
515515
);
516516
}
517517

@@ -537,7 +537,7 @@ public final TaskSubmissionResponse submitReindexTask(ReindexRequest reindexRequ
537537
*/
538538
public final void reindexAsync(ReindexRequest reindexRequest, RequestOptions options, ActionListener<BulkByScrollResponse> listener) {
539539
performRequestAsyncAndParseEntity(
540-
reindexRequest, RequestConverters::reindex, options, BulkByScrollResponse::fromXContent, listener, emptySet()
540+
reindexRequest, RequestConverters::reindex, options, BulkByScrollResponse::fromXContent, listener, singleton(409)
541541
);
542542
}
543543

@@ -551,7 +551,7 @@ public final void reindexAsync(ReindexRequest reindexRequest, RequestOptions opt
551551
*/
552552
public final BulkByScrollResponse updateByQuery(UpdateByQueryRequest updateByQueryRequest, RequestOptions options) throws IOException {
553553
return performRequestAndParseEntity(
554-
updateByQueryRequest, RequestConverters::updateByQuery, options, BulkByScrollResponse::fromXContent, emptySet()
554+
updateByQueryRequest, RequestConverters::updateByQuery, options, BulkByScrollResponse::fromXContent, singleton(409)
555555
);
556556
}
557557

@@ -566,7 +566,7 @@ public final BulkByScrollResponse updateByQuery(UpdateByQueryRequest updateByQue
566566
public final void updateByQueryAsync(UpdateByQueryRequest updateByQueryRequest, RequestOptions options,
567567
ActionListener<BulkByScrollResponse> listener) {
568568
performRequestAsyncAndParseEntity(
569-
updateByQueryRequest, RequestConverters::updateByQuery, options, BulkByScrollResponse::fromXContent, listener, emptySet()
569+
updateByQueryRequest, RequestConverters::updateByQuery, options, BulkByScrollResponse::fromXContent, listener, singleton(409)
570570
);
571571
}
572572

@@ -580,7 +580,7 @@ public final void updateByQueryAsync(UpdateByQueryRequest updateByQueryRequest,
580580
*/
581581
public final BulkByScrollResponse deleteByQuery(DeleteByQueryRequest deleteByQueryRequest, RequestOptions options) throws IOException {
582582
return performRequestAndParseEntity(
583-
deleteByQueryRequest, RequestConverters::deleteByQuery, options, BulkByScrollResponse::fromXContent, emptySet()
583+
deleteByQueryRequest, RequestConverters::deleteByQuery, options, BulkByScrollResponse::fromXContent, singleton(409)
584584
);
585585
}
586586

@@ -595,7 +595,7 @@ public final BulkByScrollResponse deleteByQuery(DeleteByQueryRequest deleteByQue
595595
public final void deleteByQueryAsync(DeleteByQueryRequest deleteByQueryRequest, RequestOptions options,
596596
ActionListener<BulkByScrollResponse> listener) {
597597
performRequestAsyncAndParseEntity(
598-
deleteByQueryRequest, RequestConverters::deleteByQuery, options, BulkByScrollResponse::fromXContent, listener, emptySet()
598+
deleteByQueryRequest, RequestConverters::deleteByQuery, options, BulkByScrollResponse::fromXContent, listener, singleton(409)
599599
);
600600
}
601601

client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java

Lines changed: 0 additions & 243 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,8 @@
2121

2222
import org.elasticsearch.ElasticsearchException;
2323
import org.elasticsearch.ElasticsearchStatusException;
24-
import org.elasticsearch.action.ActionListener;
2524
import org.elasticsearch.action.DocWriteRequest;
2625
import org.elasticsearch.action.DocWriteResponse;
27-
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
28-
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
29-
import org.elasticsearch.action.admin.cluster.node.tasks.list.TaskGroup;
3026
import org.elasticsearch.action.bulk.BulkItemResponse;
3127
import org.elasticsearch.action.bulk.BulkProcessor;
3228
import org.elasticsearch.action.bulk.BulkRequest;
@@ -39,7 +35,6 @@
3935
import org.elasticsearch.action.get.MultiGetResponse;
4036
import org.elasticsearch.action.index.IndexRequest;
4137
import org.elasticsearch.action.index.IndexResponse;
42-
import org.elasticsearch.action.search.SearchRequest;
4338
import org.elasticsearch.action.support.WriteRequest.RefreshPolicy;
4439
import org.elasticsearch.action.update.UpdateRequest;
4540
import org.elasticsearch.action.update.UpdateResponse;
@@ -58,12 +53,6 @@
5853
import org.elasticsearch.common.xcontent.XContentType;
5954
import org.elasticsearch.index.VersionType;
6055
import org.elasticsearch.index.get.GetResult;
61-
import org.elasticsearch.index.query.IdsQueryBuilder;
62-
import org.elasticsearch.index.reindex.BulkByScrollResponse;
63-
import org.elasticsearch.index.reindex.DeleteByQueryAction;
64-
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
65-
import org.elasticsearch.index.reindex.UpdateByQueryAction;
66-
import org.elasticsearch.index.reindex.UpdateByQueryRequest;
6756
import org.elasticsearch.rest.RestStatus;
6857
import org.elasticsearch.rest.action.document.RestBulkAction;
6958
import org.elasticsearch.rest.action.document.RestDeleteAction;
@@ -74,8 +63,6 @@
7463
import org.elasticsearch.script.Script;
7564
import org.elasticsearch.script.ScriptType;
7665
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
77-
import org.elasticsearch.tasks.RawTaskStatus;
78-
import org.elasticsearch.tasks.TaskId;
7966
import org.joda.time.DateTime;
8067
import org.joda.time.DateTimeZone;
8168
import org.joda.time.format.DateTimeFormat;
@@ -85,18 +72,12 @@
8572
import java.util.Collections;
8673
import java.util.List;
8774
import java.util.Map;
88-
import java.util.concurrent.CountDownLatch;
89-
import java.util.concurrent.TimeUnit;
9075
import java.util.concurrent.atomic.AtomicReference;
9176

9277
import static java.util.Collections.singletonMap;
9378
import static org.hamcrest.Matchers.containsString;
94-
import static org.hamcrest.Matchers.empty;
9579
import static org.hamcrest.Matchers.equalTo;
9680
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
97-
import static org.hamcrest.Matchers.hasSize;
98-
import static org.hamcrest.Matchers.instanceOf;
99-
import static org.hamcrest.Matchers.lessThan;
10081

10182
public class CrudIT extends ESRestHighLevelClientTestCase {
10283

@@ -857,230 +838,6 @@ public void testBulk() throws IOException {
857838
validateBulkResponses(nbItems, errors, bulkResponse, bulkRequest);
858839
}
859840

860-
private TaskId findTaskToRethrottle(String actionName) throws IOException {
861-
long start = System.nanoTime();
862-
ListTasksRequest request = new ListTasksRequest();
863-
request.setActions(actionName);
864-
request.setDetailed(true);
865-
do {
866-
ListTasksResponse list = highLevelClient().tasks().list(request, RequestOptions.DEFAULT);
867-
list.rethrowFailures("Finding tasks to rethrottle");
868-
assertThat("tasks are left over from the last execution of this test",
869-
list.getTaskGroups(), hasSize(lessThan(2)));
870-
if (0 == list.getTaskGroups().size()) {
871-
// The parent task hasn't started yet
872-
continue;
873-
}
874-
TaskGroup taskGroup = list.getTaskGroups().get(0);
875-
assertThat(taskGroup.getChildTasks(), empty());
876-
return taskGroup.getTaskInfo().getTaskId();
877-
} while (System.nanoTime() - start < TimeUnit.SECONDS.toNanos(10));
878-
throw new AssertionError("Couldn't find tasks to rethrottle. Here are the running tasks " +
879-
highLevelClient().tasks().list(request, RequestOptions.DEFAULT));
880-
}
881-
882-
public void testUpdateByQuery() throws Exception {
883-
final String sourceIndex = "source1";
884-
{
885-
// Prepare
886-
Settings settings = Settings.builder()
887-
.put("number_of_shards", 1)
888-
.put("number_of_replicas", 0)
889-
.build();
890-
createIndex(sourceIndex, settings);
891-
assertEquals(
892-
RestStatus.OK,
893-
highLevelClient().bulk(
894-
new BulkRequest()
895-
.add(new IndexRequest(sourceIndex).id("1")
896-
.source(Collections.singletonMap("foo", 1), XContentType.JSON))
897-
.add(new IndexRequest(sourceIndex).id("2")
898-
.source(Collections.singletonMap("foo", 2), XContentType.JSON))
899-
.setRefreshPolicy(RefreshPolicy.IMMEDIATE),
900-
RequestOptions.DEFAULT
901-
).status()
902-
);
903-
}
904-
{
905-
// test1: create one doc in dest
906-
UpdateByQueryRequest updateByQueryRequest = new UpdateByQueryRequest();
907-
updateByQueryRequest.indices(sourceIndex);
908-
updateByQueryRequest.setQuery(new IdsQueryBuilder().addIds("1"));
909-
updateByQueryRequest.setRefresh(true);
910-
BulkByScrollResponse bulkResponse =
911-
execute(updateByQueryRequest, highLevelClient()::updateByQuery, highLevelClient()::updateByQueryAsync);
912-
assertEquals(1, bulkResponse.getTotal());
913-
assertEquals(1, bulkResponse.getUpdated());
914-
assertEquals(0, bulkResponse.getNoops());
915-
assertEquals(0, bulkResponse.getVersionConflicts());
916-
assertEquals(1, bulkResponse.getBatches());
917-
assertTrue(bulkResponse.getTook().getMillis() > 0);
918-
assertEquals(1, bulkResponse.getBatches());
919-
assertEquals(0, bulkResponse.getBulkFailures().size());
920-
assertEquals(0, bulkResponse.getSearchFailures().size());
921-
}
922-
{
923-
// test2: update using script
924-
UpdateByQueryRequest updateByQueryRequest = new UpdateByQueryRequest();
925-
updateByQueryRequest.indices(sourceIndex);
926-
updateByQueryRequest.setScript(new Script("if (ctx._source.foo == 2) ctx._source.foo++;"));
927-
updateByQueryRequest.setRefresh(true);
928-
BulkByScrollResponse bulkResponse =
929-
execute(updateByQueryRequest, highLevelClient()::updateByQuery, highLevelClient()::updateByQueryAsync);
930-
assertEquals(2, bulkResponse.getTotal());
931-
assertEquals(2, bulkResponse.getUpdated());
932-
assertEquals(0, bulkResponse.getDeleted());
933-
assertEquals(0, bulkResponse.getNoops());
934-
assertEquals(0, bulkResponse.getVersionConflicts());
935-
assertEquals(1, bulkResponse.getBatches());
936-
assertTrue(bulkResponse.getTook().getMillis() > 0);
937-
assertEquals(1, bulkResponse.getBatches());
938-
assertEquals(0, bulkResponse.getBulkFailures().size());
939-
assertEquals(0, bulkResponse.getSearchFailures().size());
940-
assertEquals(
941-
3,
942-
(int) (highLevelClient().get(new GetRequest(sourceIndex, "2"), RequestOptions.DEFAULT)
943-
.getSourceAsMap().get("foo"))
944-
);
945-
}
946-
{
947-
// test update-by-query rethrottling
948-
UpdateByQueryRequest updateByQueryRequest = new UpdateByQueryRequest();
949-
updateByQueryRequest.indices(sourceIndex);
950-
updateByQueryRequest.setQuery(new IdsQueryBuilder().addIds("1"));
951-
updateByQueryRequest.setRefresh(true);
952-
953-
// this following settings are supposed to halt reindexing after first document
954-
updateByQueryRequest.setBatchSize(1);
955-
updateByQueryRequest.setRequestsPerSecond(0.00001f);
956-
final CountDownLatch taskFinished = new CountDownLatch(1);
957-
highLevelClient().updateByQueryAsync(updateByQueryRequest, RequestOptions.DEFAULT, new ActionListener<BulkByScrollResponse>() {
958-
959-
@Override
960-
public void onResponse(BulkByScrollResponse response) {
961-
taskFinished.countDown();
962-
}
963-
964-
@Override
965-
public void onFailure(Exception e) {
966-
fail(e.toString());
967-
}
968-
});
969-
970-
TaskId taskIdToRethrottle = findTaskToRethrottle(UpdateByQueryAction.NAME);
971-
float requestsPerSecond = 1000f;
972-
ListTasksResponse response = execute(new RethrottleRequest(taskIdToRethrottle, requestsPerSecond),
973-
highLevelClient()::updateByQueryRethrottle, highLevelClient()::updateByQueryRethrottleAsync);
974-
assertThat(response.getTasks(), hasSize(1));
975-
assertEquals(taskIdToRethrottle, response.getTasks().get(0).getTaskId());
976-
assertThat(response.getTasks().get(0).getStatus(), instanceOf(RawTaskStatus.class));
977-
assertEquals(Float.toString(requestsPerSecond),
978-
((RawTaskStatus) response.getTasks().get(0).getStatus()).toMap().get("requests_per_second").toString());
979-
taskFinished.await(2, TimeUnit.SECONDS);
980-
981-
// any rethrottling after the update-by-query is done performed with the same taskId should result in a failure
982-
response = execute(new RethrottleRequest(taskIdToRethrottle, requestsPerSecond),
983-
highLevelClient()::updateByQueryRethrottle, highLevelClient()::updateByQueryRethrottleAsync);
984-
assertTrue(response.getTasks().isEmpty());
985-
assertFalse(response.getNodeFailures().isEmpty());
986-
assertEquals(1, response.getNodeFailures().size());
987-
assertEquals("Elasticsearch exception [type=resource_not_found_exception, reason=task [" + taskIdToRethrottle + "] is missing]",
988-
response.getNodeFailures().get(0).getCause().getMessage());
989-
}
990-
}
991-
992-
public void testDeleteByQuery() throws Exception {
993-
final String sourceIndex = "source1";
994-
{
995-
// Prepare
996-
Settings settings = Settings.builder()
997-
.put("number_of_shards", 1)
998-
.put("number_of_replicas", 0)
999-
.build();
1000-
createIndex(sourceIndex, settings);
1001-
assertEquals(
1002-
RestStatus.OK,
1003-
highLevelClient().bulk(
1004-
new BulkRequest()
1005-
.add(new IndexRequest(sourceIndex).id("1")
1006-
.source(Collections.singletonMap("foo", 1), XContentType.JSON))
1007-
.add(new IndexRequest(sourceIndex).id("2")
1008-
.source(Collections.singletonMap("foo", 2), XContentType.JSON))
1009-
.add(new IndexRequest(sourceIndex).id("3")
1010-
.source(Collections.singletonMap("foo", 3), XContentType.JSON))
1011-
.setRefreshPolicy(RefreshPolicy.IMMEDIATE),
1012-
RequestOptions.DEFAULT
1013-
).status()
1014-
);
1015-
}
1016-
{
1017-
// test1: delete one doc
1018-
DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest();
1019-
deleteByQueryRequest.indices(sourceIndex);
1020-
deleteByQueryRequest.setQuery(new IdsQueryBuilder().addIds("1"));
1021-
deleteByQueryRequest.setRefresh(true);
1022-
BulkByScrollResponse bulkResponse =
1023-
execute(deleteByQueryRequest, highLevelClient()::deleteByQuery, highLevelClient()::deleteByQueryAsync);
1024-
assertEquals(1, bulkResponse.getTotal());
1025-
assertEquals(1, bulkResponse.getDeleted());
1026-
assertEquals(0, bulkResponse.getNoops());
1027-
assertEquals(0, bulkResponse.getVersionConflicts());
1028-
assertEquals(1, bulkResponse.getBatches());
1029-
assertTrue(bulkResponse.getTook().getMillis() > 0);
1030-
assertEquals(1, bulkResponse.getBatches());
1031-
assertEquals(0, bulkResponse.getBulkFailures().size());
1032-
assertEquals(0, bulkResponse.getSearchFailures().size());
1033-
assertEquals(
1034-
2,
1035-
highLevelClient().search(new SearchRequest(sourceIndex), RequestOptions.DEFAULT).getHits().getTotalHits().value
1036-
);
1037-
}
1038-
{
1039-
// test delete-by-query rethrottling
1040-
DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest();
1041-
deleteByQueryRequest.indices(sourceIndex);
1042-
deleteByQueryRequest.setQuery(new IdsQueryBuilder().addIds("2", "3"));
1043-
deleteByQueryRequest.setRefresh(true);
1044-
1045-
// this following settings are supposed to halt reindexing after first document
1046-
deleteByQueryRequest.setBatchSize(1);
1047-
deleteByQueryRequest.setRequestsPerSecond(0.00001f);
1048-
final CountDownLatch taskFinished = new CountDownLatch(1);
1049-
highLevelClient().deleteByQueryAsync(deleteByQueryRequest, RequestOptions.DEFAULT, new ActionListener<BulkByScrollResponse>() {
1050-
1051-
@Override
1052-
public void onResponse(BulkByScrollResponse response) {
1053-
taskFinished.countDown();
1054-
}
1055-
1056-
@Override
1057-
public void onFailure(Exception e) {
1058-
fail(e.toString());
1059-
}
1060-
});
1061-
1062-
TaskId taskIdToRethrottle = findTaskToRethrottle(DeleteByQueryAction.NAME);
1063-
float requestsPerSecond = 1000f;
1064-
ListTasksResponse response = execute(new RethrottleRequest(taskIdToRethrottle, requestsPerSecond),
1065-
highLevelClient()::deleteByQueryRethrottle, highLevelClient()::deleteByQueryRethrottleAsync);
1066-
assertThat(response.getTasks(), hasSize(1));
1067-
assertEquals(taskIdToRethrottle, response.getTasks().get(0).getTaskId());
1068-
assertThat(response.getTasks().get(0).getStatus(), instanceOf(RawTaskStatus.class));
1069-
assertEquals(Float.toString(requestsPerSecond),
1070-
((RawTaskStatus) response.getTasks().get(0).getStatus()).toMap().get("requests_per_second").toString());
1071-
taskFinished.await(2, TimeUnit.SECONDS);
1072-
1073-
// any rethrottling after the delete-by-query is done performed with the same taskId should result in a failure
1074-
response = execute(new RethrottleRequest(taskIdToRethrottle, requestsPerSecond),
1075-
highLevelClient()::deleteByQueryRethrottle, highLevelClient()::deleteByQueryRethrottleAsync);
1076-
assertTrue(response.getTasks().isEmpty());
1077-
assertFalse(response.getNodeFailures().isEmpty());
1078-
assertEquals(1, response.getNodeFailures().size());
1079-
assertEquals("Elasticsearch exception [type=resource_not_found_exception, reason=task [" + taskIdToRethrottle + "] is missing]",
1080-
response.getNodeFailures().get(0).getCause().getMessage());
1081-
}
1082-
}
1083-
1084841
public void testBulkProcessorIntegration() throws IOException {
1085842
int nbItems = randomIntBetween(10, 100);
1086843
boolean[] errors = new boolean[nbItems];

0 commit comments

Comments
 (0)