Skip to content

Commit eb55b48

Browse files
authored
Make GetIndexAction cancellable (#87681)
The get-indices API does some nontrivial work on the master and at high index counts the response may be very large and could take a long time to compute. Some clients will time out and retry if it takes too long. Today this API is not properly cancellable which leads to a good deal of wasted work in this situation, and the potentially-enormous response is serialized on a transport worker thread. With this commit we make the API cancellable and move the serialization to a `MANAGEMENT` thread. Relates #77466
1 parent eb8c4ba commit eb55b48

File tree

8 files changed

+58
-14
lines changed

8 files changed

+58
-14
lines changed

docs/changelog/87681.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 87681
2+
summary: Make `GetIndexAction` cancellable
3+
area: Indices APIs
4+
type: bug
5+
issues: []

qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/http/RestGetMappingsCancellationIT.java renamed to qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/http/RestClusterInfoActionCancellationIT.java

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
package org.elasticsearch.http;
1010

1111
import org.apache.http.client.methods.HttpGet;
12+
import org.elasticsearch.action.admin.indices.get.GetIndexAction;
1213
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsAction;
1314
import org.elasticsearch.action.support.PlainActionFuture;
1415
import org.elasticsearch.action.support.master.AcknowledgedResponse;
@@ -37,21 +38,28 @@
3738
import static org.hamcrest.core.IsEqual.equalTo;
3839

3940
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, numClientNodes = 0)
40-
public class RestGetMappingsCancellationIT extends HttpSmokeTestCase {
41+
public class RestClusterInfoActionCancellationIT extends HttpSmokeTestCase {
4142

4243
public void testGetMappingsCancellation() throws Exception {
44+
runTest(GetMappingsAction.NAME, "/test/_mappings");
45+
}
46+
47+
public void testGetIndicesCancellation() throws Exception {
48+
runTest(GetIndexAction.NAME, "/test");
49+
}
50+
51+
private void runTest(String actionName, String endpoint) throws Exception {
4352
internalCluster().startMasterOnlyNode();
4453
internalCluster().startDataOnlyNode();
4554
ensureStableCluster(2);
4655

4756
createIndex("test");
4857
ensureGreen("test");
49-
final String actionName = GetMappingsAction.NAME;
5058
// Add a retryable cluster block that would block the request execution
5159
updateClusterState(currentState -> {
5260
ClusterBlock clusterBlock = new ClusterBlock(
5361
1000,
54-
"Get mappings cancellation test cluster block",
62+
actionName + " cancellation test cluster block",
5563
true,
5664
false,
5765
false,
@@ -62,7 +70,7 @@ public void testGetMappingsCancellation() throws Exception {
6270
return ClusterState.builder(currentState).blocks(ClusterBlocks.builder().addGlobalBlock(clusterBlock).build()).build();
6371
});
6472

65-
final Request request = new Request(HttpGet.METHOD_NAME, "/test/_mappings");
73+
final Request request = new Request(HttpGet.METHOD_NAME, endpoint);
6674
final PlainActionFuture<Response> future = new PlainActionFuture<>();
6775
final Cancellable cancellable = getRestClient().performRequestAsync(request, wrapAsRestResponseListener(future));
6876

server/src/main/java/org/elasticsearch/action/ActionModule.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -734,7 +734,7 @@ public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster) {
734734
registerHandler.accept(new RestResetFeatureStateAction());
735735
registerHandler.accept(new RestGetFeatureUpgradeStatusAction());
736736
registerHandler.accept(new RestPostFeatureUpgradeAction());
737-
registerHandler.accept(new RestGetIndicesAction());
737+
registerHandler.accept(new RestGetIndicesAction(threadPool));
738738
registerHandler.accept(new RestIndicesStatsAction());
739739
registerHandler.accept(new RestIndicesSegmentsAction(threadPool));
740740
registerHandler.accept(new RestIndicesShardStoresAction());

server/src/main/java/org/elasticsearch/action/admin/indices/get/GetIndexRequest.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,16 @@
1414
import org.elasticsearch.common.io.stream.StreamOutput;
1515
import org.elasticsearch.common.util.ArrayUtils;
1616
import org.elasticsearch.rest.RestRequest;
17+
import org.elasticsearch.tasks.CancellableTask;
18+
import org.elasticsearch.tasks.Task;
19+
import org.elasticsearch.tasks.TaskId;
1720

1821
import java.io.IOException;
1922
import java.util.ArrayList;
2023
import java.util.HashSet;
2124
import java.util.List;
2225
import java.util.Locale;
26+
import java.util.Map;
2327
import java.util.Set;
2428

2529
/**
@@ -161,4 +165,9 @@ public void writeTo(StreamOutput out) throws IOException {
161165
out.writeBoolean(humanReadable);
162166
out.writeBoolean(includeDefaults);
163167
}
168+
169+
@Override
170+
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
171+
return new CancellableTask(id, type, action, getDescription(), parentTaskId, headers);
172+
}
164173
}

server/src/main/java/org/elasticsearch/action/admin/indices/get/TransportGetIndexAction.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,14 @@
1717
import org.elasticsearch.cluster.metadata.IndexMetadata;
1818
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
1919
import org.elasticsearch.cluster.metadata.MappingMetadata;
20-
import org.elasticsearch.cluster.metadata.Metadata;
2120
import org.elasticsearch.cluster.service.ClusterService;
2221
import org.elasticsearch.common.collect.ImmutableOpenMap;
2322
import org.elasticsearch.common.inject.Inject;
2423
import org.elasticsearch.common.settings.IndexScopedSettings;
2524
import org.elasticsearch.common.settings.Settings;
2625
import org.elasticsearch.common.settings.SettingsFilter;
2726
import org.elasticsearch.indices.IndicesService;
27+
import org.elasticsearch.tasks.CancellableTask;
2828
import org.elasticsearch.tasks.Task;
2929
import org.elasticsearch.threadpool.ThreadPool;
3030
import org.elasticsearch.transport.TransportService;
@@ -94,11 +94,12 @@ protected void doMasterOperation(
9494
boolean doneMappings = false;
9595
boolean doneSettings = false;
9696
for (Feature feature : features) {
97+
checkCancellation(task);
9798
switch (feature) {
9899
case MAPPINGS:
99100
if (doneMappings == false) {
100101
mappingsResult = state.metadata()
101-
.findMappings(concreteIndices, indicesService.getFieldFilter(), Metadata.ON_NEXT_INDEX_FIND_MAPPINGS_NOOP);
102+
.findMappings(concreteIndices, indicesService.getFieldFilter(), () -> checkCancellation(task));
102103
doneMappings = true;
103104
}
104105
break;
@@ -113,6 +114,7 @@ protected void doMasterOperation(
113114
ImmutableOpenMap.Builder<String, Settings> settingsMapBuilder = ImmutableOpenMap.builder();
114115
ImmutableOpenMap.Builder<String, Settings> defaultSettingsMapBuilder = ImmutableOpenMap.builder();
115116
for (String index : concreteIndices) {
117+
checkCancellation(task);
116118
Settings indexSettings = state.metadata().index(index).getSettings();
117119
if (request.humanReadable()) {
118120
indexSettings = IndexMetadata.addHumanReadableSettings(indexSettings);
@@ -137,4 +139,10 @@ protected void doMasterOperation(
137139
}
138140
listener.onResponse(new GetIndexResponse(concreteIndices, mappingsResult, aliasesResult, settings, defaultSettings, dataStreams));
139141
}
142+
143+
private static void checkCancellation(Task task) {
144+
if (task instanceof CancellableTask cancellableTask) {
145+
cancellableTask.ensureNotCancelled();
146+
}
147+
}
140148
}

server/src/main/java/org/elasticsearch/action/admin/indices/mapping/get/TransportGetMappingsAction.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import org.elasticsearch.transport.TransportService;
2727

2828
import java.util.Map;
29-
import java.util.concurrent.CancellationException;
3029

3130
public class TransportGetMappingsAction extends TransportClusterInfoAction<GetMappingsRequest, GetMappingsResponse> {
3231

@@ -75,8 +74,8 @@ protected void doMasterOperation(
7574
}
7675

7776
private static void checkCancellation(Task task) {
78-
if (task instanceof CancellableTask && ((CancellableTask) task).isCancelled()) {
79-
throw new CancellationException("Task cancelled");
77+
if (task instanceof CancellableTask cancellableTask) {
78+
cancellableTask.ensureNotCancelled();
8079
}
8180
}
8281
}

server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestGetIndicesAction.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,9 @@
1717
import org.elasticsearch.core.RestApiVersion;
1818
import org.elasticsearch.rest.BaseRestHandler;
1919
import org.elasticsearch.rest.RestRequest;
20-
import org.elasticsearch.rest.action.RestToXContentListener;
20+
import org.elasticsearch.rest.action.DispatchingRestToXContentListener;
21+
import org.elasticsearch.rest.action.RestCancellableNodeClient;
22+
import org.elasticsearch.threadpool.ThreadPool;
2123

2224
import java.io.IOException;
2325
import java.util.Collections;
@@ -42,6 +44,12 @@ public class RestGetIndicesAction extends BaseRestHandler {
4244
.collect(Collectors.toSet())
4345
);
4446

47+
private final ThreadPool threadPool;
48+
49+
public RestGetIndicesAction(ThreadPool threadPool) {
50+
this.threadPool = threadPool;
51+
}
52+
4553
@Override
4654
public List<Route> routes() {
4755
return List.of(new Route(GET, "/{index}"), new Route(HEAD, "/{index}"));
@@ -70,7 +78,13 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
7078
getIndexRequest.humanReadable(request.paramAsBoolean("human", false));
7179
getIndexRequest.includeDefaults(request.paramAsBoolean("include_defaults", false));
7280
getIndexRequest.features(GetIndexRequest.Feature.fromRequest(request));
73-
return channel -> client.admin().indices().getIndex(getIndexRequest, new RestToXContentListener<>(channel));
81+
final var httpChannel = request.getHttpChannel();
82+
return channel -> new RestCancellableNodeClient(client, httpChannel).admin()
83+
.indices()
84+
.getIndex(
85+
getIndexRequest,
86+
new DispatchingRestToXContentListener<>(threadPool.executor(ThreadPool.Names.MANAGEMENT), channel, request)
87+
);
7488
}
7589

7690
/**

server/src/test/java/org/elasticsearch/rest/action/admin/indices/RestGetIndicesActionTests.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
package org.elasticsearch.rest.action.admin.indices;
1010

1111
import org.elasticsearch.client.internal.node.NodeClient;
12+
import org.elasticsearch.common.util.concurrent.DeterministicTaskQueue;
1213
import org.elasticsearch.core.RestApiVersion;
1314
import org.elasticsearch.rest.RestRequest;
1415
import org.elasticsearch.test.ESTestCase;
@@ -36,7 +37,7 @@ public void testIncludeTypeNamesWarning() throws IOException {
3637
Map.of("Content-Type", contentTypeHeader, "Accept", contentTypeHeader)
3738
).withMethod(RestRequest.Method.GET).withPath("/some_index").withParams(params).build();
3839

39-
RestGetIndicesAction handler = new RestGetIndicesAction();
40+
RestGetIndicesAction handler = new RestGetIndicesAction(new DeterministicTaskQueue().getThreadPool());
4041
handler.prepareRequest(request, mock(NodeClient.class));
4142
assertCriticalWarnings(RestGetIndicesAction.TYPES_DEPRECATION_MESSAGE);
4243

@@ -57,7 +58,7 @@ public void testIncludeTypeNamesWarningExists() throws IOException {
5758
Map.of("Content-Type", contentTypeHeader, "Accept", contentTypeHeader)
5859
).withMethod(RestRequest.Method.HEAD).withPath("/some_index").withParams(params).build();
5960

60-
RestGetIndicesAction handler = new RestGetIndicesAction();
61+
RestGetIndicesAction handler = new RestGetIndicesAction(new DeterministicTaskQueue().getThreadPool());
6162
handler.prepareRequest(request, mock(NodeClient.class));
6263
}
6364
}

0 commit comments

Comments
 (0)