Skip to content

Commit a4fa12a

Browse files
committed
Always compress cluster state on transport layer
We send cluster states over the wire in response to cluster state requests and when validating join requests. Today there's no special treatment of these messages so we just materialize the whole cluster state into a sequence of buffers which can be tens or hundreds of MBs in a big cluster, and it's quite possible that a really big cluster would exceed the 2GiB limit on message size. This commit forces transport compression on these messages even if it's usually disabled, which means that the materialized cluster state is always compressed. It tends to compress well, so this gives us a bit more headroom in terms of maximum cluster size. Relates #77466 Closes #79906
1 parent 8887cfa commit a4fa12a

File tree

6 files changed

+31
-5
lines changed

6 files changed

+31
-5
lines changed

server/src/main/java/org/elasticsearch/action/admin/cluster/state/ClusterStateRequest.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,16 @@
1919
import org.elasticsearch.tasks.CancellableTask;
2020
import org.elasticsearch.tasks.Task;
2121
import org.elasticsearch.tasks.TaskId;
22+
import org.elasticsearch.transport.AlwaysCompressedRequest;
2223

2324
import java.io.IOException;
2425
import java.util.Arrays;
2526
import java.util.Map;
2627

27-
public class ClusterStateRequest extends MasterNodeReadRequest<ClusterStateRequest> implements IndicesRequest.Replaceable {
28+
public class ClusterStateRequest extends MasterNodeReadRequest<ClusterStateRequest>
29+
implements
30+
IndicesRequest.Replaceable,
31+
AlwaysCompressedRequest {
2832

2933
public static final TimeValue DEFAULT_WAIT_FOR_NODE_TIMEOUT = TimeValue.timeValueMinutes(1);
3034

server/src/main/java/org/elasticsearch/cluster/coordination/ValidateJoinRequest.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,13 @@
1010
import org.elasticsearch.cluster.ClusterState;
1111
import org.elasticsearch.common.io.stream.StreamInput;
1212
import org.elasticsearch.common.io.stream.StreamOutput;
13+
import org.elasticsearch.transport.AlwaysCompressedRequest;
1314
import org.elasticsearch.transport.TransportRequest;
1415

1516
import java.io.IOException;
1617

17-
public class ValidateJoinRequest extends TransportRequest {
18-
private ClusterState state;
18+
public class ValidateJoinRequest extends TransportRequest implements AlwaysCompressedRequest {
19+
private final ClusterState state;
1920

2021
public ValidateJoinRequest(StreamInput in) throws IOException {
2122
super(in);
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0 and the Server Side Public License, v 1; you may not use this file except
5+
* in compliance with, at your election, the Elastic License 2.0 or the Server
6+
* Side Public License, v 1.
7+
*/
8+
9+
package org.elasticsearch.transport;
10+
11+
/**
12+
* A marker interface for requests that should always be compressed because they (or their responses) might get overwhelmingly large
13+
* otherwise.
14+
*/
15+
public interface AlwaysCompressedRequest {}

server/src/main/java/org/elasticsearch/transport/InboundHandler.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import org.apache.logging.log4j.message.ParameterizedMessage;
1414
import org.apache.lucene.util.BytesRef;
1515
import org.elasticsearch.Version;
16+
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
1617
import org.elasticsearch.common.io.stream.ByteBufferStreamInput;
1718
import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
1819
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
@@ -131,7 +132,7 @@ private void messageReceived(TcpChannel channel, InboundMessage message, long st
131132
if (header.isError()) {
132133
handlerResponseError(streamInput, handler);
133134
} else {
134-
handleResponse(remoteAddress, streamInput, handler);
135+
handleResponse(remoteAddress, message.getHeader(), streamInput, handler);
135136
}
136137
// Check the entire message has been read
137138
final int nextByte = streamInput.read();
@@ -149,7 +150,7 @@ private void messageReceived(TcpChannel channel, InboundMessage message, long st
149150
}
150151
} else {
151152
assert header.isError() == false;
152-
handleResponse(remoteAddress, EMPTY_STREAM_INPUT, handler);
153+
handleResponse(remoteAddress, message.getHeader(), EMPTY_STREAM_INPUT, handler);
153154
}
154155
}
155156
}
@@ -301,6 +302,7 @@ private static void sendErrorResponse(String actionName, TransportChannel transp
301302

302303
private <T extends TransportResponse> void handleResponse(
303304
InetSocketAddress remoteAddress,
305+
final Header header,
304306
final StreamInput stream,
305307
final TransportResponseHandler<T> handler
306308
) {
@@ -317,6 +319,7 @@ private <T extends TransportResponse> void handleResponse(
317319
handleException(handler, serializationException);
318320
return;
319321
}
322+
assert header.isCompressed() || (response instanceof ClusterStateResponse == false);
320323
final String executor = handler.executor();
321324
if (ThreadPool.Names.SAME.equals(executor)) {
322325
doHandleResponse(handler, response);

server/src/main/java/org/elasticsearch/transport/OutboundMessage.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
package org.elasticsearch.transport;
99

1010
import org.elasticsearch.Version;
11+
import org.elasticsearch.cluster.coordination.ValidateJoinRequest;
1112
import org.elasticsearch.common.Strings;
1213
import org.elasticsearch.common.bytes.BytesArray;
1314
import org.elasticsearch.common.bytes.BytesReference;
@@ -36,6 +37,7 @@ abstract class OutboundMessage extends NetworkMessage {
3637
) {
3738
super(threadContext, version, status, requestId, compressionScheme);
3839
this.message = message;
40+
assert isCompress() || (message instanceof ValidateJoinRequest == false);
3941
}
4042

4143
BytesReference serialize(BytesStreamOutput bytesStream) throws IOException {

server/src/main/java/org/elasticsearch/transport/TcpTransport.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -266,6 +266,7 @@ public void sendRequest(long requestId, String action, TransportRequest request,
266266
// is enabled and the request is a RawIndexingDataTransportRequest which indicates it should be
267267
// compressed.
268268
final boolean shouldCompress = compress == Compression.Enabled.TRUE
269+
|| (request instanceof AlwaysCompressedRequest)
269270
|| (compress == Compression.Enabled.INDEXING_DATA
270271
&& request instanceof RawIndexingDataTransportRequest
271272
&& ((RawIndexingDataTransportRequest) request).isRawIndexingData());

0 commit comments

Comments
 (0)