Skip to content

Commit b59a08c

Browse files
Christoph Büscherkcm
Christoph Büscher
authored andcommitted
HLRC: Add throttling for update & delete-by-query (#33951)
This change adds throttling to the update-by-query and delete-by-query cases similar to throttling for reindex. This mostly means additional methods on the client class itself, since the request hits the same RestHandler, just with slightly different endpoints, and also the return values are similar.
1 parent 42fb124 commit b59a08c

File tree

10 files changed

+288
-47
lines changed

10 files changed

+288
-47
lines changed

client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -530,8 +530,20 @@ static Request deleteByQuery(DeleteByQueryRequest deleteByQueryRequest) throws I
530530
return request;
531531
}
532532

533-
static Request rethrottle(RethrottleRequest rethrottleRequest) throws IOException {
534-
String endpoint = new EndpointBuilder().addPathPart("_reindex").addPathPart(rethrottleRequest.getTaskId().toString())
533+
static Request rethrottleReindex(RethrottleRequest rethrottleRequest) {
534+
return rethrottle(rethrottleRequest, "_reindex");
535+
}
536+
537+
static Request rethrottleUpdateByQuery(RethrottleRequest rethrottleRequest) {
538+
return rethrottle(rethrottleRequest, "_update_by_query");
539+
}
540+
541+
static Request rethrottleDeleteByQuery(RethrottleRequest rethrottleRequest) {
542+
return rethrottle(rethrottleRequest, "_delete_by_query");
543+
}
544+
545+
private static Request rethrottle(RethrottleRequest rethrottleRequest, String firstPathPart) {
546+
String endpoint = new EndpointBuilder().addPathPart(firstPathPart).addPathPart(rethrottleRequest.getTaskId().toString())
535547
.addPathPart("_rethrottle").build();
536548
Request request = new Request(HttpPost.METHOD_NAME, endpoint);
537549
Params params = new Params(request)

client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java

Lines changed: 61 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -516,6 +516,62 @@ public final void deleteByQueryAsync(DeleteByQueryRequest deleteByQueryRequest,
516516
);
517517
}
518518

519+
/**
520+
* Executes a delete by query rethrottle request.
521+
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-delete-by-query.html">
522+
* Delete By Query API on elastic.co</a>
523+
* @param rethrottleRequest the request
524+
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
525+
* @return the response
526+
* @throws IOException in case there is a problem sending the request or parsing back the response
527+
*/
528+
public final ListTasksResponse deleteByQueryRethrottle(RethrottleRequest rethrottleRequest, RequestOptions options) throws IOException {
529+
return performRequestAndParseEntity(rethrottleRequest, RequestConverters::rethrottleDeleteByQuery, options,
530+
ListTasksResponse::fromXContent, emptySet());
531+
}
532+
533+
/**
534+
* Asynchronously execute an delete by query rethrottle request.
535+
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-delete-by-query.html">
536+
* Delete By Query API on elastic.co</a>
537+
* @param rethrottleRequest the request
538+
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
539+
* @param listener the listener to be notified upon request completion
540+
*/
541+
public final void deleteByQueryRethrottleAsync(RethrottleRequest rethrottleRequest, RequestOptions options,
542+
ActionListener<ListTasksResponse> listener) {
543+
performRequestAsyncAndParseEntity(rethrottleRequest, RequestConverters::rethrottleDeleteByQuery, options,
544+
ListTasksResponse::fromXContent, listener, emptySet());
545+
}
546+
547+
/**
548+
* Executes a update by query rethrottle request.
549+
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-update-by-query.html">
550+
* Update By Query API on elastic.co</a>
551+
* @param rethrottleRequest the request
552+
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
553+
* @return the response
554+
* @throws IOException in case there is a problem sending the request or parsing back the response
555+
*/
556+
public final ListTasksResponse updateByQueryRethrottle(RethrottleRequest rethrottleRequest, RequestOptions options) throws IOException {
557+
return performRequestAndParseEntity(rethrottleRequest, RequestConverters::rethrottleUpdateByQuery, options,
558+
ListTasksResponse::fromXContent, emptySet());
559+
}
560+
561+
/**
562+
* Asynchronously execute an update by query rethrottle request.
563+
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-update-by-query.html">
564+
* Update By Query API on elastic.co</a>
565+
* @param rethrottleRequest the request
566+
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
567+
* @param listener the listener to be notified upon request completion
568+
*/
569+
public final void updateByQueryRethrottleAsync(RethrottleRequest rethrottleRequest, RequestOptions options,
570+
ActionListener<ListTasksResponse> listener) {
571+
performRequestAsyncAndParseEntity(rethrottleRequest, RequestConverters::rethrottleUpdateByQuery, options,
572+
ListTasksResponse::fromXContent, listener, emptySet());
573+
}
574+
519575
/**
520576
* Executes a reindex rethrottling request.
521577
* See the <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-reindex.html#docs-reindex-rethrottle">
@@ -527,8 +583,8 @@ public final void deleteByQueryAsync(DeleteByQueryRequest deleteByQueryRequest,
527583
* @throws IOException in case there is a problem sending the request or parsing back the response
528584
*/
529585
public final ListTasksResponse reindexRethrottle(RethrottleRequest rethrottleRequest, RequestOptions options) throws IOException {
530-
return performRequestAndParseEntity(rethrottleRequest, RequestConverters::rethrottle, options, ListTasksResponse::fromXContent,
531-
emptySet());
586+
return performRequestAndParseEntity(rethrottleRequest, RequestConverters::rethrottleReindex, options,
587+
ListTasksResponse::fromXContent, emptySet());
532588
}
533589

534590
/**
@@ -541,9 +597,9 @@ public final ListTasksResponse reindexRethrottle(RethrottleRequest rethrottleReq
541597
* @param listener the listener to be notified upon request completion
542598
*/
543599
public final void reindexRethrottleAsync(RethrottleRequest rethrottleRequest, RequestOptions options,
544-
ActionListener<ListTasksResponse> listener) {
545-
performRequestAsyncAndParseEntity(rethrottleRequest, RequestConverters::rethrottle, options, ListTasksResponse::fromXContent,
546-
listener, emptySet());
600+
ActionListener<ListTasksResponse> listener) {
601+
performRequestAsyncAndParseEntity(rethrottleRequest, RequestConverters::rethrottleReindex, options, ListTasksResponse::fromXContent,
602+
listener, emptySet());
547603
}
548604

549605
/**

client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java

Lines changed: 101 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -55,9 +55,11 @@
5555
import org.elasticsearch.index.get.GetResult;
5656
import org.elasticsearch.index.query.IdsQueryBuilder;
5757
import org.elasticsearch.index.reindex.BulkByScrollResponse;
58+
import org.elasticsearch.index.reindex.DeleteByQueryAction;
5859
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
5960
import org.elasticsearch.index.reindex.ReindexAction;
6061
import org.elasticsearch.index.reindex.ReindexRequest;
62+
import org.elasticsearch.index.reindex.UpdateByQueryAction;
6163
import org.elasticsearch.index.reindex.UpdateByQueryRequest;
6264
import org.elasticsearch.rest.RestStatus;
6365
import org.elasticsearch.script.Script;
@@ -727,10 +729,7 @@ public void onFailure(Exception e) {
727729
}
728730
});
729731

730-
TaskGroup taskGroupToRethrottle = findTaskToRethrottle();
731-
assertThat(taskGroupToRethrottle.getChildTasks(), empty());
732-
TaskId taskIdToRethrottle = taskGroupToRethrottle.getTaskInfo().getTaskId();
733-
732+
TaskId taskIdToRethrottle = findTaskToRethrottle(ReindexAction.NAME);
734733
float requestsPerSecond = 1000f;
735734
ListTasksResponse response = execute(new RethrottleRequest(taskIdToRethrottle, requestsPerSecond),
736735
highLevelClient()::reindexRethrottle, highLevelClient()::reindexRethrottleAsync);
@@ -752,10 +751,10 @@ public void onFailure(Exception e) {
752751
}
753752
}
754753

755-
private TaskGroup findTaskToRethrottle() throws IOException {
754+
private TaskId findTaskToRethrottle(String actionName) throws IOException {
756755
long start = System.nanoTime();
757756
ListTasksRequest request = new ListTasksRequest();
758-
request.setActions(ReindexAction.NAME);
757+
request.setActions(actionName);
759758
request.setDetailed(true);
760759
do {
761760
ListTasksResponse list = highLevelClient().tasks().list(request, RequestOptions.DEFAULT);
@@ -766,13 +765,15 @@ private TaskGroup findTaskToRethrottle() throws IOException {
766765
// The parent task hasn't started yet
767766
continue;
768767
}
769-
return list.getTaskGroups().get(0);
768+
TaskGroup taskGroup = list.getTaskGroups().get(0);
769+
assertThat(taskGroup.getChildTasks(), empty());
770+
return taskGroup.getTaskInfo().getTaskId();
770771
} while (System.nanoTime() - start < TimeUnit.SECONDS.toNanos(10));
771772
throw new AssertionError("Couldn't find tasks to rethrottle. Here are the running tasks " +
772773
highLevelClient().tasks().list(request, RequestOptions.DEFAULT));
773774
}
774775

775-
public void testUpdateByQuery() throws IOException {
776+
public void testUpdateByQuery() throws Exception {
776777
final String sourceIndex = "source1";
777778
{
778779
// Prepare
@@ -836,9 +837,53 @@ public void testUpdateByQuery() throws IOException {
836837
.getSourceAsMap().get("foo"))
837838
);
838839
}
840+
{
841+
// test update-by-query rethrottling
842+
UpdateByQueryRequest updateByQueryRequest = new UpdateByQueryRequest();
843+
updateByQueryRequest.indices(sourceIndex);
844+
updateByQueryRequest.setQuery(new IdsQueryBuilder().addIds("1").types("type"));
845+
updateByQueryRequest.setRefresh(true);
846+
847+
// this following settings are supposed to halt reindexing after first document
848+
updateByQueryRequest.setBatchSize(1);
849+
updateByQueryRequest.setRequestsPerSecond(0.00001f);
850+
final CountDownLatch taskFinished = new CountDownLatch(1);
851+
highLevelClient().updateByQueryAsync(updateByQueryRequest, RequestOptions.DEFAULT, new ActionListener<BulkByScrollResponse>() {
852+
853+
@Override
854+
public void onResponse(BulkByScrollResponse response) {
855+
taskFinished.countDown();
856+
}
857+
858+
@Override
859+
public void onFailure(Exception e) {
860+
fail(e.toString());
861+
}
862+
});
863+
864+
TaskId taskIdToRethrottle = findTaskToRethrottle(UpdateByQueryAction.NAME);
865+
float requestsPerSecond = 1000f;
866+
ListTasksResponse response = execute(new RethrottleRequest(taskIdToRethrottle, requestsPerSecond),
867+
highLevelClient()::updateByQueryRethrottle, highLevelClient()::updateByQueryRethrottleAsync);
868+
assertThat(response.getTasks(), hasSize(1));
869+
assertEquals(taskIdToRethrottle, response.getTasks().get(0).getTaskId());
870+
assertThat(response.getTasks().get(0).getStatus(), instanceOf(RawTaskStatus.class));
871+
assertEquals(Float.toString(requestsPerSecond),
872+
((RawTaskStatus) response.getTasks().get(0).getStatus()).toMap().get("requests_per_second").toString());
873+
taskFinished.await(2, TimeUnit.SECONDS);
874+
875+
// any rethrottling after the update-by-query is done performed with the same taskId should result in a failure
876+
response = execute(new RethrottleRequest(taskIdToRethrottle, requestsPerSecond),
877+
highLevelClient()::updateByQueryRethrottle, highLevelClient()::updateByQueryRethrottleAsync);
878+
assertTrue(response.getTasks().isEmpty());
879+
assertFalse(response.getNodeFailures().isEmpty());
880+
assertEquals(1, response.getNodeFailures().size());
881+
assertEquals("Elasticsearch exception [type=resource_not_found_exception, reason=task [" + taskIdToRethrottle + "] is missing]",
882+
response.getNodeFailures().get(0).getCause().getMessage());
883+
}
839884
}
840885

841-
public void testDeleteByQuery() throws IOException {
886+
public void testDeleteByQuery() throws Exception {
842887
final String sourceIndex = "source1";
843888
{
844889
// Prepare
@@ -855,6 +900,8 @@ public void testDeleteByQuery() throws IOException {
855900
.source(Collections.singletonMap("foo", 1), XContentType.JSON))
856901
.add(new IndexRequest(sourceIndex, "type", "2")
857902
.source(Collections.singletonMap("foo", 2), XContentType.JSON))
903+
.add(new IndexRequest(sourceIndex, "type", "3")
904+
.source(Collections.singletonMap("foo", 3), XContentType.JSON))
858905
.setRefreshPolicy(RefreshPolicy.IMMEDIATE),
859906
RequestOptions.DEFAULT
860907
).status()
@@ -878,10 +925,54 @@ public void testDeleteByQuery() throws IOException {
878925
assertEquals(0, bulkResponse.getBulkFailures().size());
879926
assertEquals(0, bulkResponse.getSearchFailures().size());
880927
assertEquals(
881-
1,
928+
2,
882929
highLevelClient().search(new SearchRequest(sourceIndex), RequestOptions.DEFAULT).getHits().totalHits
883930
);
884931
}
932+
{
933+
// test delete-by-query rethrottling
934+
DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest();
935+
deleteByQueryRequest.indices(sourceIndex);
936+
deleteByQueryRequest.setQuery(new IdsQueryBuilder().addIds("2", "3").types("type"));
937+
deleteByQueryRequest.setRefresh(true);
938+
939+
// this following settings are supposed to halt reindexing after first document
940+
deleteByQueryRequest.setBatchSize(1);
941+
deleteByQueryRequest.setRequestsPerSecond(0.00001f);
942+
final CountDownLatch taskFinished = new CountDownLatch(1);
943+
highLevelClient().deleteByQueryAsync(deleteByQueryRequest, RequestOptions.DEFAULT, new ActionListener<BulkByScrollResponse>() {
944+
945+
@Override
946+
public void onResponse(BulkByScrollResponse response) {
947+
taskFinished.countDown();
948+
}
949+
950+
@Override
951+
public void onFailure(Exception e) {
952+
fail(e.toString());
953+
}
954+
});
955+
956+
TaskId taskIdToRethrottle = findTaskToRethrottle(DeleteByQueryAction.NAME);
957+
float requestsPerSecond = 1000f;
958+
ListTasksResponse response = execute(new RethrottleRequest(taskIdToRethrottle, requestsPerSecond),
959+
highLevelClient()::deleteByQueryRethrottle, highLevelClient()::deleteByQueryRethrottleAsync);
960+
assertThat(response.getTasks(), hasSize(1));
961+
assertEquals(taskIdToRethrottle, response.getTasks().get(0).getTaskId());
962+
assertThat(response.getTasks().get(0).getStatus(), instanceOf(RawTaskStatus.class));
963+
assertEquals(Float.toString(requestsPerSecond),
964+
((RawTaskStatus) response.getTasks().get(0).getStatus()).toMap().get("requests_per_second").toString());
965+
taskFinished.await(2, TimeUnit.SECONDS);
966+
967+
// any rethrottling after the delete-by-query is done performed with the same taskId should result in a failure
968+
response = execute(new RethrottleRequest(taskIdToRethrottle, requestsPerSecond),
969+
highLevelClient()::deleteByQueryRethrottle, highLevelClient()::deleteByQueryRethrottleAsync);
970+
assertTrue(response.getTasks().isEmpty());
971+
assertFalse(response.getNodeFailures().isEmpty());
972+
assertEquals(1, response.getNodeFailures().size());
973+
assertEquals("Elasticsearch exception [type=resource_not_found_exception, reason=task [" + taskIdToRethrottle + "] is missing]",
974+
response.getNodeFailures().get(0).getCause().getMessage());
975+
}
885976
}
886977

887978
public void testBulkProcessorIntegration() throws IOException {

client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@
5959
import org.elasticsearch.common.Strings;
6060
import org.elasticsearch.common.bytes.BytesArray;
6161
import org.elasticsearch.common.bytes.BytesReference;
62+
import org.elasticsearch.common.collect.Tuple;
6263
import org.elasticsearch.common.io.Streams;
6364
import org.elasticsearch.common.lucene.uid.Versions;
6465
import org.elasticsearch.common.unit.TimeValue;
@@ -466,7 +467,7 @@ public void testDeleteByQuery() throws IOException {
466467
assertToXContentBody(deleteByQueryRequest, request.getEntity());
467468
}
468469

469-
public void testRethrottle() throws IOException {
470+
public void testRethrottle() {
470471
TaskId taskId = new TaskId(randomAlphaOfLength(10), randomIntBetween(1, 100));
471472
RethrottleRequest rethrottleRequest;
472473
Float requestsPerSecond;
@@ -480,11 +481,20 @@ public void testRethrottle() throws IOException {
480481
expectedParams.put(RethrottleRequest.REQUEST_PER_SECOND_PARAMETER, "-1");
481482
}
482483
expectedParams.put("group_by", "none");
483-
Request request = RequestConverters.rethrottle(rethrottleRequest);
484-
assertEquals("/_reindex/" + taskId + "/_rethrottle", request.getEndpoint());
485-
assertEquals(HttpPost.METHOD_NAME, request.getMethod());
486-
assertEquals(expectedParams, request.getParameters());
487-
assertNull(request.getEntity());
484+
List<Tuple<String, Supplier<Request>>> variants = new ArrayList<>();
485+
variants.add(new Tuple<String, Supplier<Request>>("_reindex", () -> RequestConverters.rethrottleReindex(rethrottleRequest)));
486+
variants.add(new Tuple<String, Supplier<Request>>("_update_by_query",
487+
() -> RequestConverters.rethrottleUpdateByQuery(rethrottleRequest)));
488+
variants.add(new Tuple<String, Supplier<Request>>("_delete_by_query",
489+
() -> RequestConverters.rethrottleDeleteByQuery(rethrottleRequest)));
490+
491+
for (Tuple<String, Supplier<Request>> variant : variants) {
492+
Request request = variant.v2().get();
493+
assertEquals("/" + variant.v1() + "/" + taskId + "/_rethrottle", request.getEndpoint());
494+
assertEquals(HttpPost.METHOD_NAME, request.getMethod());
495+
assertEquals(expectedParams, request.getParameters());
496+
assertNull(request.getEntity());
497+
}
488498

489499
// test illegal RethrottleRequest values
490500
Exception e = expectThrows(NullPointerException.class, () -> new RethrottleRequest(null, 1.0f));

0 commit comments

Comments
 (0)