Skip to content

HLRC: Add throttling for update & delete-by-query #33951

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Oct 2, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -530,8 +530,20 @@ static Request deleteByQuery(DeleteByQueryRequest deleteByQueryRequest) throws I
return request;
}

static Request rethrottle(RethrottleRequest rethrottleRequest) throws IOException {
String endpoint = new EndpointBuilder().addPathPart("_reindex").addPathPart(rethrottleRequest.getTaskId().toString())
/**
 * Converts a {@link RethrottleRequest} into the low-level {@link Request} for a reindex task
 * by delegating to the shared rethrottle converter with the {@code _reindex} path prefix.
 */
static Request rethrottleReindex(RethrottleRequest rethrottleRequest) {
    final String firstPathPart = "_reindex";
    return rethrottle(rethrottleRequest, firstPathPart);
}

/**
 * Converts a {@link RethrottleRequest} into the low-level {@link Request} for an update by query task
 * by delegating to the shared rethrottle converter with the {@code _update_by_query} path prefix.
 */
static Request rethrottleUpdateByQuery(RethrottleRequest rethrottleRequest) {
    final String firstPathPart = "_update_by_query";
    return rethrottle(rethrottleRequest, firstPathPart);
}

/**
 * Converts a {@link RethrottleRequest} into the low-level {@link Request} for a delete by query task
 * by delegating to the shared rethrottle converter with the {@code _delete_by_query} path prefix.
 */
static Request rethrottleDeleteByQuery(RethrottleRequest rethrottleRequest) {
    final String firstPathPart = "_delete_by_query";
    return rethrottle(rethrottleRequest, firstPathPart);
}

private static Request rethrottle(RethrottleRequest rethrottleRequest, String firstPathPart) {
String endpoint = new EndpointBuilder().addPathPart(firstPathPart).addPathPart(rethrottleRequest.getTaskId().toString())
.addPathPart("_rethrottle").build();
Request request = new Request(HttpPost.METHOD_NAME, endpoint);
Params params = new Params(request)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -516,6 +516,62 @@ public final void deleteByQueryAsync(DeleteByQueryRequest deleteByQueryRequest,
);
}

/**
 * Synchronously executes a rethrottle request for a delete by query task.
 * See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-delete-by-query.html">
 * Delete By Query API on elastic.co</a>
 * @param rethrottleRequest the request identifying the task and its new throttle
 * @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
 * @return the response, parsed via {@link ListTasksResponse#fromXContent}
 * @throws IOException in case there is a problem sending the request or parsing back the response
 */
public final ListTasksResponse deleteByQueryRethrottle(RethrottleRequest rethrottleRequest, RequestOptions options) throws IOException {
    return performRequestAndParseEntity(
        rethrottleRequest,
        RequestConverters::rethrottleDeleteByQuery,
        options,
        ListTasksResponse::fromXContent,
        emptySet()
    );
}

/**
 * Asynchronously executes a rethrottle request for a delete by query task.
 * See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-delete-by-query.html">
 * Delete By Query API on elastic.co</a>
 * @param rethrottleRequest the request identifying the task and its new throttle
 * @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
 * @param listener the listener to be notified upon request completion
 */
public final void deleteByQueryRethrottleAsync(
        RethrottleRequest rethrottleRequest,
        RequestOptions options,
        ActionListener<ListTasksResponse> listener) {
    performRequestAsyncAndParseEntity(
        rethrottleRequest,
        RequestConverters::rethrottleDeleteByQuery,
        options,
        ListTasksResponse::fromXContent,
        listener,
        emptySet()
    );
}

/**
 * Synchronously executes a rethrottle request for an update by query task.
 * See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-update-by-query.html">
 * Update By Query API on elastic.co</a>
 * @param rethrottleRequest the request identifying the task and its new throttle
 * @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
 * @return the response, parsed via {@link ListTasksResponse#fromXContent}
 * @throws IOException in case there is a problem sending the request or parsing back the response
 */
public final ListTasksResponse updateByQueryRethrottle(RethrottleRequest rethrottleRequest, RequestOptions options) throws IOException {
    return performRequestAndParseEntity(
        rethrottleRequest,
        RequestConverters::rethrottleUpdateByQuery,
        options,
        ListTasksResponse::fromXContent,
        emptySet()
    );
}

/**
 * Asynchronously executes a rethrottle request for an update by query task.
 * See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-update-by-query.html">
 * Update By Query API on elastic.co</a>
 * @param rethrottleRequest the request identifying the task and its new throttle
 * @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
 * @param listener the listener to be notified upon request completion
 */
public final void updateByQueryRethrottleAsync(
        RethrottleRequest rethrottleRequest,
        RequestOptions options,
        ActionListener<ListTasksResponse> listener) {
    performRequestAsyncAndParseEntity(
        rethrottleRequest,
        RequestConverters::rethrottleUpdateByQuery,
        options,
        ListTasksResponse::fromXContent,
        listener,
        emptySet()
    );
}

/**
* Executes a reindex rethrottling request.
* See the <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-reindex.html#docs-reindex-rethrottle">
Expand All @@ -527,8 +583,8 @@ public final void deleteByQueryAsync(DeleteByQueryRequest deleteByQueryRequest,
* @throws IOException in case there is a problem sending the request or parsing back the response
*/
public final ListTasksResponse reindexRethrottle(RethrottleRequest rethrottleRequest, RequestOptions options) throws IOException {
return performRequestAndParseEntity(rethrottleRequest, RequestConverters::rethrottle, options, ListTasksResponse::fromXContent,
emptySet());
return performRequestAndParseEntity(rethrottleRequest, RequestConverters::rethrottleReindex, options,
ListTasksResponse::fromXContent, emptySet());
}

/**
Expand All @@ -541,9 +597,9 @@ public final ListTasksResponse reindexRethrottle(RethrottleRequest rethrottleReq
* @param listener the listener to be notified upon request completion
*/
public final void reindexRethrottleAsync(RethrottleRequest rethrottleRequest, RequestOptions options,
ActionListener<ListTasksResponse> listener) {
performRequestAsyncAndParseEntity(rethrottleRequest, RequestConverters::rethrottle, options, ListTasksResponse::fromXContent,
listener, emptySet());
ActionListener<ListTasksResponse> listener) {
performRequestAsyncAndParseEntity(rethrottleRequest, RequestConverters::rethrottleReindex, options, ListTasksResponse::fromXContent,
listener, emptySet());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,11 @@
import org.elasticsearch.index.get.GetResult;
import org.elasticsearch.index.query.IdsQueryBuilder;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.DeleteByQueryAction;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.index.reindex.ReindexAction;
import org.elasticsearch.index.reindex.ReindexRequest;
import org.elasticsearch.index.reindex.UpdateByQueryAction;
import org.elasticsearch.index.reindex.UpdateByQueryRequest;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.script.Script;
Expand Down Expand Up @@ -727,10 +729,7 @@ public void onFailure(Exception e) {
}
});

TaskGroup taskGroupToRethrottle = findTaskToRethrottle();
assertThat(taskGroupToRethrottle.getChildTasks(), empty());
TaskId taskIdToRethrottle = taskGroupToRethrottle.getTaskInfo().getTaskId();

TaskId taskIdToRethrottle = findTaskToRethrottle(ReindexAction.NAME);
float requestsPerSecond = 1000f;
ListTasksResponse response = execute(new RethrottleRequest(taskIdToRethrottle, requestsPerSecond),
highLevelClient()::reindexRethrottle, highLevelClient()::reindexRethrottleAsync);
Expand All @@ -752,10 +751,10 @@ public void onFailure(Exception e) {
}
}

private TaskGroup findTaskToRethrottle() throws IOException {
private TaskId findTaskToRethrottle(String actionName) throws IOException {
long start = System.nanoTime();
ListTasksRequest request = new ListTasksRequest();
request.setActions(ReindexAction.NAME);
request.setActions(actionName);
request.setDetailed(true);
do {
ListTasksResponse list = highLevelClient().tasks().list(request, RequestOptions.DEFAULT);
Expand All @@ -766,13 +765,15 @@ private TaskGroup findTaskToRethrottle() throws IOException {
// The parent task hasn't started yet
continue;
}
return list.getTaskGroups().get(0);
TaskGroup taskGroup = list.getTaskGroups().get(0);
assertThat(taskGroup.getChildTasks(), empty());
return taskGroup.getTaskInfo().getTaskId();
} while (System.nanoTime() - start < TimeUnit.SECONDS.toNanos(10));
throw new AssertionError("Couldn't find tasks to rethrottle. Here are the running tasks " +
highLevelClient().tasks().list(request, RequestOptions.DEFAULT));
}

public void testUpdateByQuery() throws IOException {
public void testUpdateByQuery() throws Exception {
final String sourceIndex = "source1";
{
// Prepare
Expand Down Expand Up @@ -836,9 +837,53 @@ public void testUpdateByQuery() throws IOException {
.getSourceAsMap().get("foo"))
);
}
{
    // test update-by-query rethrottling
    UpdateByQueryRequest updateByQueryRequest = new UpdateByQueryRequest();
    updateByQueryRequest.indices(sourceIndex);
    updateByQueryRequest.setQuery(new IdsQueryBuilder().addIds("1").types("type"));
    updateByQueryRequest.setRefresh(true);

    // the following settings are supposed to halt the update-by-query after the first document
    updateByQueryRequest.setBatchSize(1);
    updateByQueryRequest.setRequestsPerSecond(0.00001f);
    final CountDownLatch taskFinished = new CountDownLatch(1);
    highLevelClient().updateByQueryAsync(updateByQueryRequest, RequestOptions.DEFAULT, new ActionListener<BulkByScrollResponse>() {

        @Override
        public void onResponse(BulkByScrollResponse response) {
            taskFinished.countDown();
        }

        @Override
        public void onFailure(Exception e) {
            // NOTE(review): this callback presumably runs off the test thread, so fail() here may not
            // fail the test by itself; the asserted latch wait below surfaces that case as a timeout.
            fail(e.toString());
        }
    });

    // rethrottle the task while it is still held back by the tiny requests_per_second above
    TaskId taskIdToRethrottle = findTaskToRethrottle(UpdateByQueryAction.NAME);
    float requestsPerSecond = 1000f;
    ListTasksResponse response = execute(new RethrottleRequest(taskIdToRethrottle, requestsPerSecond),
        highLevelClient()::updateByQueryRethrottle, highLevelClient()::updateByQueryRethrottleAsync);
    assertThat(response.getTasks(), hasSize(1));
    assertEquals(taskIdToRethrottle, response.getTasks().get(0).getTaskId());
    assertThat(response.getTasks().get(0).getStatus(), instanceOf(RawTaskStatus.class));
    assertEquals(Float.toString(requestsPerSecond),
        ((RawTaskStatus) response.getTasks().get(0).getStatus()).toMap().get("requests_per_second").toString());

    // await() returns false on timeout; assert it so a task that never completes fails here with a clear
    // message instead of tripping the "task is missing" assertions below. The wait returns as soon as the
    // latch is released, so the larger bound costs nothing on the happy path.
    assertTrue("update-by-query task did not finish after being rethrottled", taskFinished.await(10, TimeUnit.SECONDS));

    // any rethrottling with the same taskId after the update-by-query has finished should result in a failure
    response = execute(new RethrottleRequest(taskIdToRethrottle, requestsPerSecond),
        highLevelClient()::updateByQueryRethrottle, highLevelClient()::updateByQueryRethrottleAsync);
    assertTrue(response.getTasks().isEmpty());
    assertFalse(response.getNodeFailures().isEmpty());
    assertEquals(1, response.getNodeFailures().size());
    assertEquals("Elasticsearch exception [type=resource_not_found_exception, reason=task [" + taskIdToRethrottle + "] is missing]",
        response.getNodeFailures().get(0).getCause().getMessage());
}
}

public void testDeleteByQuery() throws IOException {
public void testDeleteByQuery() throws Exception {
final String sourceIndex = "source1";
{
// Prepare
Expand All @@ -855,6 +900,8 @@ public void testDeleteByQuery() throws IOException {
.source(Collections.singletonMap("foo", 1), XContentType.JSON))
.add(new IndexRequest(sourceIndex, "type", "2")
.source(Collections.singletonMap("foo", 2), XContentType.JSON))
.add(new IndexRequest(sourceIndex, "type", "3")
.source(Collections.singletonMap("foo", 3), XContentType.JSON))
.setRefreshPolicy(RefreshPolicy.IMMEDIATE),
RequestOptions.DEFAULT
).status()
Expand All @@ -878,10 +925,54 @@ public void testDeleteByQuery() throws IOException {
assertEquals(0, bulkResponse.getBulkFailures().size());
assertEquals(0, bulkResponse.getSearchFailures().size());
assertEquals(
1,
2,
highLevelClient().search(new SearchRequest(sourceIndex), RequestOptions.DEFAULT).getHits().totalHits
);
}
{
    // test delete-by-query rethrottling
    DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest();
    deleteByQueryRequest.indices(sourceIndex);
    deleteByQueryRequest.setQuery(new IdsQueryBuilder().addIds("2", "3").types("type"));
    deleteByQueryRequest.setRefresh(true);

    // the following settings are supposed to halt the delete-by-query after the first document
    deleteByQueryRequest.setBatchSize(1);
    deleteByQueryRequest.setRequestsPerSecond(0.00001f);
    final CountDownLatch taskFinished = new CountDownLatch(1);
    highLevelClient().deleteByQueryAsync(deleteByQueryRequest, RequestOptions.DEFAULT, new ActionListener<BulkByScrollResponse>() {

        @Override
        public void onResponse(BulkByScrollResponse response) {
            taskFinished.countDown();
        }

        @Override
        public void onFailure(Exception e) {
            // NOTE(review): this callback presumably runs off the test thread, so fail() here may not
            // fail the test by itself; the asserted latch wait below surfaces that case as a timeout.
            fail(e.toString());
        }
    });

    // rethrottle the task while it is still held back by the tiny requests_per_second above
    TaskId taskIdToRethrottle = findTaskToRethrottle(DeleteByQueryAction.NAME);
    float requestsPerSecond = 1000f;
    ListTasksResponse response = execute(new RethrottleRequest(taskIdToRethrottle, requestsPerSecond),
        highLevelClient()::deleteByQueryRethrottle, highLevelClient()::deleteByQueryRethrottleAsync);
    assertThat(response.getTasks(), hasSize(1));
    assertEquals(taskIdToRethrottle, response.getTasks().get(0).getTaskId());
    assertThat(response.getTasks().get(0).getStatus(), instanceOf(RawTaskStatus.class));
    assertEquals(Float.toString(requestsPerSecond),
        ((RawTaskStatus) response.getTasks().get(0).getStatus()).toMap().get("requests_per_second").toString());

    // await() returns false on timeout; assert it so a task that never completes fails here with a clear
    // message instead of tripping the "task is missing" assertions below. The wait returns as soon as the
    // latch is released, so the larger bound costs nothing on the happy path.
    assertTrue("delete-by-query task did not finish after being rethrottled", taskFinished.await(10, TimeUnit.SECONDS));

    // any rethrottling with the same taskId after the delete-by-query has finished should result in a failure
    response = execute(new RethrottleRequest(taskIdToRethrottle, requestsPerSecond),
        highLevelClient()::deleteByQueryRethrottle, highLevelClient()::deleteByQueryRethrottleAsync);
    assertTrue(response.getTasks().isEmpty());
    assertFalse(response.getNodeFailures().isEmpty());
    assertEquals(1, response.getNodeFailures().size());
    assertEquals("Elasticsearch exception [type=resource_not_found_exception, reason=task [" + taskIdToRethrottle + "] is missing]",
        response.getNodeFailures().get(0).getCause().getMessage());
}
}

public void testBulkProcessorIntegration() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.unit.TimeValue;
Expand Down Expand Up @@ -466,7 +467,7 @@ public void testDeleteByQuery() throws IOException {
assertToXContentBody(deleteByQueryRequest, request.getEntity());
}

public void testRethrottle() throws IOException {
public void testRethrottle() {
TaskId taskId = new TaskId(randomAlphaOfLength(10), randomIntBetween(1, 100));
RethrottleRequest rethrottleRequest;
Float requestsPerSecond;
Expand All @@ -480,11 +481,20 @@ public void testRethrottle() throws IOException {
expectedParams.put(RethrottleRequest.REQUEST_PER_SECOND_PARAMETER, "-1");
}
expectedParams.put("group_by", "none");
Request request = RequestConverters.rethrottle(rethrottleRequest);
assertEquals("/_reindex/" + taskId + "/_rethrottle", request.getEndpoint());
assertEquals(HttpPost.METHOD_NAME, request.getMethod());
assertEquals(expectedParams, request.getParameters());
assertNull(request.getEntity());
List<Tuple<String, Supplier<Request>>> variants = new ArrayList<>();
variants.add(new Tuple<String, Supplier<Request>>("_reindex", () -> RequestConverters.rethrottleReindex(rethrottleRequest)));
variants.add(new Tuple<String, Supplier<Request>>("_update_by_query",
() -> RequestConverters.rethrottleUpdateByQuery(rethrottleRequest)));
variants.add(new Tuple<String, Supplier<Request>>("_delete_by_query",
() -> RequestConverters.rethrottleDeleteByQuery(rethrottleRequest)));

for (Tuple<String, Supplier<Request>> variant : variants) {
Request request = variant.v2().get();
assertEquals("/" + variant.v1() + "/" + taskId + "/_rethrottle", request.getEndpoint());
assertEquals(HttpPost.METHOD_NAME, request.getMethod());
assertEquals(expectedParams, request.getParameters());
assertNull(request.getEntity());
}

// test illegal RethrottleRequest values
Exception e = expectThrows(NullPointerException.class, () -> new RethrottleRequest(null, 1.0f));
Expand Down
Loading