Skip to content

Commit fcd8a43

Browse files
committed
Submit _async search task should cancel children on cancellation (#58332)
This change allows the submit async search task to cancel children and removes the manual indirection that cancels the search task when the submit task is cancelled. This is now handled by the task cancellation, which can cancel grand-children since #54757.
1 parent 88f1dab commit fcd8a43

File tree

4 files changed

+37
-59
lines changed

4 files changed

+37
-59
lines changed

x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchTask.java

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@
3636
import java.util.Map;
3737
import java.util.concurrent.atomic.AtomicBoolean;
3838
import java.util.concurrent.atomic.AtomicReference;
39-
import java.util.function.BooleanSupplier;
4039
import java.util.function.Consumer;
4140
import java.util.function.Supplier;
4241

@@ -46,7 +45,6 @@
4645
* Task that tracks the progress of a currently running {@link SearchRequest}.
4746
*/
4847
final class AsyncSearchTask extends SearchTask implements AsyncTask {
49-
private final BooleanSupplier checkSubmitCancellation;
5048
private final AsyncExecutionId searchId;
5149
private final Client client;
5250
private final ThreadPool threadPool;
@@ -73,7 +71,6 @@ final class AsyncSearchTask extends SearchTask implements AsyncTask {
7371
* @param type The type of the task.
7472
* @param action The action name.
7573
* @param parentTaskId The parent task id.
76-
* @param checkSubmitCancellation A boolean supplier that checks if the submit task has been cancelled.
7774
* @param originHeaders All the request context headers.
7875
* @param taskHeaders The filtered request headers for the task.
7976
* @param searchId The {@link AsyncExecutionId} of the task.
@@ -84,7 +81,6 @@ final class AsyncSearchTask extends SearchTask implements AsyncTask {
8481
String type,
8582
String action,
8683
TaskId parentTaskId,
87-
BooleanSupplier checkSubmitCancellation,
8884
TimeValue keepAlive,
8985
Map<String, String> originHeaders,
9086
Map<String, String> taskHeaders,
@@ -93,7 +89,6 @@ final class AsyncSearchTask extends SearchTask implements AsyncTask {
9389
ThreadPool threadPool,
9490
Supplier<InternalAggregation.ReduceContext> aggReduceContextSupplier) {
9591
super(id, type, action, "async_search", parentTaskId, taskHeaders);
96-
this.checkSubmitCancellation = checkSubmitCancellation;
9792
this.expirationTimeMillis = getStartTime() + keepAlive.getMillis();
9893
this.originHeaders = originHeaders;
9994
this.searchId = searchId;
@@ -319,12 +314,9 @@ private AsyncSearchResponse getResponseWithHeaders() {
319314
// checks if the search task should be cancelled
320315
private synchronized void checkCancellation() {
321316
long now = System.currentTimeMillis();
322-
if (hasCompleted == false &&
323-
expirationTimeMillis < now || checkSubmitCancellation.getAsBoolean()) {
324-
// we cancel the search task if the initial submit task was cancelled,
325-
// this is needed because the task cancellation mechanism doesn't
326-
// handle the cancellation of grand-children.
327-
cancelTask(() -> {}, checkSubmitCancellation.getAsBoolean() ? "submit was cancelled" : "async search has expired");
317+
if (hasCompleted == false && expirationTimeMillis < now) {
318+
// we cancel expired search task even if they are still running
319+
cancelTask(() -> {}, "async search has expired");
328320
}
329321
}
330322

x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportSubmitAsyncSearchAction.java

Lines changed: 31 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,7 @@
2929
import org.elasticsearch.index.engine.VersionConflictEngineException;
3030
import org.elasticsearch.search.SearchService;
3131
import org.elasticsearch.search.aggregations.InternalAggregation;
32-
import org.elasticsearch.tasks.CancellableTask;
3332
import org.elasticsearch.tasks.Task;
34-
import org.elasticsearch.tasks.TaskCancelledException;
3533
import org.elasticsearch.tasks.TaskId;
3634
import org.elasticsearch.transport.TransportService;
3735
import org.elasticsearch.xpack.core.async.AsyncExecutionId;
@@ -74,8 +72,7 @@ public TransportSubmitAsyncSearchAction(ClusterService clusterService,
7472
}
7573

7674
@Override
77-
protected void doExecute(Task task, SubmitAsyncSearchRequest request, ActionListener<AsyncSearchResponse> submitListener) {
78-
CancellableTask submitTask = (CancellableTask) task;
75+
protected void doExecute(Task submitTask, SubmitAsyncSearchRequest request, ActionListener<AsyncSearchResponse> submitListener) {
7976
final SearchRequest searchRequest = createSearchRequest(request, submitTask, request.getKeepAlive());
8077
AsyncSearchTask searchTask = (AsyncSearchTask) taskManager.register("transport", SearchAction.INSTANCE.name(), searchRequest);
8178
searchAction.execute(searchTask, searchRequest, searchTask.getSearchProgressActionListener());
@@ -87,42 +84,34 @@ public void onResponse(AsyncSearchResponse searchResponse) {
8784
// the task is still running and the user cannot wait more so we create
8885
// a document for further retrieval
8986
try {
90-
if (submitTask.isCancelled()) {
91-
// the user cancelled the submit so we don't store anything
92-
// and propagate the failure
93-
Exception cause = new TaskCancelledException(submitTask.getReasonCancelled());
94-
onFatalFailure(searchTask, cause, searchResponse.isRunning(),
95-
"submit task is cancelled", submitListener);
96-
} else {
97-
final String docId = searchTask.getExecutionId().getDocId();
98-
// creates the fallback response if the node crashes/restarts in the middle of the request
99-
// TODO: store intermediate results ?
100-
AsyncSearchResponse initialResp = searchResponse.clone(searchResponse.getId());
101-
store.storeInitialResponse(docId, searchTask.getOriginHeaders(), initialResp,
102-
new ActionListener<IndexResponse>() {
103-
@Override
104-
public void onResponse(IndexResponse r) {
105-
if (searchResponse.isRunning()) {
106-
try {
107-
// store the final response on completion unless the submit is cancelled
108-
searchTask.addCompletionListener(finalResponse ->
109-
onFinalResponse(submitTask, searchTask, finalResponse, () -> {}));
110-
} finally {
111-
submitListener.onResponse(searchResponse);
112-
}
113-
} else {
114-
onFinalResponse(submitTask, searchTask, searchResponse,
115-
() -> submitListener.onResponse(searchResponse));
87+
final String docId = searchTask.getExecutionId().getDocId();
88+
// creates the fallback response if the node crashes/restarts in the middle of the request
89+
// TODO: store intermediate results ?
90+
AsyncSearchResponse initialResp = searchResponse.clone(searchResponse.getId());
91+
store.storeInitialResponse(docId, searchTask.getOriginHeaders(), initialResp,
92+
new ActionListener<IndexResponse>() {
93+
@Override
94+
public void onResponse(IndexResponse r) {
95+
if (searchResponse.isRunning()) {
96+
try {
97+
// store the final response on completion unless the submit is cancelled
98+
searchTask.addCompletionListener(finalResponse ->
99+
onFinalResponse(searchTask, finalResponse, () -> {
100+
}));
101+
} finally {
102+
submitListener.onResponse(searchResponse);
116103
}
104+
} else {
105+
onFinalResponse(searchTask, searchResponse, () -> submitListener.onResponse(searchResponse));
117106
}
107+
}
118108

119-
@Override
120-
public void onFailure(Exception exc) {
121-
onFatalFailure(searchTask, exc, searchResponse.isRunning(),
122-
"unable to store initial response", submitListener);
123-
}
124-
});
125-
}
109+
@Override
110+
public void onFailure(Exception exc) {
111+
onFatalFailure(searchTask, exc, searchResponse.isRunning(),
112+
"unable to store initial response", submitListener);
113+
}
114+
});
126115
} catch (Exception exc) {
127116
onFatalFailure(searchTask, exc, searchResponse.isRunning(), "generic error", submitListener);
128117
}
@@ -141,7 +130,7 @@ public void onFailure(Exception exc) {
141130
}, request.getWaitForCompletionTimeout());
142131
}
143132

144-
private SearchRequest createSearchRequest(SubmitAsyncSearchRequest request, CancellableTask submitTask, TimeValue keepAlive) {
133+
private SearchRequest createSearchRequest(SubmitAsyncSearchRequest request, Task submitTask, TimeValue keepAlive) {
145134
String docID = UUIDs.randomBase64UUID();
146135
Map<String, String> originHeaders = nodeClient.threadPool().getThreadContext().getHeaders();
147136
SearchRequest searchRequest = new SearchRequest(request.getSearchRequest()) {
@@ -150,9 +139,8 @@ public AsyncSearchTask createTask(long id, String type, String action, TaskId pa
150139
AsyncExecutionId searchId = new AsyncExecutionId(docID, new TaskId(nodeClient.getLocalNodeId(), id));
151140
Supplier<InternalAggregation.ReduceContext> aggReduceContextSupplier =
152141
() -> requestToAggReduceContextBuilder.apply(request.getSearchRequest());
153-
return new AsyncSearchTask(id, type, action, parentTaskId,
154-
submitTask::isCancelled, keepAlive, originHeaders, taskHeaders, searchId, store.getClient(),
155-
nodeClient.threadPool(), aggReduceContextSupplier);
142+
return new AsyncSearchTask(id, type, action, parentTaskId, keepAlive,
143+
originHeaders, taskHeaders, searchId, store.getClient(), nodeClient.threadPool(), aggReduceContextSupplier);
156144
}
157145
};
158146
searchRequest.setParentTask(new TaskId(nodeClient.getLocalNodeId(), submitTask.getId()));
@@ -178,11 +166,10 @@ private void onFatalFailure(AsyncSearchTask task, Exception error, boolean shoul
178166
}
179167
}
180168

181-
private void onFinalResponse(CancellableTask submitTask,
182-
AsyncSearchTask searchTask,
169+
private void onFinalResponse(AsyncSearchTask searchTask,
183170
AsyncSearchResponse response,
184171
Runnable nextAction) {
185-
if (submitTask.isCancelled() || searchTask.isCancelled()) {
172+
if (searchTask.isCancelled()) {
186173
// the task was cancelled so we ensure that there is nothing stored in the response index.
187174
store.deleteResponse(searchTask.getExecutionId(), ActionListener.wrap(
188175
resp -> unregisterTaskAndMoveOn(searchTask, nextAction),

x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/AsyncSearchTaskTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,13 +52,13 @@ public void afterTest() {
5252
}
5353

5454
private AsyncSearchTask createAsyncSearchTask() {
55-
return new AsyncSearchTask(0L, "", "", new TaskId("node1", 0), () -> false, TimeValue.timeValueHours(1),
55+
return new AsyncSearchTask(0L, "", "", new TaskId("node1", 0), TimeValue.timeValueHours(1),
5656
Collections.emptyMap(), Collections.emptyMap(), new AsyncExecutionId("0", new TaskId("node1", 1)),
5757
new NoOpClient(threadPool), threadPool, null);
5858
}
5959

6060
public void testWaitForInit() throws InterruptedException {
61-
AsyncSearchTask task = new AsyncSearchTask(0L, "", "", new TaskId("node1", 0), () -> false, TimeValue.timeValueHours(1),
61+
AsyncSearchTask task = new AsyncSearchTask(0L, "", "", new TaskId("node1", 0), TimeValue.timeValueHours(1),
6262
Collections.emptyMap(), Collections.emptyMap(), new AsyncExecutionId("0", new TaskId("node1", 1)),
6363
new NoOpClient(threadPool), threadPool, null);
6464
int numShards = randomIntBetween(0, 10);

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/search/action/SubmitAsyncSearchRequest.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -154,8 +154,7 @@ public Task createTask(long id, String type, String action, TaskId parentTaskId,
154154
return new CancellableTask(id, type, action, null, parentTaskId, headers) {
155155
@Override
156156
public boolean shouldCancelChildrenOnCancellation() {
157-
// we cancel the underlying search action explicitly in the submit action
158-
return false;
157+
return true;
159158
}
160159

161160
@Override

0 commit comments

Comments
 (0)