Skip to content

Commit 31cf058

Browse files
authored
Skip cluster state serialization to closed channel (#67413)
Today, if a client requests a cluster state and then closes the connection, we still do all the work of computing and serializing the cluster state before finally dropping it all on the floor. With this commit we introduce checks to make sure that the HTTP channel is still open before starting the serialization process. We also make the tasks themselves cancellable, and abort any ongoing waiting if the channel is closed (mainly to make the cancellability testing easier). Finally, we introduce a more detailed description of the task, to help identify cases where clients are inefficiently requesting more components of the cluster state than they need.
1 parent e2ee9bc commit 31cf058

File tree

7 files changed

+260
-11
lines changed

7 files changed

+260
-11
lines changed
Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.http;
21+
22+
import org.apache.http.client.methods.HttpGet;
23+
import org.elasticsearch.Version;
24+
import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
25+
import org.elasticsearch.action.support.PlainActionFuture;
26+
import org.elasticsearch.client.Cancellable;
27+
import org.elasticsearch.client.Request;
28+
import org.elasticsearch.client.Response;
29+
import org.elasticsearch.client.ResponseListener;
30+
import org.elasticsearch.cluster.AbstractDiffable;
31+
import org.elasticsearch.cluster.ClusterState;
32+
import org.elasticsearch.cluster.ClusterStateUpdateTask;
33+
import org.elasticsearch.cluster.service.ClusterService;
34+
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
35+
import org.elasticsearch.common.io.stream.StreamOutput;
36+
import org.elasticsearch.common.util.CollectionUtils;
37+
import org.elasticsearch.common.xcontent.XContentBuilder;
38+
import org.elasticsearch.plugins.Plugin;
39+
import org.elasticsearch.tasks.TaskInfo;
40+
41+
import java.util.Collection;
42+
import java.util.Collections;
43+
import java.util.List;
44+
import java.util.concurrent.CancellationException;
45+
import java.util.function.UnaryOperator;
46+
47+
public class ClusterStateRestCancellationIT extends HttpSmokeTestCase {
48+
49+
/**
 * Returns the plugins supplied by the superclass with {@link AssertingCustomPlugin} appended,
 * so that the test nodes know how to read the {@code AssertingCustom} cluster state custom.
 */
@Override
50+
protected Collection<Class<? extends Plugin>> nodePlugins() {
51+
return CollectionUtils.appendToCopy(super.nodePlugins(), AssertingCustomPlugin.class);
52+
}
53+
54+
/**
 * Submits a cluster state update task (source {@code "update state"}) that applies
 * {@code updateOperator} to the current state, then blocks until the task reports
 * {@code clusterStateProcessed}.
 *
 * @param clusterService the cluster service to which the update task is submitted
 * @param updateOperator computes the new cluster state from the current one
 */
private void updateClusterState(ClusterService clusterService, UnaryOperator<ClusterState> updateOperator) {
55+
final PlainActionFuture<Void> future = new PlainActionFuture<>();
56+
clusterService.submitStateUpdateTask("update state", new ClusterStateUpdateTask() {
57+
@Override
58+
public ClusterState execute(ClusterState currentState) {
59+
return updateOperator.apply(currentState);
60+
}
61+
62+
@Override
63+
public void onFailure(String source, Exception e) {
64+
// A failed update is never expected here, so surface it as an assertion failure with the cause attached.
throw new AssertionError(source, e);
65+
}
66+
67+
@Override
68+
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
69+
// Completing the future releases the caller blocked in actionGet() below.
future.onResponse(null);
70+
}
71+
});
72+
// Wait for clusterStateProcessed before returning to the caller.
future.actionGet();
73+
}
74+
75+
/**
 * Checks that cancelling an in-flight REST cluster state request cancels the corresponding task
 * without ever serializing the cluster state to XContent.
 *
 * Steps: install {@code AssertingCustom} (whose {@code toXContent} throws) into the cluster state;
 * send {@code GET /_cluster/state} with a metadata version that makes it wait; wait until a task
 * with the cluster state action name is listed; cancel the request on the client side; verify the
 * client observes a {@link CancellationException} and that the task eventually disappears; finally
 * remove the custom again.
 */
public void testClusterStateRestCancellation() throws Exception {
76+
77+
final ClusterService clusterService = internalCluster().getInstance(ClusterService.class, internalCluster().getMasterName());
78+
// Any attempt to render the cluster state as XContent from now on trips the AssertionError in AssertingCustom.
updateClusterState(clusterService, s -> ClusterState.builder(s).putCustom(AssertingCustom.NAME, AssertingCustom.INSTANCE).build());
79+
80+
final Request clusterStateRequest = new Request(HttpGet.METHOD_NAME, "/_cluster/state");
81+
// Long.MAX_VALUE as the awaited metadata version, combined with a 1h timeout, keeps the request waiting
// for the duration of the test.
clusterStateRequest.addParameter("wait_for_metadata_version", Long.toString(Long.MAX_VALUE));
82+
clusterStateRequest.addParameter("wait_for_timeout", "1h");
83+
84+
final PlainActionFuture<Void> future = new PlainActionFuture<>();
85+
logger.info("--> sending cluster state request");
86+
final Cancellable cancellable = getRestClient().performRequestAsync(clusterStateRequest, new ResponseListener() {
87+
@Override
88+
public void onSuccess(Response response) {
89+
future.onResponse(null);
90+
}
91+
92+
@Override
93+
public void onFailure(Exception exception) {
94+
future.onFailure(exception);
95+
}
96+
});
97+
98+
logger.info("--> waiting for task to start");
99+
assertBusy(() -> {
100+
final List<TaskInfo> tasks = client().admin().cluster().prepareListTasks().get().getTasks();
101+
assertTrue(tasks.toString(), tasks.stream().anyMatch(t -> t.getAction().equals(ClusterStateAction.NAME)));
102+
});
103+
104+
logger.info("--> cancelling cluster state request");
105+
cancellable.cancel();
106+
expectThrows(CancellationException.class, future::actionGet);
107+
108+
logger.info("--> checking cluster state task completed");
109+
assertBusy(() -> {
110+
// NOTE(review): the no-op update presumably forces a new cluster state notification so that the
// waiting action re-checks its cancellation flag -- confirm against TransportClusterStateAction.
updateClusterState(clusterService, s -> ClusterState.builder(s).build());
111+
final List<TaskInfo> tasks = client().admin().cluster().prepareListTasks().get().getTasks();
112+
assertTrue(tasks.toString(), tasks.stream().noneMatch(t -> t.getAction().equals(ClusterStateAction.NAME)));
113+
});
114+
115+
// Clean up: remove the asserting custom that was installed at the start of the test.
updateClusterState(clusterService, s -> ClusterState.builder(s).removeCustom(AssertingCustom.NAME).build());
116+
}
117+
118+
/**
 * A cluster state custom used as a tripwire: it writes no content to the wire and throws an
 * {@link AssertionError} if it is ever rendered to XContent. A single shared {@link #INSTANCE} is used.
 */
private static class AssertingCustom extends AbstractDiffable<ClusterState.Custom> implements ClusterState.Custom {
119+
120+
// Name under which this custom is stored in the cluster state and registered as a named writeable.
static final String NAME = "asserting";
121+
static final AssertingCustom INSTANCE = new AssertingCustom();
122+
123+
@Override
124+
public String getWriteableName() {
125+
return NAME;
126+
}
127+
128+
@Override
129+
public Version getMinimalSupportedVersion() {
130+
return Version.CURRENT;
131+
}
132+
133+
@Override
134+
public void writeTo(StreamOutput out) {
135+
// no content
136+
}
137+
138+
// Always fails: reaching this method means the cluster state was serialized to XContent.
@Override
139+
public XContentBuilder toXContent(XContentBuilder builder, Params params) {
140+
throw new AssertionError("task should have been cancelled before serializing this custom");
141+
}
142+
}
143+
144+
/**
 * Test plugin that registers a named writeable entry for {@code AssertingCustom} under
 * {@code ClusterState.Custom}.
 */
public static class AssertingCustomPlugin extends Plugin {
145+
@Override
146+
public List<NamedWriteableRegistry.Entry> getNamedWriteables() {
147+
return Collections.singletonList(
148+
// The reader ignores the input stream and returns the shared singleton; the custom writes no content.
new NamedWriteableRegistry.Entry(ClusterState.Custom.class, AssertingCustom.NAME, in -> AssertingCustom.INSTANCE));
149+
}
150+
}
151+
152+
153+
}

server/src/internalClusterTest/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.elasticsearch.cluster.node.DiscoveryNode;
4040
import org.elasticsearch.common.Priority;
4141
import org.elasticsearch.common.Strings;
42+
import org.elasticsearch.common.network.CloseableChannel;
4243
import org.elasticsearch.common.settings.Settings;
4344
import org.elasticsearch.common.settings.SettingsFilter;
4445
import org.elasticsearch.common.util.concurrent.ThreadContext;
@@ -454,6 +455,8 @@ public void sendResponse(RestResponse response) {
454455
assertThat(response.content().utf8ToString(), not(containsString("verysecretpassword")));
455456
} catch (AssertionError ex) {
456457
clusterStateError.set(ex);
458+
} finally {
459+
CloseableChannel.closeChannel(clusterStateRequest.getHttpChannel());
457460
}
458461
clusterStateLatch.countDown();
459462
}

server/src/main/java/org/elasticsearch/action/admin/cluster/state/ClusterStateRequest.java

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,13 @@
2727
import org.elasticsearch.common.io.stream.StreamInput;
2828
import org.elasticsearch.common.io.stream.StreamOutput;
2929
import org.elasticsearch.common.unit.TimeValue;
30+
import org.elasticsearch.tasks.CancellableTask;
31+
import org.elasticsearch.tasks.Task;
32+
import org.elasticsearch.tasks.TaskId;
3033

3134
import java.io.IOException;
35+
import java.util.Arrays;
36+
import java.util.Map;
3237

3338
public class ClusterStateRequest extends MasterNodeReadRequest<ClusterStateRequest> implements IndicesRequest.Replaceable {
3439

@@ -191,4 +196,47 @@ public ClusterStateRequest waitForMetadataVersion(long waitForMetadataVersion) {
191196
this.waitForMetadataVersion = waitForMetadataVersion;
192197
return this;
193198
}
199+
200+
/**
 * Creates the task for this request as a {@link CancellableTask}. The task's description is
 * {@link #getDescription()}, and it reports that its child tasks should be cancelled when it is cancelled.
 */
@Override
201+
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
202+
return new CancellableTask(id, type, action, getDescription(), parentTaskId, headers) {
203+
@Override
204+
public boolean shouldCancelChildrenOnCancellation() {
205+
return true;
206+
}
207+
};
208+
}
209+
210+
/**
 * Builds a human-readable description of what this request asks for, in the form
 * {@code cluster state [<part>, <part>, ..., master timeout [<timeout>]]}.
 *
 * A part is appended for each requested component (routing table, nodes, metadata, blocks, customs),
 * for the {@code local} flag, for a pending wait on a metadata version (including its timeout), and for
 * a non-empty index list. The master timeout is always appended last.
 *
 * @return the description string
 */
@Override
211+
public String getDescription() {
212+
final StringBuilder stringBuilder = new StringBuilder("cluster state [");
213+
// Each optional part carries its own trailing ", "; the unconditional master timeout part closes the list.
if (routingTable) {
214+
stringBuilder.append("routing table, ");
215+
}
216+
if (nodes) {
217+
stringBuilder.append("nodes, ");
218+
}
219+
if (metadata) {
220+
stringBuilder.append("metadata, ");
221+
}
222+
if (blocks) {
223+
stringBuilder.append("blocks, ");
224+
}
225+
if (customs) {
226+
stringBuilder.append("customs, ");
227+
}
228+
if (local) {
229+
stringBuilder.append("local, ");
230+
}
231+
if (waitForMetadataVersion != null) {
232+
stringBuilder.append("wait for metadata version [").append(waitForMetadataVersion)
233+
.append("] with timeout [").append(waitForTimeout).append("], ");
234+
}
235+
if (indices.length > 0) {
236+
stringBuilder.append("indices ").append(Arrays.toString(indices)).append(", ");
237+
}
238+
stringBuilder.append("master timeout [").append(masterNodeTimeout).append("]]");
239+
return stringBuilder.toString();
240+
}
241+
194242
}

server/src/main/java/org/elasticsearch/action/admin/cluster/state/TransportClusterStateAction.java

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,9 @@
4141
import org.elasticsearch.common.unit.TimeValue;
4242
import org.elasticsearch.index.Index;
4343
import org.elasticsearch.node.NodeClosedException;
44+
import org.elasticsearch.tasks.CancellableTask;
4445
import org.elasticsearch.tasks.Task;
46+
import org.elasticsearch.tasks.TaskCancelledException;
4547
import org.elasticsearch.threadpool.ThreadPool;
4648
import org.elasticsearch.transport.TransportService;
4749

@@ -73,24 +75,30 @@ protected ClusterBlockException checkBlock(ClusterStateRequest request, ClusterS
7375
protected void masterOperation(Task task, final ClusterStateRequest request, final ClusterState state,
7476
final ActionListener<ClusterStateResponse> listener) throws IOException {
7577

78+
assert task instanceof CancellableTask : task + " not cancellable";
79+
final CancellableTask cancellableTask = (CancellableTask) task;
80+
7681
final Predicate<ClusterState> acceptableClusterStatePredicate
7782
= request.waitForMetadataVersion() == null ? clusterState -> true
7883
: clusterState -> clusterState.metadata().version() >= request.waitForMetadataVersion();
7984

80-
final Predicate<ClusterState> acceptableClusterStateOrNotMasterPredicate = request.local()
85+
final Predicate<ClusterState> acceptableClusterStateOrFailedPredicate = request.local()
8186
? acceptableClusterStatePredicate
82-
: acceptableClusterStatePredicate.or(clusterState -> clusterState.nodes().isLocalNodeElectedMaster() == false);
87+
: acceptableClusterStatePredicate.or(clusterState ->
88+
cancellableTask.isCancelled() || clusterState.nodes().isLocalNodeElectedMaster() == false);
8389

8490
if (acceptableClusterStatePredicate.test(state)) {
8591
ActionListener.completeWith(listener, () -> buildResponse(request, state));
8692
} else {
87-
assert acceptableClusterStateOrNotMasterPredicate.test(state) == false;
93+
assert acceptableClusterStateOrFailedPredicate.test(state) == false;
8894
new ClusterStateObserver(state, clusterService, request.waitForTimeout(), logger, threadPool.getThreadContext())
8995
.waitForNextChange(new ClusterStateObserver.Listener() {
9096

9197
@Override
9298
public void onNewClusterState(ClusterState newState) {
93-
if (acceptableClusterStatePredicate.test(newState)) {
99+
if (cancellableTask.isCancelled()) {
100+
listener.onFailure(new TaskCancelledException("task cancelled"));
101+
} else if (acceptableClusterStatePredicate.test(newState)) {
94102
ActionListener.completeWith(listener, () -> buildResponse(request, newState));
95103
} else {
96104
listener.onFailure(new NotMasterException(
@@ -106,12 +114,16 @@ public void onClusterServiceClose() {
106114
@Override
107115
public void onTimeout(TimeValue timeout) {
108116
try {
109-
listener.onResponse(new ClusterStateResponse(state.getClusterName(), null, true));
117+
if (cancellableTask.isCancelled()) {
118+
listener.onFailure(new TaskCancelledException("task cancelled"));
119+
} else {
120+
listener.onResponse(new ClusterStateResponse(state.getClusterName(), null, true));
121+
}
110122
} catch (Exception e) {
111123
listener.onFailure(e);
112124
}
113125
}
114-
}, acceptableClusterStateOrNotMasterPredicate);
126+
}, acceptableClusterStateOrFailedPredicate);
115127
}
116128
}
117129

server/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestClusterStateAction.java

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import org.elasticsearch.ElasticsearchTimeoutException;
2323
import org.elasticsearch.action.ActionRunnable;
24+
import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
2425
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
2526
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
2627
import org.elasticsearch.action.support.IndicesOptions;
@@ -40,6 +41,8 @@
4041
import org.elasticsearch.rest.RestStatus;
4142
import org.elasticsearch.rest.action.RestActionListener;
4243
import org.elasticsearch.rest.action.RestBuilderListener;
44+
import org.elasticsearch.rest.action.RestCancellableNodeClient;
45+
import org.elasticsearch.tasks.TaskCancelledException;
4346
import org.elasticsearch.threadpool.ThreadPool;
4447

4548
import java.io.IOException;
@@ -114,10 +117,18 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
114117
}
115118
settingsFilter.addFilterSettingParams(request);
116119

117-
return channel -> client.admin().cluster().state(clusterStateRequest, new RestActionListener<>(channel) {
120+
return channel -> new RestCancellableNodeClient(client, request.getHttpChannel())
121+
.execute(ClusterStateAction.INSTANCE, clusterStateRequest, new RestActionListener<>(channel) {
122+
123+
private void ensureOpen() {
124+
if (request.getHttpChannel().isOpen() == false) {
125+
throw new TaskCancelledException("response channel [" + request.getHttpChannel() + "] closed");
126+
}
127+
}
118128

119129
@Override
120130
protected void processResponse(ClusterStateResponse response) {
131+
ensureOpen();
121132
final long startTimeMs = threadPool.relativeTimeInMillis();
122133
// Process serialization on MANAGEMENT pool since the serialization of the cluster state to XContent
123134
// can be too slow to execute on an IO thread
@@ -126,6 +137,7 @@ protected void processResponse(ClusterStateResponse response) {
126137
@Override
127138
public RestResponse buildResponse(final ClusterStateResponse response,
128139
final XContentBuilder builder) throws Exception {
140+
ensureOpen();
129141
if (clusterStateRequest.local() == false &&
130142
threadPool.relativeTimeInMillis() - startTimeMs >
131143
clusterStateRequest.masterNodeTimeout().millis()) {
@@ -138,13 +150,16 @@ public RestResponse buildResponse(final ClusterStateResponse response,
138150
builder.field(Fields.CLUSTER_NAME, response.getClusterName().value());
139151
ToXContent.Params params = new ToXContent.DelegatingMapParams(
140152
singletonMap(Metadata.CONTEXT_MODE_PARAM, Metadata.CONTEXT_MODE_API), request);
141-
response.getState().toXContent(builder, params);
153+
final ClusterState responseState = response.getState();
154+
if (responseState != null) {
155+
responseState.toXContent(builder, params);
156+
}
142157
builder.endObject();
143158
return new BytesRestResponse(RestStatus.OK, builder);
144159
}
145160
}.onResponse(response)));
146161
}
147-
});
162+
});
148163
}
149164

150165
private static final Set<String> RESPONSE_PARAMS;

server/src/test/java/org/elasticsearch/action/admin/cluster/state/ClusterStateRequestTests.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.elasticsearch.test.ESTestCase;
2828
import org.elasticsearch.test.VersionUtils;
2929

30+
import static org.hamcrest.CoreMatchers.containsString;
3031
import static org.hamcrest.CoreMatchers.equalTo;
3132

3233
/**
@@ -85,4 +86,19 @@ private static void assertOptionsMatch(IndicesOptions in, IndicesOptions out) {
8586
assertThat(in.expandWildcardsOpen(), equalTo(out.expandWildcardsOpen()));
8687
assertThat(in.allowNoIndices(), equalTo(out.allowNoIndices()));
8788
}
89+
90+
/**
 * Verifies the text produced by {@code ClusterStateRequest#getDescription()}: the exact string for a
 * cleared request and for an unmodified request with a 5m master timeout, followed by a check that each
 * individual option contributes its expected fragment.
 */
public void testDescription() {
91+
// With every component cleared, only the master timeout part is expected.
assertThat(new ClusterStateRequest().clear().getDescription(), equalTo("cluster state [master timeout [30s]]"));
92+
assertThat(new ClusterStateRequest().masterNodeTimeout("5m").getDescription(),
93+
equalTo("cluster state [routing table, nodes, metadata, blocks, customs, master timeout [5m]]"));
94+
// Enable one component at a time and check that its label appears.
assertThat(new ClusterStateRequest().clear().routingTable(true).getDescription(), containsString("routing table"));
95+
assertThat(new ClusterStateRequest().clear().nodes(true).getDescription(), containsString("nodes"));
96+
assertThat(new ClusterStateRequest().clear().metadata(true).getDescription(), containsString("metadata"));
97+
assertThat(new ClusterStateRequest().clear().blocks(true).getDescription(), containsString("blocks"));
98+
assertThat(new ClusterStateRequest().clear().customs(true).getDescription(), containsString("customs"));
99+
assertThat(new ClusterStateRequest().local(true).getDescription(), containsString("local"));
100+
assertThat(new ClusterStateRequest().waitForMetadataVersion(23L).getDescription(),
101+
containsString("wait for metadata version [23] with timeout [1m]"));
102+
assertThat(new ClusterStateRequest().indices("foo", "bar").getDescription(), containsString("indices [foo, bar]"));
103+
}
88104
}

0 commit comments

Comments
 (0)