Skip to content

Commit 30107b4

Browse files
committed
Teach reindex to stop when cancelled
All we do is check the cancelled flag and stop the request at a few key points. Adds the cancellation cause to the status so any request that is cancelled but doesn't die can be seen in the task list.
1 parent a9fa0cb commit 30107b4

File tree

13 files changed

+445
-23
lines changed

13 files changed

+445
-23
lines changed

core/src/main/java/org/elasticsearch/tasks/CancellableTask.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919

2020
package org.elasticsearch.tasks;
2121

22+
import org.elasticsearch.common.Nullable;
23+
2224
import java.util.concurrent.atomic.AtomicReference;
2325

2426
/**
@@ -56,4 +58,11 @@ public boolean isCancelled() {
5658
return reason.get() != null;
5759
}
5860

61+
/**
62+
* The reason the task was cancelled or null if it hasn't been cancelled.
63+
*/
64+
@Nullable
65+
public String getReasonCancelled() {
66+
return reason.get();
67+
}
5968
}

plugins/reindex/src/main/java/org/elasticsearch/plugin/reindex/AbstractAsyncBulkByScrollAction.java

Lines changed: 32 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,11 @@ public BulkByScrollTask getTask() {
113113
return task;
114114
}
115115

116-
private void initialSearch() {
116+
void initialSearch() {
117+
if (task.isCancelled()) {
118+
finishHim(null);
119+
return;
120+
}
117121
try {
118122
// Default to sorting by _doc if it hasn't been changed.
119123
if (firstSearchRequest.source().sorts() == null) {
@@ -144,8 +148,19 @@ public void onFailure(Throwable e) {
144148
}
145149
}
146150

151+
/**
152+
* Set the last returned scrollId. Package private for testing.
153+
*/
154+
void setScroll(String scroll) {
155+
this.scroll.set(scroll);
156+
}
157+
147158
void onScrollResponse(SearchResponse searchResponse) {
148-
scroll.set(searchResponse.getScrollId());
159+
if (task.isCancelled()) {
160+
finishHim(null);
161+
return;
162+
}
163+
setScroll(searchResponse.getScrollId());
149164
if (searchResponse.getShardFailures() != null && searchResponse.getShardFailures().length > 0) {
150165
startNormalTermination(emptyList(), unmodifiableList(Arrays.asList(searchResponse.getShardFailures())));
151166
return;
@@ -178,7 +193,7 @@ protected void doRun() throws Exception {
178193
/*
179194
* If we noop-ed the entire batch then just skip to the next batch or the BulkRequest would fail validation.
180195
*/
181-
startNextScrollRequest();
196+
startNextScroll();
182197
return;
183198
}
184199
request.timeout(mainRequest.getTimeout());
@@ -198,6 +213,10 @@ public void onFailure(Throwable t) {
198213
}
199214

200215
void sendBulkRequest(BulkRequest request) {
216+
if (task.isCancelled()) {
217+
finishHim(null);
218+
return;
219+
}
201220
retry.withAsyncBackoff(client, request, new ActionListener<BulkResponse>() {
202221
@Override
203222
public void onResponse(BulkResponse response) {
@@ -212,6 +231,10 @@ public void onFailure(Throwable e) {
212231
}
213232

214233
void onBulkResponse(BulkResponse response) {
234+
if (task.isCancelled()) {
235+
finishHim(null);
236+
return;
237+
}
215238
try {
216239
List<Failure> failures = new ArrayList<Failure>();
217240
Set<String> destinationIndicesThisBatch = new HashSet<>();
@@ -252,13 +275,17 @@ void onBulkResponse(BulkResponse response) {
252275
startNormalTermination(emptyList(), emptyList());
253276
return;
254277
}
255-
startNextScrollRequest();
278+
startNextScroll();
256279
} catch (Throwable t) {
257280
finishHim(t);
258281
}
259282
}
260283

261-
void startNextScrollRequest() {
284+
void startNextScroll() {
285+
if (task.isCancelled()) {
286+
finishHim(null);
287+
return;
288+
}
262289
SearchScrollRequest request = new SearchScrollRequest();
263290
request.scrollId(scroll.get()).scroll(firstSearchRequest.scroll());
264291
client.searchScroll(request, new ActionListener<SearchResponse>() {

plugins/reindex/src/main/java/org/elasticsearch/plugin/reindex/BulkByScrollTask.java

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,11 @@
1919

2020
package org.elasticsearch.plugin.reindex;
2121

22+
import org.elasticsearch.common.Nullable;
2223
import org.elasticsearch.common.io.stream.StreamInput;
2324
import org.elasticsearch.common.io.stream.StreamOutput;
2425
import org.elasticsearch.common.xcontent.XContentBuilder;
26+
import org.elasticsearch.tasks.CancellableTask;
2527
import org.elasticsearch.tasks.Task;
2628

2729
import java.io.IOException;
@@ -31,7 +33,7 @@
3133
/**
3234
* Task storing information about a currently running BulkByScroll request.
3335
*/
34-
public class BulkByScrollTask extends Task {
36+
public class BulkByScrollTask extends CancellableTask {
3537
/**
3638
* The total number of documents this request will process. 0 means we don't yet know or, possibly, there are actually 0 documents
3739
* to process. Its ok that these have the same meaning because any request with 0 actual documents should be quite short lived.
@@ -52,7 +54,7 @@ public BulkByScrollTask(long id, String type, String action, String description)
5254
@Override
5355
public Status getStatus() {
5456
return new Status(total.get(), updated.get(), created.get(), deleted.get(), batch.get(), versionConflicts.get(), noops.get(),
55-
retries.get());
57+
retries.get(), getReasonCancelled());
5658
}
5759

5860
/**
@@ -63,7 +65,7 @@ public long getSuccessfullyProcessed() {
6365
}
6466

6567
public static class Status implements Task.Status {
66-
public static final Status PROTOTYPE = new Status(0, 0, 0, 0, 0, 0, 0, 0);
68+
public static final Status PROTOTYPE = new Status(0, 0, 0, 0, 0, 0, 0, 0, null);
6769

6870
private final long total;
6971
private final long updated;
@@ -73,8 +75,10 @@ public static class Status implements Task.Status {
7375
private final long versionConflicts;
7476
private final long noops;
7577
private final long retries;
78+
private final String reasonCancelled;
7679

77-
public Status(long total, long updated, long created, long deleted, int batches, long versionConflicts, long noops, long retries) {
80+
public Status(long total, long updated, long created, long deleted, int batches, long versionConflicts, long noops, long retries,
81+
@Nullable String reasonCancelled) {
7882
this.total = checkPositive(total, "total");
7983
this.updated = checkPositive(updated, "updated");
8084
this.created = checkPositive(created, "created");
@@ -83,6 +87,7 @@ public Status(long total, long updated, long created, long deleted, int batches,
8387
this.versionConflicts = checkPositive(versionConflicts, "versionConflicts");
8488
this.noops = checkPositive(noops, "noops");
8589
this.retries = checkPositive(retries, "retries");
90+
this.reasonCancelled = reasonCancelled;
8691
}
8792

8893
public Status(StreamInput in) throws IOException {
@@ -94,6 +99,7 @@ public Status(StreamInput in) throws IOException {
9499
versionConflicts = in.readVLong();
95100
noops = in.readVLong();
96101
retries = in.readVLong();
102+
reasonCancelled = in.readOptionalString();
97103
}
98104

99105
@Override
@@ -106,6 +112,7 @@ public void writeTo(StreamOutput out) throws IOException {
106112
out.writeVLong(versionConflicts);
107113
out.writeVLong(noops);
108114
out.writeVLong(retries);
115+
out.writeOptionalString(reasonCancelled);
109116
}
110117

111118
@Override
@@ -129,6 +136,9 @@ public XContentBuilder innerXContent(XContentBuilder builder, Params params, boo
129136
builder.field("version_conflicts", versionConflicts);
130137
builder.field("noops", noops);
131138
builder.field("retries", retries);
139+
if (reasonCancelled != null) {
140+
builder.field("canceled", reasonCancelled);
141+
}
132142
return builder;
133143
}
134144

@@ -152,6 +162,9 @@ public void innerToString(StringBuilder builder, boolean includeCreated, boolean
152162
builder.append(",versionConflicts=").append(versionConflicts);
153163
builder.append(",noops=").append(noops);
154164
builder.append(",retries=").append(retries);
165+
if (reasonCancelled != null) {
166+
builder.append(",canceled=").append(reasonCancelled);
167+
}
155168
}
156169

157170
@Override
@@ -221,6 +234,13 @@ public long getRetries() {
221234
return retries;
222235
}
223236

237+
/**
238+
* The reason that the request was canceled or null if it hasn't been.
239+
*/
240+
public String getReasonCancelled() {
241+
return reasonCancelled;
242+
}
243+
224244
private int checkPositive(int value, String name) {
225245
if (value < 0) {
226246
throw new IllegalArgumentException(name + " must be greater than 0 but was [" + value + "]");

plugins/reindex/src/main/java/org/elasticsearch/plugin/reindex/BulkIndexByScrollResponse.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,13 @@ public long getNoops() {
8181
return status.getNoops();
8282
}
8383

84+
/**
85+
* The reason that the request was canceled or null if it hasn't been.
86+
*/
87+
public String getReasonCancelled() {
88+
return status.getReasonCancelled();
89+
}
90+
8491
/**
8592
* All of the indexing failures. Version conflicts are only included if the request sets abortOnVersionConflict to true (the
8693
* default).

plugins/reindex/src/test/java/org/elasticsearch/plugin/reindex/AbstractBulkIndexByScrollResponseMatcher.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.hamcrest.TypeSafeMatcher;
2525

2626
import static org.hamcrest.Matchers.equalTo;
27+
import static org.hamcrest.Matchers.nullValue;
2728

2829
public abstract class AbstractBulkIndexByScrollResponseMatcher<
2930
Response extends BulkIndexByScrollResponse,
@@ -36,6 +37,7 @@ public abstract class AbstractBulkIndexByScrollResponseMatcher<
3637
private Matcher<Integer> batchesMatcher;
3738
private Matcher<Long> versionConflictsMatcher = equalTo(0L);
3839
private Matcher<Integer> failuresMatcher = equalTo(0);
40+
private Matcher<String> reasonCancelledMatcher = nullValue(String.class);
3941

4042
protected abstract Self self();
4143

@@ -93,13 +95,18 @@ public Self failures(int failures) {
9395
return failures(equalTo(failures));
9496
}
9597

98+
public Self reasonCancelled(Matcher<String> reasonCancelledMatcher) {
99+
this.reasonCancelledMatcher = reasonCancelledMatcher;
100+
return self();
101+
}
96102

97103
@Override
98104
protected boolean matchesSafely(Response item) {
99105
return updatedMatcher.matches(item.getUpdated()) &&
100106
(batchesMatcher == null || batchesMatcher.matches(item.getBatches())) &&
101107
versionConflictsMatcher.matches(item.getVersionConflicts()) &&
102-
failuresMatcher.matches(item.getIndexingFailures().size());
108+
failuresMatcher.matches(item.getIndexingFailures().size()) &&
109+
reasonCancelledMatcher.matches(item.getReasonCancelled());
103110
}
104111

105112
@Override
@@ -110,5 +117,6 @@ public void describeTo(Description description) {
110117
}
111118
description.appendText(" and versionConflicts matches ").appendDescriptionOf(versionConflictsMatcher);
112119
description.appendText(" and failures size matches ").appendDescriptionOf(failuresMatcher);
120+
description.appendText(" and reason cancelled matches ").appendDescriptionOf(reasonCancelledMatcher);
113121
}
114122
}

0 commit comments

Comments
 (0)