Skip to content

Commit 46fa3dd

Browse files
committed
Revert "Skip cluster state serialization to closed channel (#67413)"
This reverts commit 563d07f.
1 parent 563d07f commit 46fa3dd

File tree

7 files changed

+11
-268
lines changed

7 files changed

+11
-268
lines changed

qa/smoke-test-http/src/test/java/org/elasticsearch/http/ClusterStateRestCancellationIT.java

-153
This file was deleted.

server/src/internalClusterTest/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java

-3
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@
4141
import org.elasticsearch.cluster.node.DiscoveryNode;
4242
import org.elasticsearch.common.Priority;
4343
import org.elasticsearch.common.Strings;
44-
import org.elasticsearch.common.network.CloseableChannel;
4544
import org.elasticsearch.common.settings.Settings;
4645
import org.elasticsearch.common.settings.SettingsFilter;
4746
import org.elasticsearch.common.util.concurrent.ThreadContext;
@@ -458,8 +457,6 @@ public void sendResponse(RestResponse response) {
458457
assertThat(response.content().utf8ToString(), not(containsString("verysecretpassword")));
459458
} catch (AssertionError ex) {
460459
clusterStateError.set(ex);
461-
} finally {
462-
CloseableChannel.closeChannel(clusterStateRequest.getHttpChannel());
463460
}
464461
clusterStateLatch.countDown();
465462
}

server/src/main/java/org/elasticsearch/action/admin/cluster/state/ClusterStateRequest.java

-48
Original file line numberDiff line numberDiff line change
@@ -28,13 +28,8 @@
2828
import org.elasticsearch.common.io.stream.StreamInput;
2929
import org.elasticsearch.common.io.stream.StreamOutput;
3030
import org.elasticsearch.common.unit.TimeValue;
31-
import org.elasticsearch.tasks.CancellableTask;
32-
import org.elasticsearch.tasks.Task;
33-
import org.elasticsearch.tasks.TaskId;
3431

3532
import java.io.IOException;
36-
import java.util.Arrays;
37-
import java.util.Map;
3833

3934
public class ClusterStateRequest extends MasterNodeReadRequest<ClusterStateRequest> implements IndicesRequest.Replaceable {
4035

@@ -201,47 +196,4 @@ public ClusterStateRequest waitForMetadataVersion(long waitForMetadataVersion) {
201196
this.waitForMetadataVersion = waitForMetadataVersion;
202197
return this;
203198
}
204-
205-
@Override
206-
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
207-
return new CancellableTask(id, type, action, getDescription(), parentTaskId, headers) {
208-
@Override
209-
public boolean shouldCancelChildrenOnCancellation() {
210-
return true;
211-
}
212-
};
213-
}
214-
215-
@Override
216-
public String getDescription() {
217-
final StringBuilder stringBuilder = new StringBuilder("cluster state [");
218-
if (routingTable) {
219-
stringBuilder.append("routing table, ");
220-
}
221-
if (nodes) {
222-
stringBuilder.append("nodes, ");
223-
}
224-
if (metadata) {
225-
stringBuilder.append("metadata, ");
226-
}
227-
if (blocks) {
228-
stringBuilder.append("blocks, ");
229-
}
230-
if (customs) {
231-
stringBuilder.append("customs, ");
232-
}
233-
if (local) {
234-
stringBuilder.append("local, ");
235-
}
236-
if (waitForMetadataVersion != null) {
237-
stringBuilder.append("wait for metadata version [").append(waitForMetadataVersion)
238-
.append("] with timeout [").append(waitForTimeout).append("], ");
239-
}
240-
if (indices.length > 0) {
241-
stringBuilder.append("indices ").append(Arrays.toString(indices)).append(", ");
242-
}
243-
stringBuilder.append("master timeout [").append(masterNodeTimeout).append("]]");
244-
return stringBuilder.toString();
245-
}
246-
247199
}

server/src/main/java/org/elasticsearch/action/admin/cluster/state/TransportClusterStateAction.java

+6-26
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,6 @@
4141
import org.elasticsearch.common.unit.TimeValue;
4242
import org.elasticsearch.index.Index;
4343
import org.elasticsearch.node.NodeClosedException;
44-
import org.elasticsearch.tasks.CancellableTask;
45-
import org.elasticsearch.tasks.Task;
46-
import org.elasticsearch.tasks.TaskCancelledException;
4744
import org.elasticsearch.threadpool.ThreadPool;
4845
import org.elasticsearch.transport.TransportService;
4946

@@ -81,38 +78,25 @@ protected ClusterBlockException checkBlock(ClusterStateRequest request, ClusterS
8178
@Override
8279
protected void masterOperation(final ClusterStateRequest request, final ClusterState state,
8380
final ActionListener<ClusterStateResponse> listener) throws IOException {
84-
assert false : "task is required";
85-
throw new UnsupportedOperationException("task is required");
86-
}
87-
88-
@Override
89-
protected void masterOperation(final Task task, final ClusterStateRequest request, final ClusterState state,
90-
final ActionListener<ClusterStateResponse> listener) throws IOException {
91-
92-
assert task instanceof CancellableTask : task + " not cancellable";
93-
final CancellableTask cancellableTask = (CancellableTask) task;
9481

9582
final Predicate<ClusterState> acceptableClusterStatePredicate
9683
= request.waitForMetadataVersion() == null ? clusterState -> true
9784
: clusterState -> clusterState.metadata().version() >= request.waitForMetadataVersion();
9885

99-
final Predicate<ClusterState> acceptableClusterStateOrFailedPredicate = request.local()
86+
final Predicate<ClusterState> acceptableClusterStateOrNotMasterPredicate = request.local()
10087
? acceptableClusterStatePredicate
101-
: acceptableClusterStatePredicate.or(clusterState ->
102-
cancellableTask.isCancelled() || clusterState.nodes().isLocalNodeElectedMaster() == false);
88+
: acceptableClusterStatePredicate.or(clusterState -> clusterState.nodes().isLocalNodeElectedMaster() == false);
10389

10490
if (acceptableClusterStatePredicate.test(state)) {
10591
ActionListener.completeWith(listener, () -> buildResponse(request, state));
10692
} else {
107-
assert acceptableClusterStateOrFailedPredicate.test(state) == false;
93+
assert acceptableClusterStateOrNotMasterPredicate.test(state) == false;
10894
new ClusterStateObserver(state, clusterService, request.waitForTimeout(), logger, threadPool.getThreadContext())
10995
.waitForNextChange(new ClusterStateObserver.Listener() {
11096

11197
@Override
11298
public void onNewClusterState(ClusterState newState) {
113-
if (cancellableTask.isCancelled()) {
114-
listener.onFailure(new TaskCancelledException("task cancelled"));
115-
} else if (acceptableClusterStatePredicate.test(newState)) {
99+
if (acceptableClusterStatePredicate.test(newState)) {
116100
ActionListener.completeWith(listener, () -> buildResponse(request, newState));
117101
} else {
118102
listener.onFailure(new NotMasterException(
@@ -128,16 +112,12 @@ public void onClusterServiceClose() {
128112
@Override
129113
public void onTimeout(TimeValue timeout) {
130114
try {
131-
if (cancellableTask.isCancelled()) {
132-
listener.onFailure(new TaskCancelledException("task cancelled"));
133-
} else {
134-
listener.onResponse(new ClusterStateResponse(state.getClusterName(), null, true));
135-
}
115+
listener.onResponse(new ClusterStateResponse(state.getClusterName(), null, true));
136116
} catch (Exception e) {
137117
listener.onFailure(e);
138118
}
139119
}
140-
}, acceptableClusterStateOrFailedPredicate);
120+
}, acceptableClusterStateOrNotMasterPredicate);
141121
}
142122
}
143123

server/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestClusterStateAction.java

+3-18
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121

2222
import org.elasticsearch.ElasticsearchTimeoutException;
2323
import org.elasticsearch.action.ActionRunnable;
24-
import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
2524
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
2625
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
2726
import org.elasticsearch.action.support.IndicesOptions;
@@ -41,8 +40,6 @@
4140
import org.elasticsearch.rest.RestStatus;
4241
import org.elasticsearch.rest.action.RestActionListener;
4342
import org.elasticsearch.rest.action.RestBuilderListener;
44-
import org.elasticsearch.rest.action.RestCancellableNodeClient;
45-
import org.elasticsearch.tasks.TaskCancelledException;
4643
import org.elasticsearch.threadpool.ThreadPool;
4744

4845
import java.io.IOException;
@@ -119,18 +116,10 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
119116
}
120117
settingsFilter.addFilterSettingParams(request);
121118

122-
return channel -> new RestCancellableNodeClient(client, request.getHttpChannel())
123-
.execute(ClusterStateAction.INSTANCE, clusterStateRequest, new RestActionListener<ClusterStateResponse>(channel) {
124-
125-
private void ensureOpen() {
126-
if (request.getHttpChannel().isOpen() == false) {
127-
throw new TaskCancelledException("response channel [" + request.getHttpChannel() + "] closed");
128-
}
129-
}
119+
return channel -> client.admin().cluster().state(clusterStateRequest, new RestActionListener<ClusterStateResponse>(channel) {
130120

131121
@Override
132122
protected void processResponse(ClusterStateResponse response) {
133-
ensureOpen();
134123
final long startTimeMs = threadPool.relativeTimeInMillis();
135124
// Process serialization on MANAGEMENT pool since the serialization of the cluster state to XContent
136125
// can be too slow to execute on an IO thread
@@ -139,7 +128,6 @@ protected void processResponse(ClusterStateResponse response) {
139128
@Override
140129
public RestResponse buildResponse(final ClusterStateResponse response,
141130
final XContentBuilder builder) throws Exception {
142-
ensureOpen();
143131
if (clusterStateRequest.local() == false &&
144132
threadPool.relativeTimeInMillis() - startTimeMs >
145133
clusterStateRequest.masterNodeTimeout().millis()) {
@@ -152,16 +140,13 @@ public RestResponse buildResponse(final ClusterStateResponse response,
152140
builder.field(Fields.CLUSTER_NAME, response.getClusterName().value());
153141
ToXContent.Params params = new ToXContent.DelegatingMapParams(
154142
singletonMap(Metadata.CONTEXT_MODE_PARAM, Metadata.CONTEXT_MODE_API), request);
155-
final ClusterState responseState = response.getState();
156-
if (responseState != null) {
157-
responseState.toXContent(builder, params);
158-
}
143+
response.getState().toXContent(builder, params);
159144
builder.endObject();
160145
return new BytesRestResponse(RestStatus.OK, builder);
161146
}
162147
}.onResponse(response)));
163148
}
164-
});
149+
});
165150
}
166151

167152
private static final Set<String> RESPONSE_PARAMS;

server/src/test/java/org/elasticsearch/action/admin/cluster/state/ClusterStateRequestTests.java

-16
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
import org.elasticsearch.test.ESTestCase;
2828
import org.elasticsearch.test.VersionUtils;
2929

30-
import static org.hamcrest.CoreMatchers.containsString;
3130
import static org.hamcrest.CoreMatchers.equalTo;
3231

3332
/**
@@ -88,19 +87,4 @@ private static void assertOptionsMatch(IndicesOptions in, IndicesOptions out) {
8887
assertThat(in.expandWildcardsOpen(), equalTo(out.expandWildcardsOpen()));
8988
assertThat(in.allowNoIndices(), equalTo(out.allowNoIndices()));
9089
}
91-
92-
public void testDescription() {
93-
assertThat(new ClusterStateRequest().clear().getDescription(), equalTo("cluster state [master timeout [30s]]"));
94-
assertThat(new ClusterStateRequest().masterNodeTimeout("5m").getDescription(),
95-
equalTo("cluster state [routing table, nodes, metadata, blocks, customs, master timeout [5m]]"));
96-
assertThat(new ClusterStateRequest().clear().routingTable(true).getDescription(), containsString("routing table"));
97-
assertThat(new ClusterStateRequest().clear().nodes(true).getDescription(), containsString("nodes"));
98-
assertThat(new ClusterStateRequest().clear().metadata(true).getDescription(), containsString("metadata"));
99-
assertThat(new ClusterStateRequest().clear().blocks(true).getDescription(), containsString("blocks"));
100-
assertThat(new ClusterStateRequest().clear().customs(true).getDescription(), containsString("customs"));
101-
assertThat(new ClusterStateRequest().local(true).getDescription(), containsString("local"));
102-
assertThat(new ClusterStateRequest().waitForMetadataVersion(23L).getDescription(),
103-
containsString("wait for metadata version [23] with timeout [1m]"));
104-
assertThat(new ClusterStateRequest().indices("foo", "bar").getDescription(), containsString("indices [foo, bar]"));
105-
}
10690
}

0 commit comments

Comments
 (0)