Skip to content

Teach reindex to stop when cancelled #16613

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 11, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

package org.elasticsearch.tasks;

import org.elasticsearch.common.Nullable;

import java.util.concurrent.atomic.AtomicReference;

/**
Expand Down Expand Up @@ -56,4 +58,11 @@ public boolean isCancelled() {
return reason.get() != null;
}

/**
* The reason the task was cancelled or null if it hasn't been cancelled.
*/
@Nullable
public String getReasonCancelled() {
return reason.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,11 @@ public BulkByScrollTask getTask() {
return task;
}

private void initialSearch() {
void initialSearch() {
if (task.isCancelled()) {
finishHim(null);
return;
}
try {
// Default to sorting by _doc if it hasn't been changed.
if (firstSearchRequest.source().sorts() == null) {
Expand Down Expand Up @@ -144,8 +148,19 @@ public void onFailure(Throwable e) {
}
}

/**
* Set the last returned scrollId. Package private for testing.
*/
void setScroll(String scroll) {
this.scroll.set(scroll);
}

void onScrollResponse(SearchResponse searchResponse) {
scroll.set(searchResponse.getScrollId());
if (task.isCancelled()) {
finishHim(null);
return;
}
setScroll(searchResponse.getScrollId());
if (searchResponse.getShardFailures() != null && searchResponse.getShardFailures().length > 0) {
startNormalTermination(emptyList(), unmodifiableList(Arrays.asList(searchResponse.getShardFailures())));
return;
Expand Down Expand Up @@ -178,7 +193,7 @@ protected void doRun() throws Exception {
/*
* If we noop-ed the entire batch then just skip to the next batch or the BulkRequest would fail validation.
*/
startNextScrollRequest();
startNextScroll();
return;
}
request.timeout(mainRequest.getTimeout());
Expand All @@ -198,6 +213,10 @@ public void onFailure(Throwable t) {
}

void sendBulkRequest(BulkRequest request) {
if (task.isCancelled()) {
finishHim(null);
return;
}
retry.withAsyncBackoff(client, request, new ActionListener<BulkResponse>() {
@Override
public void onResponse(BulkResponse response) {
Expand All @@ -212,6 +231,10 @@ public void onFailure(Throwable e) {
}

void onBulkResponse(BulkResponse response) {
if (task.isCancelled()) {
finishHim(null);
return;
}
try {
List<Failure> failures = new ArrayList<Failure>();
Set<String> destinationIndicesThisBatch = new HashSet<>();
Expand Down Expand Up @@ -252,13 +275,17 @@ void onBulkResponse(BulkResponse response) {
startNormalTermination(emptyList(), emptyList());
return;
}
startNextScrollRequest();
startNextScroll();
} catch (Throwable t) {
finishHim(t);
}
}

void startNextScrollRequest() {
void startNextScroll() {
if (task.isCancelled()) {
finishHim(null);
return;
}
SearchScrollRequest request = new SearchScrollRequest();
request.scrollId(scroll.get()).scroll(firstSearchRequest.scroll());
client.searchScroll(request, new ActionListener<SearchResponse>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@

package org.elasticsearch.plugin.reindex;

import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;

import java.io.IOException;
Expand All @@ -31,7 +33,7 @@
/**
* Task storing information about a currently running BulkByScroll request.
*/
public class BulkByScrollTask extends Task {
public class BulkByScrollTask extends CancellableTask {
/**
* The total number of documents this request will process. 0 means we don't yet know or, possibly, there are actually 0 documents
* to process. Its ok that these have the same meaning because any request with 0 actual documents should be quite short lived.
Expand All @@ -52,7 +54,7 @@ public BulkByScrollTask(long id, String type, String action, String description)
@Override
public Status getStatus() {
return new Status(total.get(), updated.get(), created.get(), deleted.get(), batch.get(), versionConflicts.get(), noops.get(),
retries.get());
retries.get(), getReasonCancelled());
}

/**
Expand All @@ -63,7 +65,7 @@ public long getSuccessfullyProcessed() {
}

public static class Status implements Task.Status {
public static final Status PROTOTYPE = new Status(0, 0, 0, 0, 0, 0, 0, 0);
public static final Status PROTOTYPE = new Status(0, 0, 0, 0, 0, 0, 0, 0, null);

private final long total;
private final long updated;
Expand All @@ -73,8 +75,10 @@ public static class Status implements Task.Status {
private final long versionConflicts;
private final long noops;
private final long retries;
private final String reasonCancelled;

public Status(long total, long updated, long created, long deleted, int batches, long versionConflicts, long noops, long retries) {
public Status(long total, long updated, long created, long deleted, int batches, long versionConflicts, long noops, long retries,
@Nullable String reasonCancelled) {
this.total = checkPositive(total, "total");
this.updated = checkPositive(updated, "updated");
this.created = checkPositive(created, "created");
Expand All @@ -83,6 +87,7 @@ public Status(long total, long updated, long created, long deleted, int batches,
this.versionConflicts = checkPositive(versionConflicts, "versionConflicts");
this.noops = checkPositive(noops, "noops");
this.retries = checkPositive(retries, "retries");
this.reasonCancelled = reasonCancelled;
}

public Status(StreamInput in) throws IOException {
Expand All @@ -94,6 +99,7 @@ public Status(StreamInput in) throws IOException {
versionConflicts = in.readVLong();
noops = in.readVLong();
retries = in.readVLong();
reasonCancelled = in.readOptionalString();
}

@Override
Expand All @@ -106,6 +112,7 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(versionConflicts);
out.writeVLong(noops);
out.writeVLong(retries);
out.writeOptionalString(reasonCancelled);
}

@Override
Expand All @@ -129,6 +136,9 @@ public XContentBuilder innerXContent(XContentBuilder builder, Params params, boo
builder.field("version_conflicts", versionConflicts);
builder.field("noops", noops);
builder.field("retries", retries);
if (reasonCancelled != null) {
builder.field("canceled", reasonCancelled);
}
return builder;
}

Expand All @@ -152,6 +162,9 @@ public void innerToString(StringBuilder builder, boolean includeCreated, boolean
builder.append(",versionConflicts=").append(versionConflicts);
builder.append(",noops=").append(noops);
builder.append(",retries=").append(retries);
if (reasonCancelled != null) {
builder.append(",canceled=").append(reasonCancelled);
}
}

@Override
Expand Down Expand Up @@ -221,6 +234,13 @@ public long getRetries() {
return retries;
}

/**
* The reason that the request was canceled or null if it hasn't been.
*/
public String getReasonCancelled() {
return reasonCancelled;
}

private int checkPositive(int value, String name) {
if (value < 0) {
throw new IllegalArgumentException(name + " must be greater than 0 but was [" + value + "]");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,13 @@ public long getNoops() {
return status.getNoops();
}

/**
* The reason that the request was canceled or null if it hasn't been.
*/
public String getReasonCancelled() {
return status.getReasonCancelled();
}

/**
* All of the indexing failures. Version conflicts are only included if the request sets abortOnVersionConflict to true (the
* default).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.hamcrest.TypeSafeMatcher;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.nullValue;

public abstract class AbstractBulkIndexByScrollResponseMatcher<
Response extends BulkIndexByScrollResponse,
Expand All @@ -36,6 +37,7 @@ public abstract class AbstractBulkIndexByScrollResponseMatcher<
private Matcher<Integer> batchesMatcher;
private Matcher<Long> versionConflictsMatcher = equalTo(0L);
private Matcher<Integer> failuresMatcher = equalTo(0);
private Matcher<String> reasonCancelledMatcher = nullValue(String.class);

protected abstract Self self();

Expand Down Expand Up @@ -93,13 +95,18 @@ public Self failures(int failures) {
return failures(equalTo(failures));
}

public Self reasonCancelled(Matcher<String> reasonCancelledMatcher) {
this.reasonCancelledMatcher = reasonCancelledMatcher;
return self();
}

@Override
protected boolean matchesSafely(Response item) {
return updatedMatcher.matches(item.getUpdated()) &&
(batchesMatcher == null || batchesMatcher.matches(item.getBatches())) &&
versionConflictsMatcher.matches(item.getVersionConflicts()) &&
failuresMatcher.matches(item.getIndexingFailures().size());
failuresMatcher.matches(item.getIndexingFailures().size()) &&
reasonCancelledMatcher.matches(item.getReasonCancelled());
}

@Override
Expand All @@ -110,5 +117,6 @@ public void describeTo(Description description) {
}
description.appendText(" and versionConflicts matches ").appendDescriptionOf(versionConflictsMatcher);
description.appendText(" and failures size matches ").appendDescriptionOf(failuresMatcher);
description.appendText(" and reason cancelled matches ").appendDescriptionOf(reasonCancelledMatcher);
}
}
Loading