Skip to content

Commit eb6afe4

Browse files
authored
Add support for task cancellation to RestNodesStatsAction (#71907)
Backport #71897
1 parent afcb3a1 commit eb6afe4

File tree

4 files changed

+47
-1
lines changed

4 files changed

+47
-1
lines changed
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0 and the Server Side Public License, v 1; you may not use this file except
5+
* in compliance with, at your election, the Elastic License 2.0 or the Server
6+
* Side Public License, v 1.
7+
*/
8+
9+
package org.elasticsearch.http;
10+
11+
import org.apache.http.client.methods.HttpGet;
12+
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsAction;
13+
import org.elasticsearch.client.Request;
14+
15+
public class NodeStatsRestCancellationIT extends BlockedSearcherRestCancellationTestCase {
16+
public void testNodeStatsRestCancellation() throws Exception {
17+
runTest(new Request(HttpGet.METHOD_NAME, "/_nodes/stats"), NodesStatsAction.NAME);
18+
}
19+
}

server/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodesStatsRequest.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,14 @@
1313
import org.elasticsearch.action.support.nodes.BaseNodesRequest;
1414
import org.elasticsearch.common.io.stream.StreamInput;
1515
import org.elasticsearch.common.io.stream.StreamOutput;
16+
import org.elasticsearch.tasks.CancellableTask;
17+
import org.elasticsearch.tasks.Task;
18+
import org.elasticsearch.tasks.TaskId;
19+
1620
import java.io.IOException;
1721
import java.util.Arrays;
1822
import java.util.HashSet;
23+
import java.util.Map;
1924
import java.util.Set;
2025
import java.util.SortedSet;
2126
import java.util.TreeSet;
@@ -172,6 +177,11 @@ private void optionallyAddMetric(boolean includeMetric, String metricName) {
172177
}
173178
}
174179

180+
@Override
181+
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
182+
return new CancellableTask(id, type, action, "", parentTaskId, headers);
183+
}
184+
175185
@Override
176186
public void writeTo(StreamOutput out) throws IOException {
177187
super.writeTo(out);

server/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,15 @@
1717
import org.elasticsearch.common.io.stream.StreamInput;
1818
import org.elasticsearch.common.io.stream.StreamOutput;
1919
import org.elasticsearch.node.NodeService;
20+
import org.elasticsearch.tasks.CancellableTask;
21+
import org.elasticsearch.tasks.Task;
22+
import org.elasticsearch.tasks.TaskId;
2023
import org.elasticsearch.threadpool.ThreadPool;
2124
import org.elasticsearch.transport.TransportService;
2225

2326
import java.io.IOException;
2427
import java.util.List;
28+
import java.util.Map;
2529
import java.util.Set;
2630

2731
public class TransportNodesStatsAction extends TransportNodesAction<NodesStatsRequest,
@@ -54,6 +58,12 @@ protected NodeStats newNodeResponse(StreamInput in) throws IOException {
5458
return new NodeStats(in);
5559
}
5660

61+
@Override
62+
protected NodeStats nodeOperation(NodeStatsRequest nodeStatsRequest, Task task) {
63+
assert task instanceof CancellableTask;
64+
return nodeOperation(nodeStatsRequest);
65+
}
66+
5767
@Override
5868
protected NodeStats nodeOperation(NodeStatsRequest nodeStatsRequest) {
5969
NodesStatsRequest request = nodeStatsRequest.request;
@@ -89,6 +99,11 @@ public NodeStatsRequest(StreamInput in) throws IOException {
8999
this.request = request;
90100
}
91101

102+
@Override
103+
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
104+
return new CancellableTask(id, type, action, "", parentTaskId, headers);
105+
}
106+
92107
@Override
93108
public void writeTo(StreamOutput out) throws IOException {
94109
super.writeTo(out);

server/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestNodesStatsAction.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import org.elasticsearch.rest.BaseRestHandler;
1717
import org.elasticsearch.rest.RestRequest;
1818
import org.elasticsearch.rest.action.RestActions.NodesResponseRestListener;
19+
import org.elasticsearch.rest.action.RestCancellableNodeClient;
1920

2021
import java.io.IOException;
2122
import java.util.Collections;
@@ -167,7 +168,8 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
167168
nodesStatsRequest.indices().includeUnloadedSegments(request.paramAsBoolean("include_unloaded_segments", false));
168169
}
169170

170-
return channel -> client.admin().cluster().nodesStats(nodesStatsRequest, new NodesResponseRestListener<>(channel));
171+
return channel -> new RestCancellableNodeClient(client, request.getHttpChannel())
172+
.admin().cluster().nodesStats(nodesStatsRequest, new NodesResponseRestListener<>(channel));
171173
}
172174

173175
private final Set<String> RESPONSE_PARAMS = Collections.singleton("level");

0 commit comments

Comments
 (0)