Skip to content

Commit 77145bb

Browse files
author
Christoph Büscher
authored
HLRC: Add support for reindex rethrottling (#33832)
This change adds support for rethrottling reindex requests to the RestHighLevelClient.
1 parent b33c18d commit 77145bb

File tree

9 files changed

+350
-17
lines changed

9 files changed

+350
-17
lines changed

client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -530,6 +530,17 @@ static Request deleteByQuery(DeleteByQueryRequest deleteByQueryRequest) throws I
530530
return request;
531531
}
532532

533+
static Request rethrottle(RethrottleRequest rethrottleRequest) throws IOException {
534+
String endpoint = new EndpointBuilder().addPathPart("_reindex").addPathPart(rethrottleRequest.getTaskId().toString())
535+
.addPathPart("_rethrottle").build();
536+
Request request = new Request(HttpPost.METHOD_NAME, endpoint);
537+
Params params = new Params(request)
538+
.withRequestsPerSecond(rethrottleRequest.getRequestsPerSecond());
539+
// we set "group_by" to "none" because this is the response format we can parse back
540+
params.putParam("group_by", "none");
541+
return request;
542+
}
543+
533544
static Request putScript(PutStoredScriptRequest putStoredScriptRequest) throws IOException {
534545
String endpoint = new EndpointBuilder().addPathPartAsIs("_scripts").addPathPart(putStoredScriptRequest.id()).build();
535546
Request request = new Request(HttpPost.METHOD_NAME, endpoint);
@@ -719,11 +730,11 @@ Params withRefreshPolicy(RefreshPolicy refreshPolicy) {
719730

720731
Params withRequestsPerSecond(float requestsPerSecond) {
721732
// the default in AbstractBulkByScrollRequest is Float.POSITIVE_INFINITY,
722-
// but we don't want to add that to the URL parameters, instead we leave it out
733+
// but we don't want to add that to the URL parameters, instead we use -1
723734
if (Float.isFinite(requestsPerSecond)) {
724-
return putParam("requests_per_second", Float.toString(requestsPerSecond));
735+
return putParam(RethrottleRequest.REQUEST_PER_SECOND_PARAMETER, Float.toString(requestsPerSecond));
725736
} else {
726-
return putParam("requests_per_second", "-1");
737+
return putParam(RethrottleRequest.REQUEST_PER_SECOND_PARAMETER, "-1");
727738
}
728739
}
729740

client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java

Lines changed: 35 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.elasticsearch.action.ActionListener;
2626
import org.elasticsearch.action.ActionRequest;
2727
import org.elasticsearch.action.ActionRequestValidationException;
28+
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
2829
import org.elasticsearch.action.admin.cluster.storedscripts.DeleteStoredScriptRequest;
2930
import org.elasticsearch.action.admin.cluster.storedscripts.GetStoredScriptRequest;
3031
import org.elasticsearch.action.admin.cluster.storedscripts.GetStoredScriptResponse;
@@ -474,13 +475,14 @@ public final BulkByScrollResponse updateByQuery(UpdateByQueryRequest updateByQue
474475
* Asynchronously executes an update by query request.
475476
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-update-by-query.html">
476477
* Update By Query API on elastic.co</a>
478+
* @param updateByQueryRequest the request
477479
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
478480
* @param listener the listener to be notified upon request completion
479481
*/
480-
public final void updateByQueryAsync(UpdateByQueryRequest reindexRequest, RequestOptions options,
482+
public final void updateByQueryAsync(UpdateByQueryRequest updateByQueryRequest, RequestOptions options,
481483
ActionListener<BulkByScrollResponse> listener) {
482484
performRequestAsyncAndParseEntity(
483-
reindexRequest, RequestConverters::updateByQuery, options, BulkByScrollResponse::fromXContent, listener, emptySet()
485+
updateByQueryRequest, RequestConverters::updateByQuery, options, BulkByScrollResponse::fromXContent, listener, emptySet()
484486
);
485487
}
486488

@@ -503,16 +505,45 @@ public final BulkByScrollResponse deleteByQuery(DeleteByQueryRequest deleteByQue
503505
* Asynchronously executes a delete by query request.
504506
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-delete-by-query.html">
505507
* Delete By Query API on elastic.co</a>
508+
* @param deleteByQueryRequest the request
506509
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
507510
* @param listener the listener to be notified upon request completion
508511
*/
509-
public final void deleteByQueryAsync(DeleteByQueryRequest reindexRequest, RequestOptions options,
512+
public final void deleteByQueryAsync(DeleteByQueryRequest deleteByQueryRequest, RequestOptions options,
510513
ActionListener<BulkByScrollResponse> listener) {
511514
performRequestAsyncAndParseEntity(
512-
reindexRequest, RequestConverters::deleteByQuery, options, BulkByScrollResponse::fromXContent, listener, emptySet()
515+
deleteByQueryRequest, RequestConverters::deleteByQuery, options, BulkByScrollResponse::fromXContent, listener, emptySet()
513516
);
514517
}
515518

519+
/**
520+
* Executes a reindex rethrottling request.
521+
* See the <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-reindex.html#docs-reindex-rethrottle">
522+
* Reindex rethrottling API on elastic.co</a>
523+
* @param rethrottleRequest the request
524+
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
525+
* @return the response
526+
* @throws IOException in case there is a problem sending the request or parsing back the response
527+
*/
528+
public final ListTasksResponse reindexRethrottle(RethrottleRequest rethrottleRequest, RequestOptions options) throws IOException {
529+
return performRequestAndParseEntity(rethrottleRequest, RequestConverters::rethrottle, options, ListTasksResponse::fromXContent,
530+
emptySet());
531+
}
532+
533+
/**
534+
* Executes a reindex rethrottling request.
535+
* See the <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-reindex.html#docs-reindex-rethrottle">
536+
* Reindex rethrottling API on elastic.co</a>
537+
* @param rethrottleRequest the request
538+
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
539+
* @param listener the listener to be notified upon request completion
540+
*/
541+
public final void reindexRethrottleAsync(RethrottleRequest rethrottleRequest, RequestOptions options,
542+
ActionListener<ListTasksResponse> listener) {
543+
performRequestAsyncAndParseEntity(rethrottleRequest, RequestConverters::rethrottle, options, ListTasksResponse::fromXContent,
544+
listener, emptySet());
545+
}
546+
516547
/**
517548
* Pings the remote Elasticsearch cluster and returns true if the ping succeeded, false otherwise
518549
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.client;
21+
22+
import org.elasticsearch.tasks.TaskId;
23+
24+
import java.util.Objects;
25+
26+
/**
27+
* A request changing throttling of a task.
28+
*/
29+
public class RethrottleRequest implements Validatable {
30+
31+
static final String REQUEST_PER_SECOND_PARAMETER = "requests_per_second";
32+
33+
private final TaskId taskId;
34+
private final float requestsPerSecond;
35+
36+
/**
37+
* Create a new {@link RethrottleRequest} which disables any throttling for the given taskId.
38+
* @param taskId the task for which throttling will be disabled
39+
*/
40+
public RethrottleRequest(TaskId taskId) {
41+
this.taskId = taskId;
42+
this.requestsPerSecond = Float.POSITIVE_INFINITY;
43+
}
44+
45+
/**
46+
* Create a new {@link RethrottleRequest} which changes the throttling for the given taskId.
47+
* @param taskId the task that throttling changes will be applied to
48+
* @param requestsPerSecond the number of requests per second that the task should perform. This needs to be a positive value.
49+
*/
50+
public RethrottleRequest(TaskId taskId, float requestsPerSecond) {
51+
Objects.requireNonNull(taskId, "taskId cannot be null");
52+
if (requestsPerSecond <= 0) {
53+
throw new IllegalArgumentException("requestsPerSecond needs to be positive value but was [" + requestsPerSecond+"]");
54+
}
55+
this.taskId = taskId;
56+
this.requestsPerSecond = requestsPerSecond;
57+
}
58+
59+
/**
60+
* @return the task Id
61+
*/
62+
public TaskId getTaskId() {
63+
return taskId;
64+
}
65+
66+
/**
67+
* @return the requests per seconds value
68+
*/
69+
public float getRequestsPerSecond() {
70+
return requestsPerSecond;
71+
}
72+
73+
@Override
74+
public String toString() {
75+
return "RethrottleRequest: taskID = " + taskId +"; reqestsPerSecond = " + requestsPerSecond;
76+
}
77+
}

client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java

Lines changed: 85 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,12 @@
2121

2222
import org.elasticsearch.ElasticsearchException;
2323
import org.elasticsearch.ElasticsearchStatusException;
24+
import org.elasticsearch.action.ActionListener;
2425
import org.elasticsearch.action.DocWriteRequest;
2526
import org.elasticsearch.action.DocWriteResponse;
27+
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
28+
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
29+
import org.elasticsearch.action.admin.cluster.node.tasks.list.TaskGroup;
2630
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
2731
import org.elasticsearch.action.bulk.BulkItemResponse;
2832
import org.elasticsearch.action.bulk.BulkProcessor;
@@ -52,22 +56,31 @@
5256
import org.elasticsearch.index.query.IdsQueryBuilder;
5357
import org.elasticsearch.index.reindex.BulkByScrollResponse;
5458
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
59+
import org.elasticsearch.index.reindex.ReindexAction;
5560
import org.elasticsearch.index.reindex.ReindexRequest;
5661
import org.elasticsearch.index.reindex.UpdateByQueryRequest;
5762
import org.elasticsearch.rest.RestStatus;
5863
import org.elasticsearch.script.Script;
5964
import org.elasticsearch.script.ScriptType;
6065
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
66+
import org.elasticsearch.tasks.RawTaskStatus;
67+
import org.elasticsearch.tasks.TaskId;
6168
import org.joda.time.DateTime;
6269
import org.joda.time.DateTimeZone;
6370
import org.joda.time.format.DateTimeFormat;
6471

6572
import java.io.IOException;
6673
import java.util.Collections;
6774
import java.util.Map;
75+
import java.util.concurrent.CountDownLatch;
76+
import java.util.concurrent.TimeUnit;
6877
import java.util.concurrent.atomic.AtomicReference;
6978

7079
import static java.util.Collections.singletonMap;
80+
import static org.hamcrest.Matchers.empty;
81+
import static org.hamcrest.Matchers.hasSize;
82+
import static org.hamcrest.Matchers.instanceOf;
83+
import static org.hamcrest.Matchers.lessThan;
7184

7285
public class CrudIT extends ESRestHighLevelClientTestCase {
7386

@@ -631,7 +644,7 @@ public void testBulk() throws IOException {
631644
validateBulkResponses(nbItems, errors, bulkResponse, bulkRequest);
632645
}
633646

634-
public void testReindex() throws IOException {
647+
public void testReindex() throws Exception {
635648
final String sourceIndex = "source1";
636649
final String destinationIndex = "dest";
637650
{
@@ -642,15 +655,14 @@ public void testReindex() throws IOException {
642655
.build();
643656
createIndex(sourceIndex, settings);
644657
createIndex(destinationIndex, settings);
658+
BulkRequest bulkRequest = new BulkRequest()
659+
.add(new IndexRequest(sourceIndex, "type", "1").source(Collections.singletonMap("foo", "bar"), XContentType.JSON))
660+
.add(new IndexRequest(sourceIndex, "type", "2").source(Collections.singletonMap("foo2", "bar2"), XContentType.JSON))
661+
.setRefreshPolicy(RefreshPolicy.IMMEDIATE);
645662
assertEquals(
646663
RestStatus.OK,
647664
highLevelClient().bulk(
648-
new BulkRequest()
649-
.add(new IndexRequest(sourceIndex, "type", "1")
650-
.source(Collections.singletonMap("foo", "bar"), XContentType.JSON))
651-
.add(new IndexRequest(sourceIndex, "type", "2")
652-
.source(Collections.singletonMap("foo2", "bar2"), XContentType.JSON))
653-
.setRefreshPolicy(RefreshPolicy.IMMEDIATE),
665+
bulkRequest,
654666
RequestOptions.DEFAULT
655667
).status()
656668
);
@@ -692,6 +704,72 @@ public void testReindex() throws IOException {
692704
assertEquals(0, bulkResponse.getBulkFailures().size());
693705
assertEquals(0, bulkResponse.getSearchFailures().size());
694706
}
707+
{
708+
// test reindex rethrottling
709+
ReindexRequest reindexRequest = new ReindexRequest();
710+
reindexRequest.setSourceIndices(sourceIndex);
711+
reindexRequest.setDestIndex(destinationIndex);
712+
713+
// this following settings are supposed to halt reindexing after first document
714+
reindexRequest.setSourceBatchSize(1);
715+
reindexRequest.setRequestsPerSecond(0.00001f);
716+
final CountDownLatch reindexTaskFinished = new CountDownLatch(1);
717+
highLevelClient().reindexAsync(reindexRequest, RequestOptions.DEFAULT, new ActionListener<BulkByScrollResponse>() {
718+
719+
@Override
720+
public void onResponse(BulkByScrollResponse response) {
721+
reindexTaskFinished.countDown();
722+
}
723+
724+
@Override
725+
public void onFailure(Exception e) {
726+
fail(e.toString());
727+
}
728+
});
729+
730+
TaskGroup taskGroupToRethrottle = findTaskToRethrottle();
731+
assertThat(taskGroupToRethrottle.getChildTasks(), empty());
732+
TaskId taskIdToRethrottle = taskGroupToRethrottle.getTaskInfo().getTaskId();
733+
734+
float requestsPerSecond = 1000f;
735+
ListTasksResponse response = execute(new RethrottleRequest(taskIdToRethrottle, requestsPerSecond),
736+
highLevelClient()::reindexRethrottle, highLevelClient()::reindexRethrottleAsync);
737+
assertThat(response.getTasks(), hasSize(1));
738+
assertEquals(taskIdToRethrottle, response.getTasks().get(0).getTaskId());
739+
assertThat(response.getTasks().get(0).getStatus(), instanceOf(RawTaskStatus.class));
740+
assertEquals(Float.toString(requestsPerSecond),
741+
((RawTaskStatus) response.getTasks().get(0).getStatus()).toMap().get("requests_per_second").toString());
742+
reindexTaskFinished.await(2, TimeUnit.SECONDS);
743+
744+
// any rethrottling after the reindex is done performed with the same taskId should result in a failure
745+
response = execute(new RethrottleRequest(taskIdToRethrottle, requestsPerSecond),
746+
highLevelClient()::reindexRethrottle, highLevelClient()::reindexRethrottleAsync);
747+
assertTrue(response.getTasks().isEmpty());
748+
assertFalse(response.getNodeFailures().isEmpty());
749+
assertEquals(1, response.getNodeFailures().size());
750+
assertEquals("Elasticsearch exception [type=resource_not_found_exception, reason=task [" + taskIdToRethrottle + "] is missing]",
751+
response.getNodeFailures().get(0).getCause().getMessage());
752+
}
753+
}
754+
755+
private TaskGroup findTaskToRethrottle() throws IOException {
756+
long start = System.nanoTime();
757+
ListTasksRequest request = new ListTasksRequest();
758+
request.setActions(ReindexAction.NAME);
759+
request.setDetailed(true);
760+
do {
761+
ListTasksResponse list = highLevelClient().tasks().list(request, RequestOptions.DEFAULT);
762+
list.rethrowFailures("Finding tasks to rethrottle");
763+
assertThat("tasks are left over from the last execution of this test",
764+
list.getTaskGroups(), hasSize(lessThan(2)));
765+
if (0 == list.getTaskGroups().size()) {
766+
// The parent task hasn't started yet
767+
continue;
768+
}
769+
return list.getTaskGroups().get(0);
770+
} while (System.nanoTime() - start < TimeUnit.SECONDS.toNanos(10));
771+
throw new AssertionError("Couldn't find tasks to rethrottle. Here are the running tasks " +
772+
highLevelClient().tasks().list(request, RequestOptions.DEFAULT));
695773
}
696774

697775
public void testUpdateByQuery() throws IOException {

client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@
9595
import org.elasticsearch.search.rescore.QueryRescorerBuilder;
9696
import org.elasticsearch.search.suggest.SuggestBuilder;
9797
import org.elasticsearch.search.suggest.completion.CompletionSuggestionBuilder;
98+
import org.elasticsearch.tasks.TaskId;
9899
import org.elasticsearch.test.ESTestCase;
99100
import org.elasticsearch.test.RandomObjects;
100101

@@ -319,10 +320,10 @@ public void testReindex() throws IOException {
319320
}
320321
if (randomBoolean()) {
321322
float requestsPerSecond = (float) randomDoubleBetween(0.0, 10.0, false);
322-
expectedParams.put("requests_per_second", Float.toString(requestsPerSecond));
323+
expectedParams.put(RethrottleRequest.REQUEST_PER_SECOND_PARAMETER, Float.toString(requestsPerSecond));
323324
reindexRequest.setRequestsPerSecond(requestsPerSecond);
324325
} else {
325-
expectedParams.put("requests_per_second", "-1");
326+
expectedParams.put(RethrottleRequest.REQUEST_PER_SECOND_PARAMETER, "-1");
326327
}
327328
if (randomBoolean()) {
328329
reindexRequest.setDestRouting("=cat");
@@ -465,6 +466,34 @@ public void testDeleteByQuery() throws IOException {
465466
assertToXContentBody(deleteByQueryRequest, request.getEntity());
466467
}
467468

469+
public void testRethrottle() throws IOException {
470+
TaskId taskId = new TaskId(randomAlphaOfLength(10), randomIntBetween(1, 100));
471+
RethrottleRequest rethrottleRequest;
472+
Float requestsPerSecond;
473+
Map<String, String> expectedParams = new HashMap<>();
474+
if (frequently()) {
475+
requestsPerSecond = (float) randomDoubleBetween(0.0, 100.0, true);
476+
rethrottleRequest = new RethrottleRequest(taskId, requestsPerSecond);
477+
expectedParams.put(RethrottleRequest.REQUEST_PER_SECOND_PARAMETER, Float.toString(requestsPerSecond));
478+
} else {
479+
rethrottleRequest = new RethrottleRequest(taskId);
480+
expectedParams.put(RethrottleRequest.REQUEST_PER_SECOND_PARAMETER, "-1");
481+
}
482+
expectedParams.put("group_by", "none");
483+
Request request = RequestConverters.rethrottle(rethrottleRequest);
484+
assertEquals("/_reindex/" + taskId + "/_rethrottle", request.getEndpoint());
485+
assertEquals(HttpPost.METHOD_NAME, request.getMethod());
486+
assertEquals(expectedParams, request.getParameters());
487+
assertNull(request.getEntity());
488+
489+
// test illegal RethrottleRequest values
490+
Exception e = expectThrows(NullPointerException.class, () -> new RethrottleRequest(null, 1.0f));
491+
assertEquals("taskId cannot be null", e.getMessage());
492+
493+
e = expectThrows(IllegalArgumentException.class, () -> new RethrottleRequest(new TaskId("taskId", 1), -5.0f));
494+
assertEquals("requestsPerSecond needs to be positive value but was [-5.0]", e.getMessage());
495+
}
496+
468497
public void testIndex() throws IOException {
469498
String index = randomAlphaOfLengthBetween(3, 10);
470499
String type = randomAlphaOfLengthBetween(3, 10);

0 commit comments

Comments
 (0)