diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java
index bd7824470b5e8..901f564075875 100644
--- a/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java
+++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java
@@ -530,6 +530,17 @@ static Request deleteByQuery(DeleteByQueryRequest deleteByQueryRequest) throws I
return request;
}
+ static Request rethrottle(RethrottleRequest rethrottleRequest) throws IOException {
+ String endpoint = new EndpointBuilder().addPathPart("_reindex").addPathPart(rethrottleRequest.getTaskId().toString())
+ .addPathPart("_rethrottle").build();
+ Request request = new Request(HttpPost.METHOD_NAME, endpoint);
+ Params params = new Params(request)
+ .withRequestsPerSecond(rethrottleRequest.getRequestsPerSecond());
+ // we set "group_by" to "none" because this is the response format we can parse back
+ params.putParam("group_by", "none");
+ return request;
+ }
+
static Request putScript(PutStoredScriptRequest putStoredScriptRequest) throws IOException {
String endpoint = new EndpointBuilder().addPathPartAsIs("_scripts").addPathPart(putStoredScriptRequest.id()).build();
Request request = new Request(HttpPost.METHOD_NAME, endpoint);
@@ -719,11 +730,11 @@ Params withRefreshPolicy(RefreshPolicy refreshPolicy) {
Params withRequestsPerSecond(float requestsPerSecond) {
// the default in AbstractBulkByScrollRequest is Float.POSITIVE_INFINITY,
- // but we don't want to add that to the URL parameters, instead we leave it out
+ // but we don't want to add that to the URL parameters, instead we use -1
if (Float.isFinite(requestsPerSecond)) {
- return putParam("requests_per_second", Float.toString(requestsPerSecond));
+ return putParam(RethrottleRequest.REQUEST_PER_SECOND_PARAMETER, Float.toString(requestsPerSecond));
} else {
- return putParam("requests_per_second", "-1");
+ return putParam(RethrottleRequest.REQUEST_PER_SECOND_PARAMETER, "-1");
}
}
diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java
index ae1766fab0271..176436ce458a7 100644
--- a/client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java
+++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java
@@ -25,6 +25,7 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
+import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
import org.elasticsearch.action.admin.cluster.storedscripts.DeleteStoredScriptRequest;
import org.elasticsearch.action.admin.cluster.storedscripts.GetStoredScriptRequest;
import org.elasticsearch.action.admin.cluster.storedscripts.GetStoredScriptResponse;
@@ -474,13 +475,14 @@ public final BulkByScrollResponse updateByQuery(UpdateByQueryRequest updateByQue
* Asynchronously executes an update by query request.
* See
* Update By Query API on elastic.co
+ * @param updateByQueryRequest the request
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @param listener the listener to be notified upon request completion
*/
- public final void updateByQueryAsync(UpdateByQueryRequest reindexRequest, RequestOptions options,
+ public final void updateByQueryAsync(UpdateByQueryRequest updateByQueryRequest, RequestOptions options,
ActionListener listener) {
performRequestAsyncAndParseEntity(
- reindexRequest, RequestConverters::updateByQuery, options, BulkByScrollResponse::fromXContent, listener, emptySet()
+ updateByQueryRequest, RequestConverters::updateByQuery, options, BulkByScrollResponse::fromXContent, listener, emptySet()
);
}
@@ -503,16 +505,45 @@ public final BulkByScrollResponse deleteByQuery(DeleteByQueryRequest deleteByQue
* Asynchronously executes a delete by query request.
* See
* Delete By Query API on elastic.co
+ * @param deleteByQueryRequest the request
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @param listener the listener to be notified upon request completion
*/
- public final void deleteByQueryAsync(DeleteByQueryRequest reindexRequest, RequestOptions options,
+ public final void deleteByQueryAsync(DeleteByQueryRequest deleteByQueryRequest, RequestOptions options,
ActionListener listener) {
performRequestAsyncAndParseEntity(
- reindexRequest, RequestConverters::deleteByQuery, options, BulkByScrollResponse::fromXContent, listener, emptySet()
+ deleteByQueryRequest, RequestConverters::deleteByQuery, options, BulkByScrollResponse::fromXContent, listener, emptySet()
);
}
+ /**
+ * Executes a reindex rethrottling request.
+ * See the
+ * Reindex rethrottling API on elastic.co
+ * @param rethrottleRequest the request
+ * @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
+ * @return the response
+ * @throws IOException in case there is a problem sending the request or parsing back the response
+ */
+ public final ListTasksResponse reindexRethrottle(RethrottleRequest rethrottleRequest, RequestOptions options) throws IOException {
+ return performRequestAndParseEntity(rethrottleRequest, RequestConverters::rethrottle, options, ListTasksResponse::fromXContent,
+ emptySet());
+ }
+
+ /**
+ * Executes a reindex rethrottling request.
+ * See the
+ * Reindex rethrottling API on elastic.co
+ * @param rethrottleRequest the request
+ * @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
+ * @param listener the listener to be notified upon request completion
+ */
+ public final void reindexRethrottleAsync(RethrottleRequest rethrottleRequest, RequestOptions options,
+ ActionListener listener) {
+ performRequestAsyncAndParseEntity(rethrottleRequest, RequestConverters::rethrottle, options, ListTasksResponse::fromXContent,
+ listener, emptySet());
+ }
+
/**
* Pings the remote Elasticsearch cluster and returns true if the ping succeeded, false otherwise
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/RethrottleRequest.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/RethrottleRequest.java
new file mode 100644
index 0000000000000..eb1c666a0cf1e
--- /dev/null
+++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/RethrottleRequest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.client;
+
+import org.elasticsearch.tasks.TaskId;
+
+import java.util.Objects;
+
+/**
+ * A request changing throttling of a task.
+ */
+public class RethrottleRequest implements Validatable {
+
+ static final String REQUEST_PER_SECOND_PARAMETER = "requests_per_second";
+
+ private final TaskId taskId;
+ private final float requestsPerSecond;
+
+ /**
+ * Create a new {@link RethrottleRequest} which disables any throttling for the given taskId.
+ * @param taskId the task for which throttling will be disabled
+ */
+ public RethrottleRequest(TaskId taskId) {
+ this.taskId = taskId;
+ this.requestsPerSecond = Float.POSITIVE_INFINITY;
+ }
+
+ /**
+ * Create a new {@link RethrottleRequest} which changes the throttling for the given taskId.
+ * @param taskId the task that throttling changes will be applied to
+ * @param requestsPerSecond the number of requests per second that the task should perform. This needs to be a positive value.
+ */
+ public RethrottleRequest(TaskId taskId, float requestsPerSecond) {
+ Objects.requireNonNull(taskId, "taskId cannot be null");
+ if (requestsPerSecond <= 0) {
+ throw new IllegalArgumentException("requestsPerSecond needs to be positive value but was [" + requestsPerSecond+"]");
+ }
+ this.taskId = taskId;
+ this.requestsPerSecond = requestsPerSecond;
+ }
+
+ /**
+ * @return the task Id
+ */
+ public TaskId getTaskId() {
+ return taskId;
+ }
+
+ /**
+ * @return the requests per seconds value
+ */
+ public float getRequestsPerSecond() {
+ return requestsPerSecond;
+ }
+
+ @Override
+ public String toString() {
+ return "RethrottleRequest: taskID = " + taskId +"; reqestsPerSecond = " + requestsPerSecond;
+ }
+}
\ No newline at end of file
diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java
index feb57bed9c46b..293e6b24a7b0d 100644
--- a/client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java
+++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java
@@ -21,8 +21,12 @@
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchStatusException;
+import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.DocWriteResponse;
+import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
+import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
+import org.elasticsearch.action.admin.cluster.node.tasks.list.TaskGroup;
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkProcessor;
@@ -52,12 +56,15 @@
import org.elasticsearch.index.query.IdsQueryBuilder;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
+import org.elasticsearch.index.reindex.ReindexAction;
import org.elasticsearch.index.reindex.ReindexRequest;
import org.elasticsearch.index.reindex.UpdateByQueryRequest;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptType;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
+import org.elasticsearch.tasks.RawTaskStatus;
+import org.elasticsearch.tasks.TaskId;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.format.DateTimeFormat;
@@ -65,9 +72,15 @@
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import static java.util.Collections.singletonMap;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.lessThan;
public class CrudIT extends ESRestHighLevelClientTestCase {
@@ -631,7 +644,7 @@ public void testBulk() throws IOException {
validateBulkResponses(nbItems, errors, bulkResponse, bulkRequest);
}
- public void testReindex() throws IOException {
+ public void testReindex() throws Exception {
final String sourceIndex = "source1";
final String destinationIndex = "dest";
{
@@ -642,15 +655,14 @@ public void testReindex() throws IOException {
.build();
createIndex(sourceIndex, settings);
createIndex(destinationIndex, settings);
+ BulkRequest bulkRequest = new BulkRequest()
+ .add(new IndexRequest(sourceIndex, "type", "1").source(Collections.singletonMap("foo", "bar"), XContentType.JSON))
+ .add(new IndexRequest(sourceIndex, "type", "2").source(Collections.singletonMap("foo2", "bar2"), XContentType.JSON))
+ .setRefreshPolicy(RefreshPolicy.IMMEDIATE);
assertEquals(
RestStatus.OK,
highLevelClient().bulk(
- new BulkRequest()
- .add(new IndexRequest(sourceIndex, "type", "1")
- .source(Collections.singletonMap("foo", "bar"), XContentType.JSON))
- .add(new IndexRequest(sourceIndex, "type", "2")
- .source(Collections.singletonMap("foo2", "bar2"), XContentType.JSON))
- .setRefreshPolicy(RefreshPolicy.IMMEDIATE),
+ bulkRequest,
RequestOptions.DEFAULT
).status()
);
@@ -692,6 +704,72 @@ public void testReindex() throws IOException {
assertEquals(0, bulkResponse.getBulkFailures().size());
assertEquals(0, bulkResponse.getSearchFailures().size());
}
+ {
+ // test reindex rethrottling
+ ReindexRequest reindexRequest = new ReindexRequest();
+ reindexRequest.setSourceIndices(sourceIndex);
+ reindexRequest.setDestIndex(destinationIndex);
+
+ // this following settings are supposed to halt reindexing after first document
+ reindexRequest.setSourceBatchSize(1);
+ reindexRequest.setRequestsPerSecond(0.00001f);
+ final CountDownLatch reindexTaskFinished = new CountDownLatch(1);
+ highLevelClient().reindexAsync(reindexRequest, RequestOptions.DEFAULT, new ActionListener() {
+
+ @Override
+ public void onResponse(BulkByScrollResponse response) {
+ reindexTaskFinished.countDown();
+ }
+
+ @Override
+ public void onFailure(Exception e) {
+ fail(e.toString());
+ }
+ });
+
+ TaskGroup taskGroupToRethrottle = findTaskToRethrottle();
+ assertThat(taskGroupToRethrottle.getChildTasks(), empty());
+ TaskId taskIdToRethrottle = taskGroupToRethrottle.getTaskInfo().getTaskId();
+
+ float requestsPerSecond = 1000f;
+ ListTasksResponse response = execute(new RethrottleRequest(taskIdToRethrottle, requestsPerSecond),
+ highLevelClient()::reindexRethrottle, highLevelClient()::reindexRethrottleAsync);
+ assertThat(response.getTasks(), hasSize(1));
+ assertEquals(taskIdToRethrottle, response.getTasks().get(0).getTaskId());
+ assertThat(response.getTasks().get(0).getStatus(), instanceOf(RawTaskStatus.class));
+ assertEquals(Float.toString(requestsPerSecond),
+ ((RawTaskStatus) response.getTasks().get(0).getStatus()).toMap().get("requests_per_second").toString());
+ reindexTaskFinished.await(2, TimeUnit.SECONDS);
+
+ // any rethrottling after the reindex is done performed with the same taskId should result in a failure
+ response = execute(new RethrottleRequest(taskIdToRethrottle, requestsPerSecond),
+ highLevelClient()::reindexRethrottle, highLevelClient()::reindexRethrottleAsync);
+ assertTrue(response.getTasks().isEmpty());
+ assertFalse(response.getNodeFailures().isEmpty());
+ assertEquals(1, response.getNodeFailures().size());
+ assertEquals("Elasticsearch exception [type=resource_not_found_exception, reason=task [" + taskIdToRethrottle + "] is missing]",
+ response.getNodeFailures().get(0).getCause().getMessage());
+ }
+ }
+
+ private TaskGroup findTaskToRethrottle() throws IOException {
+ long start = System.nanoTime();
+ ListTasksRequest request = new ListTasksRequest();
+ request.setActions(ReindexAction.NAME);
+ request.setDetailed(true);
+ do {
+ ListTasksResponse list = highLevelClient().tasks().list(request, RequestOptions.DEFAULT);
+ list.rethrowFailures("Finding tasks to rethrottle");
+ assertThat("tasks are left over from the last execution of this test",
+ list.getTaskGroups(), hasSize(lessThan(2)));
+ if (0 == list.getTaskGroups().size()) {
+ // The parent task hasn't started yet
+ continue;
+ }
+ return list.getTaskGroups().get(0);
+ } while (System.nanoTime() - start < TimeUnit.SECONDS.toNanos(10));
+ throw new AssertionError("Couldn't find tasks to rethrottle. Here are the running tasks " +
+ highLevelClient().tasks().list(request, RequestOptions.DEFAULT));
}
public void testUpdateByQuery() throws IOException {
diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java
index 1d06981dedff9..5fce2f168fbc9 100644
--- a/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java
+++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java
@@ -95,6 +95,7 @@
import org.elasticsearch.search.rescore.QueryRescorerBuilder;
import org.elasticsearch.search.suggest.SuggestBuilder;
import org.elasticsearch.search.suggest.completion.CompletionSuggestionBuilder;
+import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.RandomObjects;
@@ -319,10 +320,10 @@ public void testReindex() throws IOException {
}
if (randomBoolean()) {
float requestsPerSecond = (float) randomDoubleBetween(0.0, 10.0, false);
- expectedParams.put("requests_per_second", Float.toString(requestsPerSecond));
+ expectedParams.put(RethrottleRequest.REQUEST_PER_SECOND_PARAMETER, Float.toString(requestsPerSecond));
reindexRequest.setRequestsPerSecond(requestsPerSecond);
} else {
- expectedParams.put("requests_per_second", "-1");
+ expectedParams.put(RethrottleRequest.REQUEST_PER_SECOND_PARAMETER, "-1");
}
if (randomBoolean()) {
reindexRequest.setDestRouting("=cat");
@@ -465,6 +466,34 @@ public void testDeleteByQuery() throws IOException {
assertToXContentBody(deleteByQueryRequest, request.getEntity());
}
+ public void testRethrottle() throws IOException {
+ TaskId taskId = new TaskId(randomAlphaOfLength(10), randomIntBetween(1, 100));
+ RethrottleRequest rethrottleRequest;
+ Float requestsPerSecond;
+ Map expectedParams = new HashMap<>();
+ if (frequently()) {
+ requestsPerSecond = (float) randomDoubleBetween(0.0, 100.0, true);
+ rethrottleRequest = new RethrottleRequest(taskId, requestsPerSecond);
+ expectedParams.put(RethrottleRequest.REQUEST_PER_SECOND_PARAMETER, Float.toString(requestsPerSecond));
+ } else {
+ rethrottleRequest = new RethrottleRequest(taskId);
+ expectedParams.put(RethrottleRequest.REQUEST_PER_SECOND_PARAMETER, "-1");
+ }
+ expectedParams.put("group_by", "none");
+ Request request = RequestConverters.rethrottle(rethrottleRequest);
+ assertEquals("/_reindex/" + taskId + "/_rethrottle", request.getEndpoint());
+ assertEquals(HttpPost.METHOD_NAME, request.getMethod());
+ assertEquals(expectedParams, request.getParameters());
+ assertNull(request.getEntity());
+
+ // test illegal RethrottleRequest values
+ Exception e = expectThrows(NullPointerException.class, () -> new RethrottleRequest(null, 1.0f));
+ assertEquals("taskId cannot be null", e.getMessage());
+
+ e = expectThrows(IllegalArgumentException.class, () -> new RethrottleRequest(new TaskId("taskId", 1), -5.0f));
+ assertEquals("requestsPerSecond needs to be positive value but was [-5.0]", e.getMessage());
+ }
+
public void testIndex() throws IOException {
String index = randomAlphaOfLengthBetween(3, 10);
String type = randomAlphaOfLengthBetween(3, 10);
diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/RestHighLevelClientTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/RestHighLevelClientTests.java
index ca6043768df65..acdfc50b5a13a 100644
--- a/client/rest-high-level/src/test/java/org/elasticsearch/client/RestHighLevelClientTests.java
+++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/RestHighLevelClientTests.java
@@ -20,6 +20,7 @@
package org.elasticsearch.client;
import com.fasterxml.jackson.core.JsonParseException;
+
import org.apache.http.HttpEntity;
import org.apache.http.HttpHost;
import org.apache.http.HttpResponse;
@@ -658,7 +659,6 @@ public void testApiNamingConventions() throws Exception {
"indices.get_upgrade",
"indices.put_alias",
"mtermvectors",
- "reindex_rethrottle",
"render_search_template",
"scripts_painless_execute",
"tasks.get",
diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/CRUDDocumentationIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/CRUDDocumentationIT.java
index 142eacd820ffd..85612147b464d 100644
--- a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/CRUDDocumentationIT.java
+++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/CRUDDocumentationIT.java
@@ -24,6 +24,7 @@
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.LatchedActionListener;
+import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkProcessor;
@@ -50,6 +51,7 @@
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestHighLevelClient;
+import org.elasticsearch.client.RethrottleRequest;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.settings.Settings;
@@ -75,6 +77,7 @@
import org.elasticsearch.script.ScriptType;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import org.elasticsearch.search.sort.SortOrder;
+import org.elasticsearch.tasks.TaskId;
import java.util.Collections;
import java.util.Date;
@@ -902,6 +905,48 @@ public void onFailure(Exception e) {
}
}
+ public void testReindexRethrottle() throws Exception {
+ RestHighLevelClient client = highLevelClient();
+ TaskId taskId = new TaskId("oTUltX4IQMOUUVeiohTt8A:124");
+ {
+ // tag::rethrottle-disable-request
+ RethrottleRequest rethrottleRequest = new RethrottleRequest(taskId); // <1>
+ client.reindexRethrottle(rethrottleRequest, RequestOptions.DEFAULT);
+ // end::rethrottle-disable-request
+ }
+
+ {
+ // tag::rethrottle-request
+ RethrottleRequest rethrottleRequest = new RethrottleRequest(taskId, 100.0f); // <1>
+ client.reindexRethrottle(rethrottleRequest, RequestOptions.DEFAULT);
+ // end::rethrottle-request
+ }
+
+ // tag::rethrottle-request-async
+ ActionListener listener = new ActionListener() {
+ @Override
+ public void onResponse(ListTasksResponse response) {
+ // <1>
+ }
+
+ @Override
+ public void onFailure(Exception e) {
+ // <2>
+ }
+ };
+ // end::rethrottle-request-async
+
+ // Replace the empty listener by a blocking listener in test
+ final CountDownLatch latch = new CountDownLatch(1);
+ listener = new LatchedActionListener<>(listener, latch);
+
+ RethrottleRequest rethrottleRequest = new RethrottleRequest(taskId);
+ // tag::rethrottle-execute-async
+ client.reindexRethrottleAsync(rethrottleRequest, RequestOptions.DEFAULT, listener); // <1>
+ // end::rethrottle-execute-async
+ assertTrue(latch.await(30L, TimeUnit.SECONDS));
+ }
+
public void testUpdateByQuery() throws Exception {
RestHighLevelClient client = highLevelClient();
{
diff --git a/docs/java-rest/high-level/document/reindex-rethrottle.asciidoc b/docs/java-rest/high-level/document/reindex-rethrottle.asciidoc
new file mode 100644
index 0000000000000..77b2fc335862f
--- /dev/null
+++ b/docs/java-rest/high-level/document/reindex-rethrottle.asciidoc
@@ -0,0 +1,60 @@
+[[java-rest-high-document-reindex-rethrottle]]
+=== Reindex Rethrottle API
+
+[[java-rest-high-document-reindex-rethrottle-request]]
+==== Reindex Rethrolle Request
+
+A `RethrottleRequest` can be used to change existing throttling on a runnind
+reindex task or disable it entirely. It requires the task Id of the reindex
+task to change.
+
+In its simplest form, you can use it to disable throttling of a running
+reindex task using the following:
+
+["source","java",subs="attributes,callouts,macros"]
+--------------------------------------------------
+include-tagged::{doc-tests}/CRUDDocumentationIT.java[rethrottle-disable-request]
+--------------------------------------------------
+<1> Create a `RethrottleRequest` that disables throttling for a specific task id
+
+By providing a `requestsPerSecond` argument, the request will change the
+existing task throttling to the specified value:
+
+["source","java",subs="attributes,callouts,macros"]
+--------------------------------------------------
+include-tagged::{doc-tests}/CRUDDocumentationIT.java[rethrottle-request]
+--------------------------------------------------
+<1> Request to change the throttling of a task to 100 requests per second
+
+[[java-rest-high-document-reindex-rethrottle-async]]
+==== Asynchronous Execution
+
+The asynchronous execution of a rethrottle request requires both the `RethrottleRequest`
+instance and an `ActionListener` instance to be passed to the asynchronous
+method:
+
+["source","java",subs="attributes,callouts,macros"]
+--------------------------------------------------
+include-tagged::{doc-tests}/CRUDDocumentationIT.java[rethrottle-execute-async]
+--------------------------------------------------
+<1> The RethrottleRequest to execute and the ActionListener to use when the
+execution completes
+
+The asynchronous method does not block and returns immediately.
+Once it is completed the `ActionListener` is called back using the `onResponse` method
+if the execution successfully completed or using the `onFailure` method if
+it failed. A typical listener looks like this:
+
+["source","java",subs="attributes,callouts,macros"]
+--------------------------------------------------
+include-tagged::{doc-tests}/CRUDDocumentationIT.java[rethrottle-request-async]
+--------------------------------------------------
+<1> Code executed when the request is successfully completed
+<2> Code executed when the request fails with an exception
+
+[[java-rest-high-document-reindex-retrottle-response]]
+==== Rethrottle Response
+
+Rethrottling returns the task that has been rethrottled in the form of a
+`ListTasksResponse`. The structure of this response object is described in detail
+in <>.
diff --git a/docs/java-rest/high-level/supported-apis.asciidoc b/docs/java-rest/high-level/supported-apis.asciidoc
index 51d00c403de62..f91a2ed8e7507 100644
--- a/docs/java-rest/high-level/supported-apis.asciidoc
+++ b/docs/java-rest/high-level/supported-apis.asciidoc
@@ -18,6 +18,7 @@ Multi-document APIs::
* <>
* <>
* <>
+* <>
include::document/index.asciidoc[]
include::document/get.asciidoc[]
@@ -29,6 +30,7 @@ include::document/multi-get.asciidoc[]
include::document/reindex.asciidoc[]
include::document/update-by-query.asciidoc[]
include::document/delete-by-query.asciidoc[]
+include::document/reindex-rethrottle.asciidoc[]
== Search APIs