Skip to content

Commit 9262389

Browse files
committed
Make recovery APIs cancellable (#69177)
Relates #55550
1 parent 47f169b commit 9262389

File tree

6 files changed

+178
-2
lines changed

6 files changed

+178
-2
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0 and the Server Side Public License, v 1; you may not use this file except
5+
* in compliance with, at your election, the Elastic License 2.0 or the Server
6+
* Side Public License, v 1.
7+
*/
8+
9+
package org.elasticsearch.http;
10+
11+
import org.apache.http.client.methods.HttpGet;
12+
import org.elasticsearch.action.admin.indices.recovery.RecoveryAction;
13+
import org.elasticsearch.action.admin.indices.recovery.TransportRecoveryAction;
14+
import org.elasticsearch.action.admin.indices.recovery.TransportRecoveryActionHelper;
15+
import org.elasticsearch.action.support.PlainActionFuture;
16+
import org.elasticsearch.client.Cancellable;
17+
import org.elasticsearch.client.Request;
18+
import org.elasticsearch.client.Response;
19+
import org.elasticsearch.client.ResponseListener;
20+
import org.elasticsearch.common.lease.Releasable;
21+
import org.elasticsearch.common.lease.Releasables;
22+
import org.elasticsearch.tasks.CancellableTask;
23+
import org.elasticsearch.tasks.TaskInfo;
24+
import org.elasticsearch.transport.TransportService;
25+
26+
import java.util.ArrayList;
27+
import java.util.List;
28+
import java.util.concurrent.CancellationException;
29+
import java.util.concurrent.Semaphore;
30+
31+
import static org.hamcrest.Matchers.empty;
32+
import static org.hamcrest.Matchers.not;
33+
34+
public class IndicesRecoveryRestCancellationIT extends HttpSmokeTestCase {
35+
36+
public void testIndicesRecoveryRestCancellation() throws Exception {
37+
runTest(new Request(HttpGet.METHOD_NAME, "/_recovery"));
38+
}
39+
40+
public void testCatRecoveryRestCancellation() throws Exception {
41+
runTest(new Request(HttpGet.METHOD_NAME, "/_cat/recovery"));
42+
}
43+
44+
private void runTest(Request request) throws Exception {
45+
46+
createIndex("test");
47+
ensureGreen("test");
48+
49+
final List<Semaphore> operationBlocks = new ArrayList<>();
50+
for (final TransportRecoveryAction transportRecoveryAction : internalCluster().getInstances(TransportRecoveryAction.class)) {
51+
final Semaphore operationBlock = new Semaphore(1);
52+
operationBlocks.add(operationBlock);
53+
TransportRecoveryActionHelper.setOnShardOperation(transportRecoveryAction, () -> {
54+
try {
55+
operationBlock.acquire();
56+
} catch (InterruptedException e) {
57+
throw new AssertionError(e);
58+
}
59+
operationBlock.release();
60+
});
61+
}
62+
assertThat(operationBlocks, not(empty()));
63+
64+
final List<Releasable> releasables = new ArrayList<>();
65+
try {
66+
for (final Semaphore operationBlock : operationBlocks) {
67+
operationBlock.acquire();
68+
releasables.add(operationBlock::release);
69+
}
70+
71+
final PlainActionFuture<Void> future = new PlainActionFuture<>();
72+
logger.info("--> sending request");
73+
final Cancellable cancellable = getRestClient().performRequestAsync(request, new ResponseListener() {
74+
@Override
75+
public void onSuccess(Response response) {
76+
future.onResponse(null);
77+
}
78+
79+
@Override
80+
public void onFailure(Exception exception) {
81+
future.onFailure(exception);
82+
}
83+
});
84+
85+
logger.info("--> waiting for task to start");
86+
assertBusy(() -> {
87+
final List<TaskInfo> tasks = client().admin().cluster().prepareListTasks().get().getTasks();
88+
assertTrue(tasks.toString(), tasks.stream().anyMatch(t -> t.getAction().startsWith(RecoveryAction.NAME)));
89+
});
90+
91+
logger.info("--> waiting for at least one task to hit a block");
92+
assertBusy(() -> assertTrue(operationBlocks.stream().anyMatch(Semaphore::hasQueuedThreads)));
93+
94+
logger.info("--> cancelling request");
95+
cancellable.cancel();
96+
expectThrows(CancellationException.class, future::actionGet);
97+
98+
logger.info("--> checking that all tasks are marked as cancelled");
99+
assertBusy(() -> {
100+
boolean foundTask = false;
101+
for (TransportService transportService : internalCluster().getInstances(TransportService.class)) {
102+
for (CancellableTask cancellableTask : transportService.getTaskManager().getCancellableTasks().values()) {
103+
if (cancellableTask.getAction().startsWith(RecoveryAction.NAME)) {
104+
foundTask = true;
105+
assertTrue("task " + cancellableTask.getId() + " not cancelled", cancellableTask.isCancelled());
106+
}
107+
}
108+
}
109+
assertTrue("found no cancellable tasks", foundTask);
110+
});
111+
} finally {
112+
Releasables.close(releasables);
113+
}
114+
115+
logger.info("--> checking that all tasks have finished");
116+
assertBusy(() -> {
117+
final List<TaskInfo> tasks = client().admin().cluster().prepareListTasks().get().getTasks();
118+
assertTrue(tasks.toString(), tasks.stream().noneMatch(t -> t.getAction().startsWith(RecoveryAction.NAME)));
119+
});
120+
}
121+
122+
}

server/src/main/java/org/elasticsearch/action/admin/indices/recovery/RecoveryRequest.java

+9
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,12 @@
1313
import org.elasticsearch.common.Strings;
1414
import org.elasticsearch.common.io.stream.StreamInput;
1515
import org.elasticsearch.common.io.stream.StreamOutput;
16+
import org.elasticsearch.tasks.CancellableTask;
17+
import org.elasticsearch.tasks.Task;
18+
import org.elasticsearch.tasks.TaskId;
1619

1720
import java.io.IOException;
21+
import java.util.Map;
1822

1923
/**
2024
* Request for recovery information
@@ -90,4 +94,9 @@ public void writeTo(StreamOutput out) throws IOException {
9094
out.writeBoolean(detailed);
9195
out.writeBoolean(activeOnly);
9296
}
97+
98+
@Override
99+
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
100+
return new CancellableTask(id, type, action, "", parentTaskId, headers);
101+
}
93102
}

server/src/main/java/org/elasticsearch/action/admin/indices/recovery/TransportRecoveryAction.java

+19
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,14 @@
1818
import org.elasticsearch.cluster.routing.ShardRouting;
1919
import org.elasticsearch.cluster.routing.ShardsIterator;
2020
import org.elasticsearch.cluster.service.ClusterService;
21+
import org.elasticsearch.common.Nullable;
2122
import org.elasticsearch.common.inject.Inject;
2223
import org.elasticsearch.common.io.stream.StreamInput;
2324
import org.elasticsearch.index.IndexService;
2425
import org.elasticsearch.index.shard.IndexShard;
2526
import org.elasticsearch.indices.IndicesService;
2627
import org.elasticsearch.indices.recovery.RecoveryState;
28+
import org.elasticsearch.tasks.CancellableTask;
2729
import org.elasticsearch.tasks.Task;
2830
import org.elasticsearch.threadpool.ThreadPool;
2931
import org.elasticsearch.transport.TransportService;
@@ -88,6 +90,8 @@ protected RecoveryRequest readRequestFrom(StreamInput in) throws IOException {
8890

8991
@Override
9092
protected RecoveryState shardOperation(RecoveryRequest request, ShardRouting shardRouting, Task task) {
93+
assert task instanceof CancellableTask;
94+
runOnShardOperation();
9195
IndexService indexService = indicesService.indexServiceSafe(shardRouting.shardId().getIndex());
9296
IndexShard indexShard = indexService.getShard(shardRouting.shardId().id());
9397
return indexShard.recoveryState();
@@ -107,4 +111,19 @@ protected ClusterBlockException checkGlobalBlock(ClusterState state, RecoveryReq
107111
protected ClusterBlockException checkRequestBlock(ClusterState state, RecoveryRequest request, String[] concreteIndices) {
108112
return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA_READ, concreteIndices);
109113
}
114+
115+
@Nullable // unless running tests that inject extra behaviour
116+
private volatile Runnable onShardOperation;
117+
118+
private void runOnShardOperation() {
119+
final Runnable onShardOperation = this.onShardOperation;
120+
if (onShardOperation != null) {
121+
onShardOperation.run();
122+
}
123+
}
124+
125+
// exposed for tests: inject some extra behaviour that runs when shardOperation() is called
126+
void setOnShardOperation(@Nullable Runnable onShardOperation) {
127+
this.onShardOperation = onShardOperation;
128+
}
110129
}

server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestRecoveryAction.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import org.elasticsearch.common.Strings;
1515
import org.elasticsearch.rest.BaseRestHandler;
1616
import org.elasticsearch.rest.RestRequest;
17+
import org.elasticsearch.rest.action.RestCancellableNodeClient;
1718
import org.elasticsearch.rest.action.RestToXContentListener;
1819

1920
import java.io.IOException;
@@ -52,7 +53,8 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
5253
recoveryRequest.detailed(request.paramAsBoolean("detailed", false));
5354
recoveryRequest.activeOnly(request.paramAsBoolean("active_only", false));
5455
recoveryRequest.indicesOptions(IndicesOptions.fromRequest(request, recoveryRequest.indicesOptions()));
55-
return channel -> client.admin().indices().recoveries(recoveryRequest, new RestToXContentListener<>(channel));
56+
return channel -> new RestCancellableNodeClient(client, request.getHttpChannel())
57+
.admin().indices().recoveries(recoveryRequest, new RestToXContentListener<>(channel));
5658
}
5759
}
5860

server/src/main/java/org/elasticsearch/rest/action/cat/RestCatRecoveryAction.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.elasticsearch.indices.recovery.RecoveryState;
2323
import org.elasticsearch.rest.RestRequest;
2424
import org.elasticsearch.rest.RestResponse;
25+
import org.elasticsearch.rest.action.RestCancellableNodeClient;
2526
import org.elasticsearch.rest.action.RestResponseListener;
2627

2728
import java.util.Comparator;
@@ -64,7 +65,8 @@ public RestChannelConsumer doCatRequest(final RestRequest request, final NodeCli
6465
recoveryRequest.activeOnly(request.paramAsBoolean("active_only", false));
6566
recoveryRequest.indicesOptions(IndicesOptions.fromRequest(request, recoveryRequest.indicesOptions()));
6667

67-
return channel -> client.admin().indices().recoveries(recoveryRequest, new RestResponseListener<RecoveryResponse>(channel) {
68+
return channel -> new RestCancellableNodeClient(client, request.getHttpChannel())
69+
.admin().indices().recoveries(recoveryRequest, new RestResponseListener<RecoveryResponse>(channel) {
6870
@Override
6971
public RestResponse buildResponse(final RecoveryResponse response) throws Exception {
7072
return RestTable.buildResponse(buildRecoveryTable(request, response), channel);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0 and the Server Side Public License, v 1; you may not use this file except
5+
* in compliance with, at your election, the Elastic License 2.0 or the Server
6+
* Side Public License, v 1.
7+
*/
8+
9+
package org.elasticsearch.action.admin.indices.recovery;
10+
11+
/**
12+
* Helper methods for {@link TransportRecoveryAction}.
13+
*/
14+
public class TransportRecoveryActionHelper {
15+
16+
/**
17+
* Helper method for tests to call {@link TransportRecoveryAction#setOnShardOperation}.
18+
*/
19+
public static void setOnShardOperation(TransportRecoveryAction transportRecoveryAction, Runnable setOnShardOperation) {
20+
transportRecoveryAction.setOnShardOperation(setOnShardOperation);
21+
}
22+
}

0 commit comments

Comments
 (0)