Skip to content

Commit 30da196

Browse files
Make GetSnapshotsAction Cancellable (#72644) (#73820)
If this action runs needlessly against large repositories (especially in timeout/retry situations), it incurs a significant memory and CPU cost, so it has been made cancellable, as was recently done for many other endpoints.
1 parent feae8e9 commit 30da196

File tree

5 files changed

+130
-7
lines changed

5 files changed

+130
-7
lines changed
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0 and the Server Side Public License, v 1; you may not use this file except
5+
* in compliance with, at your election, the Elastic License 2.0 or the Server
6+
* Side Public License, v 1.
7+
*/
8+
9+
package org.elasticsearch.http;
10+
11+
import org.apache.http.client.methods.HttpGet;
12+
import org.elasticsearch.action.admin.cluster.snapshots.get.GetSnapshotsAction;
13+
import org.elasticsearch.action.support.PlainActionFuture;
14+
import org.elasticsearch.client.Cancellable;
15+
import org.elasticsearch.client.Request;
16+
import org.elasticsearch.client.Response;
17+
import org.elasticsearch.client.ResponseListener;
18+
import org.elasticsearch.common.settings.Settings;
19+
import org.elasticsearch.common.util.CollectionUtils;
20+
import org.elasticsearch.plugins.Plugin;
21+
import org.elasticsearch.snapshots.AbstractSnapshotIntegTestCase;
22+
import org.elasticsearch.snapshots.SnapshotState;
23+
import org.elasticsearch.snapshots.mockstore.MockRepository;
24+
import org.elasticsearch.test.ESIntegTestCase;
25+
26+
import java.util.Collection;
27+
import java.util.concurrent.CancellationException;
28+
import java.util.concurrent.TimeUnit;
29+
30+
import static org.elasticsearch.test.TaskAssertions.assertAllCancellableTasksAreCancelled;
31+
import static org.elasticsearch.test.TaskAssertions.assertAllTasksHaveFinished;
32+
import static org.elasticsearch.test.TaskAssertions.awaitTaskWithPrefix;
33+
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
34+
import static org.hamcrest.core.IsEqual.equalTo;
35+
36+
/**
 * Integration test that sends a get-snapshots request through the REST client, cancels it from the
 * client side while the repository on the master is blocked, and then checks that the tasks for
 * {@code GetSnapshotsAction.NAME} are cancelled and finish.
 * The cluster scope is TEST with numDataNodes = 0 and numClientNodes = 0; the test method starts the
 * master-only and data-only nodes it needs itself.
 */
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, numClientNodes = 0)
37+
public class RestGetSnapshotsCancellationIT extends HttpSmokeTestCase {
38+
39+
/**
 * Returns the superclass plugins plus {@code MockRepository.Plugin}.
 * NOTE(review): presumably this plugin is what provides the "mock" repository type registered in the
 * test below - confirm.
 */
@Override
40+
protected Collection<Class<? extends Plugin>> nodePlugins() {
41+
return CollectionUtils.appendToCopy(super.nodePlugins(), MockRepository.Plugin.class);
42+
}
43+
44+
/**
 * Creates a repository with a random number of snapshots, blocks the master's repository, starts an
 * asynchronous GET for all snapshots in the repository, cancels the request, and expects the client
 * future to fail with a CancellationException.
 */
public void testGetSnapshotsCancellation() throws Exception {
45+
// Start one master-only and one data-only node and wait for a stable two-node cluster.
internalCluster().startMasterOnlyNode();
46+
internalCluster().startDataOnlyNode();
47+
ensureStableCluster(2);
48+
49+
final String repoName = "test-repo";
50+
// Register a repository of type "mock" at a random repository path; the request must be acknowledged.
assertAcked(
51+
client().admin().cluster().preparePutRepository(repoName)
52+
.setType("mock").setSettings(Settings.builder().put("location", randomRepoPath())));
53+
54+
// Create between 1 and 5 snapshots; each create call waits for completion and must report SUCCESS.
final int snapshotCount = randomIntBetween(1, 5);
55+
for (int i = 0; i < snapshotCount; i++) {
56+
assertEquals(
57+
SnapshotState.SUCCESS,
58+
client().admin().cluster().prepareCreateSnapshot(repoName, "snapshot-" + i).setWaitForCompletion(true)
59+
.get().getSnapshotInfo().state()
60+
);
61+
}
62+
63+
// Fetch the repository instance from the current master and switch on blocking.
// NOTE(review): going by the method name, this blocks on access to any file, which is what keeps the
// GET below from completing before it is cancelled - confirm against MockRepository.
final MockRepository repository = AbstractSnapshotIntegTestCase.getRepositoryOnMaster(repoName);
64+
repository.setBlockOnAnyFiles();
65+
66+
// GET every snapshot in the repository via the wildcard snapshot name.
final Request request = new Request(HttpGet.METHOD_NAME, "/_snapshot/" + repoName + "/*");
67+
// The future is completed from the listener callbacks so the outcome can be asserted later.
final PlainActionFuture<Void> future = new PlainActionFuture<>();
68+
// Keep the returned Cancellable handle; it is used further down to cancel the in-flight request.
final Cancellable cancellable = getRestClient().performRequestAsync(request, new ResponseListener() {
69+
@Override
70+
public void onSuccess(Response response) {
71+
future.onResponse(null);
72+
}
73+
74+
@Override
75+
public void onFailure(Exception exception) {
76+
future.onFailure(exception);
77+
}
78+
});
79+
80+
// The request must still be pending at this point.
assertThat(future.isDone(), equalTo(false));
81+
// Wait for a task whose action name starts with the get-snapshots action name.
awaitTaskWithPrefix(GetSnapshotsAction.NAME);
82+
// Wait, with a 30 second limit passed to assertBusy, until the repository reports that it is blocked.
assertBusy(() -> assertTrue(repository.blocked()), 30L, TimeUnit.SECONDS);
83+
// Cancel from the client side while the repository is still blocked.
cancellable.cancel();
84+
assertAllCancellableTasksAreCancelled(GetSnapshotsAction.NAME);
85+
// Unblock only after the cancellation has been asserted, then expect the client future to fail.
repository.unblock();
86+
expectThrows(CancellationException.class, future::actionGet);
87+
88+
// Finally check that no get-snapshots tasks remain.
assertAllTasksHaveFinished(GetSnapshotsAction.NAME);
89+
}
90+
}

server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/get/GetSnapshotsRequest.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,12 @@
1313
import org.elasticsearch.common.Strings;
1414
import org.elasticsearch.common.io.stream.StreamInput;
1515
import org.elasticsearch.common.io.stream.StreamOutput;
16+
import org.elasticsearch.tasks.CancellableTask;
17+
import org.elasticsearch.tasks.Task;
18+
import org.elasticsearch.tasks.TaskId;
1619

1720
import java.io.IOException;
21+
import java.util.Map;
1822

1923
import static org.elasticsearch.action.ValidateActions.addValidationError;
2024

@@ -160,4 +164,9 @@ public GetSnapshotsRequest verbose(boolean verbose) {
160164
/** Returns the current value of the {@code verbose} field of this request. */
public boolean verbose() {
161165
return verbose;
162166
}
167+
168+
/**
 * Creates the task for this request as a {@link CancellableTask}, built from the supplied id, type,
 * action, parent task id and headers, with {@code getDescription()} as its description.
 *
 * @param id           task id
 * @param type         task type
 * @param action       action name
 * @param parentTaskId id of the parent task
 * @param headers      headers passed on to the task
 * @return a new {@link CancellableTask}
 */
@Override
169+
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
170+
return new CancellableTask(id, type, action, getDescription(), parentTaskId, headers);
171+
}
163172
}

server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/get/TransportGetSnapshotsAction.java

Lines changed: 27 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,9 @@
3535
import org.elasticsearch.snapshots.SnapshotInfo;
3636
import org.elasticsearch.snapshots.SnapshotMissingException;
3737
import org.elasticsearch.snapshots.SnapshotsService;
38+
import org.elasticsearch.tasks.CancellableTask;
39+
import org.elasticsearch.tasks.Task;
40+
import org.elasticsearch.tasks.TaskCancelledException;
3841
import org.elasticsearch.threadpool.ThreadPool;
3942
import org.elasticsearch.transport.TransportService;
4043

@@ -73,11 +76,19 @@ protected ClusterBlockException checkBlock(GetSnapshotsRequest request, ClusterS
7376
}
7477

7578
@Override
76-
protected void masterOperation(final GetSnapshotsRequest request, final ClusterState state,
79+
// Task-less overload: always throws UnsupportedOperationException, because this action requires the
// task parameter. The work is done by the overload that takes a Task as its first argument.
protected void masterOperation(GetSnapshotsRequest request, ClusterState state,
80+
ActionListener<GetSnapshotsResponse> listener) throws Exception {
81+
throw new UnsupportedOperationException("The task parameter is required");
82+
}
83+
84+
@Override
85+
protected void masterOperation(final Task task, final GetSnapshotsRequest request, final ClusterState state,
7786
final ActionListener<GetSnapshotsResponse> listener) {
7887
final String repo = request.repository();
7988
final String[] snapshots = request.snapshots();
8089
final SnapshotsInProgress snapshotsInProgress = state.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY);
90+
assert task instanceof CancellableTask : task + " not cancellable";
91+
8192
final Map<String, SnapshotId> allSnapshotIds = new HashMap<>();
8293
final List<SnapshotInfo> currentSnapshots = new ArrayList<>();
8394
for (SnapshotInfo snapshotInfo : sortedCurrentSnapshots(snapshotsInProgress, repo)) {
@@ -94,7 +105,7 @@ protected void masterOperation(final GetSnapshotsRequest request, final ClusterS
94105
}
95106

96107
repositoryDataListener.whenComplete(repositoryData -> loadSnapshotInfos(snapshotsInProgress, repo, snapshots,
97-
request.ignoreUnavailable(), request.verbose(), allSnapshotIds, currentSnapshots, repositoryData,
108+
request.ignoreUnavailable(), request.verbose(), allSnapshotIds, currentSnapshots, repositoryData, (CancellableTask) task,
98109
listener.map(GetSnapshotsResponse::new)), listener::onFailure);
99110
}
100111

@@ -120,7 +131,12 @@ private static List<SnapshotInfo> sortedCurrentSnapshots(SnapshotsInProgress sna
120131
private void loadSnapshotInfos(SnapshotsInProgress snapshotsInProgress, String repo, String[] snapshots,
121132
boolean ignoreUnavailable, boolean verbose, Map<String, SnapshotId> allSnapshotIds,
122133
List<SnapshotInfo> currentSnapshots, @Nullable RepositoryData repositoryData,
123-
ActionListener<List<SnapshotInfo>> listener) {
134+
CancellableTask task, ActionListener<List<SnapshotInfo>> listener) {
135+
if (task.isCancelled()) {
136+
listener.onFailure(new TaskCancelledException("task cancelled"));
137+
return;
138+
}
139+
124140
if (repositoryData != null) {
125141
for (SnapshotId snapshotId : repositoryData.getSnapshotIds()) {
126142
allSnapshotIds.put(snapshotId.getName(), snapshotId);
@@ -156,7 +172,7 @@ private void loadSnapshotInfos(SnapshotsInProgress snapshotsInProgress, String r
156172

157173
if (verbose) {
158174
threadPool.generic().execute(ActionRunnable.supply(
159-
listener, () -> snapshots(snapshotsInProgress, repo, new ArrayList<>(toResolve), ignoreUnavailable)));
175+
listener, () -> snapshots(snapshotsInProgress, repo, new ArrayList<>(toResolve), ignoreUnavailable, task)));
160176
} else {
161177
final List<SnapshotInfo> snapshotInfos;
162178
if (repositoryData != null) {
@@ -182,7 +198,10 @@ private void loadSnapshotInfos(SnapshotsInProgress snapshotsInProgress, String r
182198
* @return list of snapshots
183199
*/
184200
private List<SnapshotInfo> snapshots(SnapshotsInProgress snapshotsInProgress, String repositoryName,
185-
List<SnapshotId> snapshotIds, boolean ignoreUnavailable) {
201+
List<SnapshotId> snapshotIds, boolean ignoreUnavailable, CancellableTask task) {
202+
if (task.isCancelled()) {
203+
throw new TaskCancelledException("task cancelled");
204+
}
186205
final Set<SnapshotInfo> snapshotSet = new HashSet<>();
187206
final Set<SnapshotId> snapshotIdsToIterate = new HashSet<>(snapshotIds);
188207
// first, look at the snapshots in progress
@@ -196,6 +215,9 @@ private List<SnapshotInfo> snapshots(SnapshotsInProgress snapshotsInProgress, St
196215
// then, look in the repository
197216
final Repository repository = repositoriesService.repository(repositoryName);
198217
for (SnapshotId snapshotId : snapshotIdsToIterate) {
218+
if (task.isCancelled()) {
219+
throw new TaskCancelledException("task cancelled");
220+
}
199221
try {
200222
snapshotSet.add(repository.getSnapshotInfo(snapshotId));
201223
} catch (Exception ex) {

server/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestGetSnapshotsAction.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import org.elasticsearch.rest.BaseRestHandler;
1515
import org.elasticsearch.rest.RestRequest;
1616
import org.elasticsearch.rest.action.RestToXContentListener;
17+
import org.elasticsearch.rest.action.RestCancellableNodeClient;
1718

1819
import java.io.IOException;
1920
import java.util.Collections;
@@ -54,6 +55,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
5455
getSnapshotsRequest.ignoreUnavailable(request.paramAsBoolean("ignore_unavailable", getSnapshotsRequest.ignoreUnavailable()));
5556
getSnapshotsRequest.verbose(request.paramAsBoolean("verbose", getSnapshotsRequest.verbose()));
5657
getSnapshotsRequest.masterNodeTimeout(request.paramAsTime("master_timeout", getSnapshotsRequest.masterNodeTimeout()));
57-
return channel -> client.admin().cluster().getSnapshots(getSnapshotsRequest, new RestToXContentListener<>(channel));
58+
return channel -> new RestCancellableNodeClient(client, request.getHttpChannel()).admin().cluster()
59+
.getSnapshots(getSnapshotsRequest, new RestToXContentListener<>(channel));
5860
}
5961
}

test/framework/src/main/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -213,7 +213,7 @@ public static void blockMasterFromFinalizingSnapshotOnSnapFile(final String repo
213213
}
214214

215215
/**
 * Looks up the {@code RepositoriesService} instance on the current master node and returns the
 * repository registered under the given name, cast (unchecked) to the caller's expected type.
 * This change widens the method from protected to public; the new RestGetSnapshotsCancellationIT
 * calls it as {@code AbstractSnapshotIntegTestCase.getRepositoryOnMaster(repoName)}.
 *
 * @param repositoryName name of the repository to look up
 * @return the repository instance from the current master node
 */
@SuppressWarnings("unchecked")
216-
protected static <T extends Repository> T getRepositoryOnMaster(String repositoryName) {
216+
public static <T extends Repository> T getRepositoryOnMaster(String repositoryName) {
217217
return ((T) internalCluster().getCurrentMasterNodeInstance(RepositoriesService.class).repository(repositoryName));
218218
}
219219

0 commit comments

Comments
 (0)