Skip to content

Commit 0642c73

Browse files
authored
Add support for task cancellation to RestNodesStatsAction (#71897)
1 parent c7fb400 commit 0642c73

File tree

4 files changed

+42
-1
lines changed

4 files changed

+42
-1
lines changed
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0 and the Server Side Public License, v 1; you may not use this file except
5+
* in compliance with, at your election, the Elastic License 2.0 or the Server
6+
* Side Public License, v 1.
7+
*/
8+
9+
package org.elasticsearch.http;
10+
11+
import org.apache.http.client.methods.HttpGet;
12+
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsAction;
13+
import org.elasticsearch.client.Request;
14+
15+
public class NodeStatsRestCancellationIT extends BlockedSearcherRestCancellationTestCase {
16+
public void testNodeStatsRestCancellation() throws Exception {
17+
runTest(new Request(HttpGet.METHOD_NAME, "/_nodes/stats"), NodesStatsAction.NAME);
18+
}
19+
}

server/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodesStatsRequest.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,14 @@
1212
import org.elasticsearch.action.support.nodes.BaseNodesRequest;
1313
import org.elasticsearch.common.io.stream.StreamInput;
1414
import org.elasticsearch.common.io.stream.StreamOutput;
15+
import org.elasticsearch.tasks.CancellableTask;
16+
import org.elasticsearch.tasks.Task;
17+
import org.elasticsearch.tasks.TaskId;
18+
1519
import java.io.IOException;
1620
import java.util.Arrays;
1721
import java.util.HashSet;
22+
import java.util.Map;
1823
import java.util.Set;
1924
import java.util.SortedSet;
2025
import java.util.TreeSet;
@@ -154,6 +159,11 @@ private void optionallyAddMetric(boolean includeMetric, String metricName) {
154159
}
155160
}
156161

162+
@Override
163+
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
164+
return new CancellableTask(id, type, action, "", parentTaskId, headers);
165+
}
166+
157167
@Override
158168
public void writeTo(StreamOutput out) throws IOException {
159169
super.writeTo(out);

server/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,16 @@
1616
import org.elasticsearch.common.io.stream.StreamInput;
1717
import org.elasticsearch.common.io.stream.StreamOutput;
1818
import org.elasticsearch.node.NodeService;
19+
import org.elasticsearch.tasks.CancellableTask;
1920
import org.elasticsearch.tasks.Task;
21+
import org.elasticsearch.tasks.TaskId;
2022
import org.elasticsearch.threadpool.ThreadPool;
2123
import org.elasticsearch.transport.TransportRequest;
2224
import org.elasticsearch.transport.TransportService;
2325

2426
import java.io.IOException;
2527
import java.util.List;
28+
import java.util.Map;
2629
import java.util.Set;
2730

2831
public class TransportNodesStatsAction extends TransportNodesAction<NodesStatsRequest,
@@ -57,6 +60,8 @@ protected NodeStats newNodeResponse(StreamInput in) throws IOException {
5760

5861
@Override
5962
protected NodeStats nodeOperation(NodeStatsRequest nodeStatsRequest, Task task) {
63+
assert task instanceof CancellableTask;
64+
6065
NodesStatsRequest request = nodeStatsRequest.request;
6166
Set<String> metrics = request.requestedMetrics();
6267
return nodeService.stats(
@@ -90,6 +95,11 @@ public NodeStatsRequest(StreamInput in) throws IOException {
9095
this.request = request;
9196
}
9297

98+
@Override
99+
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
100+
return new CancellableTask(id, type, action, "", parentTaskId, headers);
101+
}
102+
93103
@Override
94104
public void writeTo(StreamOutput out) throws IOException {
95105
super.writeTo(out);

server/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestNodesStatsAction.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import org.elasticsearch.rest.BaseRestHandler;
1717
import org.elasticsearch.rest.RestRequest;
1818
import org.elasticsearch.rest.action.RestActions.NodesResponseRestListener;
19+
import org.elasticsearch.rest.action.RestCancellableNodeClient;
1920

2021
import java.io.IOException;
2122
import java.util.Collections;
@@ -162,7 +163,8 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
162163
nodesStatsRequest.indices().includeUnloadedSegments(request.paramAsBoolean("include_unloaded_segments", false));
163164
}
164165

165-
return channel -> client.admin().cluster().nodesStats(nodesStatsRequest, new NodesResponseRestListener<>(channel));
166+
return channel -> new RestCancellableNodeClient(client, request.getHttpChannel())
167+
.admin().cluster().nodesStats(nodesStatsRequest, new NodesResponseRestListener<>(channel));
166168
}
167169

168170
private final Set<String> RESPONSE_PARAMS = Collections.singleton("level");

0 commit comments

Comments
 (0)