Skip to content

Commit 563d07f

Browse files
committed
Skip cluster state serialization to closed channel (#67413)
Today, if a client requests a cluster state and then closes the connection, we still do all the work of computing and serializing the cluster state before finally dropping it all on the floor. With this commit we introduce checks to make sure that the HTTP channel is still open before starting the serialization process. We also make the tasks themselves cancellable and abort any ongoing waiting if the channel is closed (mainly to make the cancellability testing easier). Finally, we introduce a more detailed description of the task to help identify cases where clients are inefficiently requesting more components of the cluster state than they need.
1 parent 2a66cb6 commit 563d07f

File tree

7 files changed

+268
-11
lines changed

7 files changed

+268
-11
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.http;
21+
22+
import org.apache.http.client.methods.HttpGet;
23+
import org.elasticsearch.Version;
24+
import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
25+
import org.elasticsearch.action.support.PlainActionFuture;
26+
import org.elasticsearch.client.Cancellable;
27+
import org.elasticsearch.client.Request;
28+
import org.elasticsearch.client.Response;
29+
import org.elasticsearch.client.ResponseListener;
30+
import org.elasticsearch.cluster.AbstractDiffable;
31+
import org.elasticsearch.cluster.ClusterState;
32+
import org.elasticsearch.cluster.ClusterStateUpdateTask;
33+
import org.elasticsearch.cluster.service.ClusterService;
34+
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
35+
import org.elasticsearch.common.io.stream.StreamOutput;
36+
import org.elasticsearch.common.util.CollectionUtils;
37+
import org.elasticsearch.common.xcontent.XContentBuilder;
38+
import org.elasticsearch.plugins.Plugin;
39+
import org.elasticsearch.tasks.TaskInfo;
40+
41+
import java.util.Collection;
42+
import java.util.Collections;
43+
import java.util.List;
44+
import java.util.concurrent.CancellationException;
45+
import java.util.function.UnaryOperator;
46+
47+
public class ClusterStateRestCancellationIT extends HttpSmokeTestCase {
48+
49+
@Override
50+
protected Collection<Class<? extends Plugin>> nodePlugins() {
51+
return CollectionUtils.appendToCopy(super.nodePlugins(), AssertingCustomPlugin.class);
52+
}
53+
54+
private void updateClusterState(ClusterService clusterService, UnaryOperator<ClusterState> updateOperator) {
55+
final PlainActionFuture<Void> future = new PlainActionFuture<>();
56+
clusterService.submitStateUpdateTask("update state", new ClusterStateUpdateTask() {
57+
@Override
58+
public ClusterState execute(ClusterState currentState) {
59+
return updateOperator.apply(currentState);
60+
}
61+
62+
@Override
63+
public void onFailure(String source, Exception e) {
64+
throw new AssertionError(source, e);
65+
}
66+
67+
@Override
68+
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
69+
future.onResponse(null);
70+
}
71+
});
72+
future.actionGet();
73+
}
74+
75+
public void testClusterStateRestCancellation() throws Exception {
76+
77+
final ClusterService clusterService = internalCluster().getInstance(ClusterService.class, internalCluster().getMasterName());
78+
updateClusterState(clusterService, s -> ClusterState.builder(s).putCustom(AssertingCustom.NAME, AssertingCustom.INSTANCE).build());
79+
80+
final Request clusterStateRequest = new Request(HttpGet.METHOD_NAME, "/_cluster/state");
81+
clusterStateRequest.addParameter("wait_for_metadata_version", Long.toString(Long.MAX_VALUE));
82+
clusterStateRequest.addParameter("wait_for_timeout", "1h");
83+
84+
final PlainActionFuture<Void> future = new PlainActionFuture<>();
85+
logger.info("--> sending cluster state request");
86+
final Cancellable cancellable = getRestClient().performRequestAsync(clusterStateRequest, new ResponseListener() {
87+
@Override
88+
public void onSuccess(Response response) {
89+
future.onResponse(null);
90+
}
91+
92+
@Override
93+
public void onFailure(Exception exception) {
94+
future.onFailure(exception);
95+
}
96+
});
97+
98+
logger.info("--> waiting for task to start");
99+
assertBusy(() -> {
100+
final List<TaskInfo> tasks = client().admin().cluster().prepareListTasks().get().getTasks();
101+
assertTrue(tasks.toString(), tasks.stream().anyMatch(t -> t.getAction().equals(ClusterStateAction.NAME)));
102+
});
103+
104+
logger.info("--> cancelling cluster state request");
105+
cancellable.cancel();
106+
expectThrows(CancellationException.class, future::actionGet);
107+
108+
logger.info("--> checking cluster state task completed");
109+
assertBusy(() -> {
110+
updateClusterState(clusterService, s -> ClusterState.builder(s).build());
111+
final List<TaskInfo> tasks = client().admin().cluster().prepareListTasks().get().getTasks();
112+
assertTrue(tasks.toString(), tasks.stream().noneMatch(t -> t.getAction().equals(ClusterStateAction.NAME)));
113+
});
114+
115+
updateClusterState(clusterService, s -> ClusterState.builder(s).removeCustom(AssertingCustom.NAME).build());
116+
}
117+
118+
private static class AssertingCustom extends AbstractDiffable<ClusterState.Custom> implements ClusterState.Custom {
119+
120+
static final String NAME = "asserting";
121+
static final AssertingCustom INSTANCE = new AssertingCustom();
122+
123+
@Override
124+
public String getWriteableName() {
125+
return NAME;
126+
}
127+
128+
@Override
129+
public Version getMinimalSupportedVersion() {
130+
return Version.CURRENT;
131+
}
132+
133+
@Override
134+
public void writeTo(StreamOutput out) {
135+
// no content
136+
}
137+
138+
@Override
139+
public XContentBuilder toXContent(XContentBuilder builder, Params params) {
140+
throw new AssertionError("task should have been cancelled before serializing this custom");
141+
}
142+
}
143+
144+
public static class AssertingCustomPlugin extends Plugin {
145+
@Override
146+
public List<NamedWriteableRegistry.Entry> getNamedWriteables() {
147+
return Collections.singletonList(
148+
new NamedWriteableRegistry.Entry(ClusterState.Custom.class, AssertingCustom.NAME, in -> AssertingCustom.INSTANCE));
149+
}
150+
}
151+
152+
153+
}

server/src/internalClusterTest/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java

+3
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import org.elasticsearch.cluster.node.DiscoveryNode;
4242
import org.elasticsearch.common.Priority;
4343
import org.elasticsearch.common.Strings;
44+
import org.elasticsearch.common.network.CloseableChannel;
4445
import org.elasticsearch.common.settings.Settings;
4546
import org.elasticsearch.common.settings.SettingsFilter;
4647
import org.elasticsearch.common.util.concurrent.ThreadContext;
@@ -457,6 +458,8 @@ public void sendResponse(RestResponse response) {
457458
assertThat(response.content().utf8ToString(), not(containsString("verysecretpassword")));
458459
} catch (AssertionError ex) {
459460
clusterStateError.set(ex);
461+
} finally {
462+
CloseableChannel.closeChannel(clusterStateRequest.getHttpChannel());
460463
}
461464
clusterStateLatch.countDown();
462465
}

server/src/main/java/org/elasticsearch/action/admin/cluster/state/ClusterStateRequest.java

+48
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,13 @@
2828
import org.elasticsearch.common.io.stream.StreamInput;
2929
import org.elasticsearch.common.io.stream.StreamOutput;
3030
import org.elasticsearch.common.unit.TimeValue;
31+
import org.elasticsearch.tasks.CancellableTask;
32+
import org.elasticsearch.tasks.Task;
33+
import org.elasticsearch.tasks.TaskId;
3134

3235
import java.io.IOException;
36+
import java.util.Arrays;
37+
import java.util.Map;
3338

3439
public class ClusterStateRequest extends MasterNodeReadRequest<ClusterStateRequest> implements IndicesRequest.Replaceable {
3540

@@ -196,4 +201,47 @@ public ClusterStateRequest waitForMetadataVersion(long waitForMetadataVersion) {
196201
this.waitForMetadataVersion = waitForMetadataVersion;
197202
return this;
198203
}
204+
205+
@Override
206+
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
207+
return new CancellableTask(id, type, action, getDescription(), parentTaskId, headers) {
208+
@Override
209+
public boolean shouldCancelChildrenOnCancellation() {
210+
return true;
211+
}
212+
};
213+
}
214+
215+
@Override
216+
public String getDescription() {
217+
final StringBuilder stringBuilder = new StringBuilder("cluster state [");
218+
if (routingTable) {
219+
stringBuilder.append("routing table, ");
220+
}
221+
if (nodes) {
222+
stringBuilder.append("nodes, ");
223+
}
224+
if (metadata) {
225+
stringBuilder.append("metadata, ");
226+
}
227+
if (blocks) {
228+
stringBuilder.append("blocks, ");
229+
}
230+
if (customs) {
231+
stringBuilder.append("customs, ");
232+
}
233+
if (local) {
234+
stringBuilder.append("local, ");
235+
}
236+
if (waitForMetadataVersion != null) {
237+
stringBuilder.append("wait for metadata version [").append(waitForMetadataVersion)
238+
.append("] with timeout [").append(waitForTimeout).append("], ");
239+
}
240+
if (indices.length > 0) {
241+
stringBuilder.append("indices ").append(Arrays.toString(indices)).append(", ");
242+
}
243+
stringBuilder.append("master timeout [").append(masterNodeTimeout).append("]]");
244+
return stringBuilder.toString();
245+
}
246+
199247
}

server/src/main/java/org/elasticsearch/action/admin/cluster/state/TransportClusterStateAction.java

+26-6
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,9 @@
4141
import org.elasticsearch.common.unit.TimeValue;
4242
import org.elasticsearch.index.Index;
4343
import org.elasticsearch.node.NodeClosedException;
44+
import org.elasticsearch.tasks.CancellableTask;
45+
import org.elasticsearch.tasks.Task;
46+
import org.elasticsearch.tasks.TaskCancelledException;
4447
import org.elasticsearch.threadpool.ThreadPool;
4548
import org.elasticsearch.transport.TransportService;
4649

@@ -78,25 +81,38 @@ protected ClusterBlockException checkBlock(ClusterStateRequest request, ClusterS
7881
@Override
7982
protected void masterOperation(final ClusterStateRequest request, final ClusterState state,
8083
final ActionListener<ClusterStateResponse> listener) throws IOException {
84+
assert false : "task is required";
85+
throw new UnsupportedOperationException("task is required");
86+
}
87+
88+
@Override
89+
protected void masterOperation(final Task task, final ClusterStateRequest request, final ClusterState state,
90+
final ActionListener<ClusterStateResponse> listener) throws IOException {
91+
92+
assert task instanceof CancellableTask : task + " not cancellable";
93+
final CancellableTask cancellableTask = (CancellableTask) task;
8194

8295
final Predicate<ClusterState> acceptableClusterStatePredicate
8396
= request.waitForMetadataVersion() == null ? clusterState -> true
8497
: clusterState -> clusterState.metadata().version() >= request.waitForMetadataVersion();
8598

86-
final Predicate<ClusterState> acceptableClusterStateOrNotMasterPredicate = request.local()
99+
final Predicate<ClusterState> acceptableClusterStateOrFailedPredicate = request.local()
87100
? acceptableClusterStatePredicate
88-
: acceptableClusterStatePredicate.or(clusterState -> clusterState.nodes().isLocalNodeElectedMaster() == false);
101+
: acceptableClusterStatePredicate.or(clusterState ->
102+
cancellableTask.isCancelled() || clusterState.nodes().isLocalNodeElectedMaster() == false);
89103

90104
if (acceptableClusterStatePredicate.test(state)) {
91105
ActionListener.completeWith(listener, () -> buildResponse(request, state));
92106
} else {
93-
assert acceptableClusterStateOrNotMasterPredicate.test(state) == false;
107+
assert acceptableClusterStateOrFailedPredicate.test(state) == false;
94108
new ClusterStateObserver(state, clusterService, request.waitForTimeout(), logger, threadPool.getThreadContext())
95109
.waitForNextChange(new ClusterStateObserver.Listener() {
96110

97111
@Override
98112
public void onNewClusterState(ClusterState newState) {
99-
if (acceptableClusterStatePredicate.test(newState)) {
113+
if (cancellableTask.isCancelled()) {
114+
listener.onFailure(new TaskCancelledException("task cancelled"));
115+
} else if (acceptableClusterStatePredicate.test(newState)) {
100116
ActionListener.completeWith(listener, () -> buildResponse(request, newState));
101117
} else {
102118
listener.onFailure(new NotMasterException(
@@ -112,12 +128,16 @@ public void onClusterServiceClose() {
112128
@Override
113129
public void onTimeout(TimeValue timeout) {
114130
try {
115-
listener.onResponse(new ClusterStateResponse(state.getClusterName(), null, true));
131+
if (cancellableTask.isCancelled()) {
132+
listener.onFailure(new TaskCancelledException("task cancelled"));
133+
} else {
134+
listener.onResponse(new ClusterStateResponse(state.getClusterName(), null, true));
135+
}
116136
} catch (Exception e) {
117137
listener.onFailure(e);
118138
}
119139
}
120-
}, acceptableClusterStateOrNotMasterPredicate);
140+
}, acceptableClusterStateOrFailedPredicate);
121141
}
122142
}
123143

server/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestClusterStateAction.java

+18-3
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import org.elasticsearch.ElasticsearchTimeoutException;
2323
import org.elasticsearch.action.ActionRunnable;
24+
import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
2425
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
2526
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
2627
import org.elasticsearch.action.support.IndicesOptions;
@@ -40,6 +41,8 @@
4041
import org.elasticsearch.rest.RestStatus;
4142
import org.elasticsearch.rest.action.RestActionListener;
4243
import org.elasticsearch.rest.action.RestBuilderListener;
44+
import org.elasticsearch.rest.action.RestCancellableNodeClient;
45+
import org.elasticsearch.tasks.TaskCancelledException;
4346
import org.elasticsearch.threadpool.ThreadPool;
4447

4548
import java.io.IOException;
@@ -116,10 +119,18 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
116119
}
117120
settingsFilter.addFilterSettingParams(request);
118121

119-
return channel -> client.admin().cluster().state(clusterStateRequest, new RestActionListener<ClusterStateResponse>(channel) {
122+
return channel -> new RestCancellableNodeClient(client, request.getHttpChannel())
123+
.execute(ClusterStateAction.INSTANCE, clusterStateRequest, new RestActionListener<ClusterStateResponse>(channel) {
124+
125+
private void ensureOpen() {
126+
if (request.getHttpChannel().isOpen() == false) {
127+
throw new TaskCancelledException("response channel [" + request.getHttpChannel() + "] closed");
128+
}
129+
}
120130

121131
@Override
122132
protected void processResponse(ClusterStateResponse response) {
133+
ensureOpen();
123134
final long startTimeMs = threadPool.relativeTimeInMillis();
124135
// Process serialization on MANAGEMENT pool since the serialization of the cluster state to XContent
125136
// can be too slow to execute on an IO thread
@@ -128,6 +139,7 @@ protected void processResponse(ClusterStateResponse response) {
128139
@Override
129140
public RestResponse buildResponse(final ClusterStateResponse response,
130141
final XContentBuilder builder) throws Exception {
142+
ensureOpen();
131143
if (clusterStateRequest.local() == false &&
132144
threadPool.relativeTimeInMillis() - startTimeMs >
133145
clusterStateRequest.masterNodeTimeout().millis()) {
@@ -140,13 +152,16 @@ public RestResponse buildResponse(final ClusterStateResponse response,
140152
builder.field(Fields.CLUSTER_NAME, response.getClusterName().value());
141153
ToXContent.Params params = new ToXContent.DelegatingMapParams(
142154
singletonMap(Metadata.CONTEXT_MODE_PARAM, Metadata.CONTEXT_MODE_API), request);
143-
response.getState().toXContent(builder, params);
155+
final ClusterState responseState = response.getState();
156+
if (responseState != null) {
157+
responseState.toXContent(builder, params);
158+
}
144159
builder.endObject();
145160
return new BytesRestResponse(RestStatus.OK, builder);
146161
}
147162
}.onResponse(response)));
148163
}
149-
});
164+
});
150165
}
151166

152167
private static final Set<String> RESPONSE_PARAMS;

server/src/test/java/org/elasticsearch/action/admin/cluster/state/ClusterStateRequestTests.java

+16
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.elasticsearch.test.ESTestCase;
2828
import org.elasticsearch.test.VersionUtils;
2929

30+
import static org.hamcrest.CoreMatchers.containsString;
3031
import static org.hamcrest.CoreMatchers.equalTo;
3132

3233
/**
@@ -87,4 +88,19 @@ private static void assertOptionsMatch(IndicesOptions in, IndicesOptions out) {
8788
assertThat(in.expandWildcardsOpen(), equalTo(out.expandWildcardsOpen()));
8889
assertThat(in.allowNoIndices(), equalTo(out.allowNoIndices()));
8990
}
91+
92+
public void testDescription() {
93+
assertThat(new ClusterStateRequest().clear().getDescription(), equalTo("cluster state [master timeout [30s]]"));
94+
assertThat(new ClusterStateRequest().masterNodeTimeout("5m").getDescription(),
95+
equalTo("cluster state [routing table, nodes, metadata, blocks, customs, master timeout [5m]]"));
96+
assertThat(new ClusterStateRequest().clear().routingTable(true).getDescription(), containsString("routing table"));
97+
assertThat(new ClusterStateRequest().clear().nodes(true).getDescription(), containsString("nodes"));
98+
assertThat(new ClusterStateRequest().clear().metadata(true).getDescription(), containsString("metadata"));
99+
assertThat(new ClusterStateRequest().clear().blocks(true).getDescription(), containsString("blocks"));
100+
assertThat(new ClusterStateRequest().clear().customs(true).getDescription(), containsString("customs"));
101+
assertThat(new ClusterStateRequest().local(true).getDescription(), containsString("local"));
102+
assertThat(new ClusterStateRequest().waitForMetadataVersion(23L).getDescription(),
103+
containsString("wait for metadata version [23] with timeout [1m]"));
104+
assertThat(new ClusterStateRequest().indices("foo", "bar").getDescription(), containsString("indices [foo, bar]"));
105+
}
90106
}

0 commit comments

Comments
 (0)